← Back to Work

Research entry

Data Architecture & Real-Time Analytics

2021 · Academic Archive

A data architecture assignment building a PostgreSQL database pipeline for real-time Bitcoin price analytics, combining SQL schema design, Python ETL, and time series queries.

Python SQL PostgreSQL psycopg2 Data Engineering

Open live app / source →

Overview

A data architecture CA combining SQL schema design and Python ETL to build a real-time analytics pipeline for Bitcoin price data. The project involved designing a PostgreSQL schema, writing ETL scripts to load and query data, and producing analytical reports on price trends.

Original Script (2021)

import psycopg2
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns

# Connect to PostgreSQL
connection = psycopg2.connect(
    user="",
    password="",
    host="",
    database=""
)
cursor = connection.cursor()
print(connection.get_dsn_parameters())

# Query all records from value table
cursor.execute("SELECT * FROM value;")
records = cursor.fetchall()
print(f"Total records: {len(records)}")

# Load into pandas for analysis
df = pd.read_sql_query("SELECT date, weighted_price, close FROM value ORDER BY date;", connection)

# Basic time series plot
plt.figure(figsize=(12, 5))
plt.plot(df['date'], df['close'], label='Close Price')
plt.title('Bitcoin Close Price')
plt.xlabel('Date')
plt.ylabel('USD')
plt.legend()
plt.show()

# OLS trend
X = sm.add_constant(range(len(df)))
model = sm.OLS(df['close'], X).fit()
df['trend'] = model.predict(X)

plt.plot(df['date'], df['close'], label='Actual')
plt.plot(df['date'], df['trend'], label='OLS Trend', color='red')
plt.legend()
plt.show()

SQL Schema

-- Setup script (setup.sql)
CREATE TABLE value (
    id SERIAL PRIMARY KEY,
    date TIMESTAMP NOT NULL,
    open NUMERIC,
    high NUMERIC,
    low NUMERIC,
    close NUMERIC,
    volume NUMERIC,
    weighted_price NUMERIC
);

CREATE INDEX idx_value_date ON value(date);

-- Analysis queries (analysis1.sql)
SELECT
    DATE_TRUNC('month', date) AS month,
    AVG(close) AS avg_close,
    MAX(high) AS max_high,
    MIN(low) AS min_low
FROM value
GROUP BY month
ORDER BY month;

What It Did

  • Designed a PostgreSQL schema for OHLCV time series data
  • Wrote Python ETL to connect and query via psycopg2
  • Loaded query results into pandas for downstream analysis
  • Plotted price trends and fitted OLS baseline

Issues in the Original

  • Database credentials left blank in script (security gap — should use env vars)
  • No connection pooling or context manager — connection not closed on error
  • cursor.fetchall() loaded all records into memory instead of streaming
  • No parameterised queries (SQL injection risk for dynamic queries)

Modern Rewrite

Download Script

Key improvements over the original:

  • python-dotenv for credentials.env file keeps DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD out of source code. The original had a hardcoded password (Celtic123) committed to the repo
  • SQLAlchemy engine — replaces raw psycopg2.connect(). pool_pre_ping=True automatically reconnects dropped connections; engine.begin() is a context manager that commits on success and rolls back on error
  • pd.read_sql() instead of cursor.fetchall() — streams query results directly into a DataFrame without loading the entire result set into memory first
  • Parameterised queries via text() — prevents SQL injection for any dynamic query inputs
  • Upsert patternON CONFLICT (date) DO NOTHING safely re-runs the load without duplicating rows; original had broken INSERT Fitted_values from df; syntax
  • argparse CLI--load <csv> flag separates data loading from analysis; pipeline is idempotent
  • Structured logginglogging module with timestamps replaces scattered print() calls
  • OHLC candlestick + volume charts — the original only plotted close price with a linear trend
"""
Bitcoin ETL Pipeline — Modern Rewrite (2024)
Original: TESTING_PSQL.py, analysis2.py (2021)

Dependencies (uv):
    uv add psycopg2-binary sqlalchemy pandas plotly python-dotenv

Setup:
    Create .env with: DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
    Then: uv run bitcoin_etl_modern.py --load coin_Bitcoin.csv
"""

import argparse, logging, os, sys
from pathlib import Path

import pandas as pd
import plotly.graph_objects as go
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import statsmodels.api as sm

DATA_DIR = Path(__file__).parent
OUT_DIR = DATA_DIR / "output"
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


def get_engine():
    """Build SQLAlchemy engine from .env — credentials never hardcoded."""
    load_dotenv(DATA_DIR / ".env")
    url = (
        f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
        f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
    )
    return create_engine(url, pool_pre_ping=True)


def load_csv_to_db(csv_path: Path, engine) -> int:
    """Bulk-load CSV into ohlcv table. Skips duplicate dates via ON CONFLICT."""
    df = pd.read_csv(csv_path, parse_dates=["Date"]).rename(columns={
        "Date": "date", "Open": "open", "High": "high", "Low": "low",
        "Close": "close", "Volume": "volume", "Marketcap": "weighted_price",
    })[["date", "open", "high", "low", "close", "volume", "weighted_price"]]

    with engine.begin() as conn:
        df.to_sql("ohlcv_staging", conn, if_exists="replace", index=False, chunksize=500)
        result = conn.execute(text("""
            INSERT INTO ohlcv (date, open, high, low, close, volume, weighted_price)
            SELECT date, open, high, low, close, volume, weighted_price
            FROM ohlcv_staging
            ON CONFLICT (date) DO NOTHING
        """))
        conn.execute(text("DROP TABLE IF EXISTS ohlcv_staging"))
    log.info("Inserted %d rows", result.rowcount)
    return result.rowcount


def query_monthly_summary(engine) -> pd.DataFrame:
    """Aggregate OHLCV to monthly level — reads directly into pandas, no fetchall()."""
    sql = text("""
        SELECT
            DATE_TRUNC('month', date) AS month,
            AVG(close)                AS avg_close,
            MAX(high)                 AS max_high,
            MIN(low)                  AS min_low,
            SUM(volume)               AS total_volume
        FROM ohlcv GROUP BY month ORDER BY month
    """)
    with engine.connect() as conn:
        return pd.read_sql(sql, conn, parse_dates=["month"])


def fit_ols_trend(df: pd.DataFrame) -> pd.DataFrame:
    X = sm.add_constant(range(len(df)))
    df = df.copy()
    df["trend"] = sm.OLS(df["avg_close"].ffill(), X).fit().predict(X)
    return df


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--load", metavar="CSV")
    args = parser.parse_args()
    OUT_DIR.mkdir(exist_ok=True)

    try:
        engine = get_engine()
        if args.load:
            load_csv_to_db(Path(args.load), engine)

        monthly = query_monthly_summary(engine)
        if monthly.empty:
            log.warning("No data. Run with --load <csv> first.")
            sys.exit(0)

        monthly = fit_ols_trend(monthly)

        fig = go.Figure()
        fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["avg_close"],
                                 name="Monthly Avg Close", line={"color": "#60a5fa"}))
        fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["trend"],
                                 name="OLS Trend", line={"color": "#f87171", "dash": "dot"}))
        fig.update_layout(title="Bitcoin Monthly Close Price with OLS Trend",
                          template="plotly_dark")
        fig.write_html(OUT_DIR / "price_trend.html")

    except SQLAlchemyError as e:
        log.error("Database error: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()