Research entry
Data Architecture & Real-Time Analytics
2021 · Academic Archive
A data architecture assignment building a PostgreSQL database pipeline for real-time Bitcoin price analytics, combining SQL schema design, Python ETL, and time series queries.
Python SQL PostgreSQL psycopg2 Data Engineering
Overview
A data architecture CA combining SQL schema design and Python ETL to build a real-time analytics pipeline for Bitcoin price data. The project involved designing a PostgreSQL schema, writing ETL scripts to load and query data, and producing analytical reports on price trends.
Original Script (2021)
import psycopg2
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
# Connect to PostgreSQL
connection = psycopg2.connect(
    user="",
    password="",
    host="",
    database=""
)
cursor = connection.cursor()
print(connection.get_dsn_parameters())
# Query all records from value table
cursor.execute("SELECT * FROM value;")
records = cursor.fetchall()
print(f"Total records: {len(records)}")
# Load into pandas for analysis
df = pd.read_sql_query("SELECT date, weighted_price, close FROM value ORDER BY date;", connection)
# Basic time series plot
plt.figure(figsize=(12, 5))
plt.plot(df['date'], df['close'], label='Close Price')
plt.title('Bitcoin Close Price')
plt.xlabel('Date')
plt.ylabel('USD')
plt.legend()
plt.show()
# OLS trend
X = sm.add_constant(range(len(df)))
model = sm.OLS(df['close'], X).fit()
df['trend'] = model.predict(X)
plt.plot(df['date'], df['close'], label='Actual')
plt.plot(df['date'], df['trend'], label='OLS Trend', color='red')
plt.legend()
plt.show()
SQL Schema
-- Setup script (setup.sql)
CREATE TABLE value (
    id SERIAL PRIMARY KEY,
    date TIMESTAMP NOT NULL,
    open NUMERIC,
    high NUMERIC,
    low NUMERIC,
    close NUMERIC,
    volume NUMERIC,
    weighted_price NUMERIC
);
CREATE INDEX idx_value_date ON value(date);
-- Analysis queries (analysis1.sql)
SELECT
    DATE_TRUNC('month', date) AS month,
    AVG(close) AS avg_close,
    MAX(high) AS max_high,
    MIN(low) AS min_low
FROM value
GROUP BY month
ORDER BY month;
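The monthly roll-up in analysis1.sql is easy to sanity-check locally. A minimal sketch using the stdlib sqlite3 module, with strftime('%Y-%m', ...) standing in for Postgres's DATE_TRUNC (the sample rows are made up for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE value (
        date TEXT NOT NULL,
        open REAL, high REAL, low REAL,
        close REAL, volume REAL, weighted_price REAL
    )
""")
rows = [
    ("2021-01-05", 30000, 34000, 29000, 33000, 10, 31000),
    ("2021-01-20", 33000, 36000, 32000, 35000, 12, 34000),
    ("2021-02-03", 35000, 48000, 34000, 46000, 15, 45000),
]
conn.executemany("INSERT INTO value VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

# Same shape as analysis1.sql; strftime is sqlite's stand-in for DATE_TRUNC
monthly = conn.execute("""
    SELECT strftime('%Y-%m', date) AS month,
           AVG(close) AS avg_close,
           MAX(high)  AS max_high,
           MIN(low)   AS min_low
    FROM value
    GROUP BY month
    ORDER BY month
""").fetchall()
print(monthly)  # two monthly rows, one per calendar month in the sample
```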
What It Did
- Designed a PostgreSQL schema for OHLCV time series data
- Wrote Python ETL to connect and query via psycopg2
- Loaded query results into pandas for downstream analysis
- Plotted price trends and fitted an OLS baseline
Issues in the Original
- Database credentials left blank in script (security gap — should use env vars)
- No connection pooling or context manager — connection not closed on error
- cursor.fetchall() loaded all records into memory instead of streaming
- No parameterised queries (SQL injection risk for dynamic queries)
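Both the credentials gap and the fetchall() memory issue have simple fixes: read connection details from the environment and iterate the cursor in chunks. A runnable sketch, with sqlite3 standing in for psycopg2 (in psycopg2 the same pattern would use a named server-side cursor, conn.cursor(name=...), so rows never all land in client memory; the DB_PATH variable here is illustrative):

```python
import os
import sqlite3  # stands in for psycopg2 so the sketch runs anywhere

# Illustrative: the real script would read DB_HOST/DB_USER/... for psycopg2.connect()
db_path = os.environ.get("DB_PATH", ":memory:")

with sqlite3.connect(db_path) as conn:  # commits on success, rolls back on error
    conn.execute("CREATE TABLE value (id INTEGER PRIMARY KEY, close REAL)")
    conn.executemany("INSERT INTO value (close) VALUES (?)",
                     [(100.0 + i,) for i in range(10)])

    # Chunked iteration instead of fetchall(); with psycopg2, a named cursor
    # (conn.cursor(name="stream")) keeps the full result set server-side
    cur = conn.execute("SELECT close FROM value ORDER BY id")
    total = 0
    while chunk := cur.fetchmany(4):
        total += len(chunk)

print(total)  # 10
```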
Modern Rewrite
Key improvements over the original:
- python-dotenv for credentials — .env file keeps DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD out of source code. The original had a hardcoded password (Celtic123) committed to the repo
- SQLAlchemy engine — replaces raw psycopg2.connect(). pool_pre_ping=True automatically reconnects dropped connections; engine.begin() is a context manager that commits on success and rolls back on error
- pd.read_sql() instead of cursor.fetchall() — streams query results directly into a DataFrame without loading the entire result set into memory first
- Parameterised queries via text() — prevents SQL injection for any dynamic query inputs
- Upsert pattern — ON CONFLICT (date) DO NOTHING safely re-runs the load without duplicating rows; the original had broken INSERT Fitted_values from df; syntax
- argparse CLI — --load <csv> flag separates data loading from analysis; the pipeline is idempotent
- Structured logging — logging module with timestamps replaces scattered print() calls
- OHLC candlestick + volume charts — the original only plotted close price with a linear trend
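One prerequisite the upsert relies on: ON CONFLICT (date) needs a UNIQUE constraint (or unique index) on date to target, which the modern ohlcv table must carry. The idempotency it buys can be shown with sqlite3, whose ON CONFLICT clause mirrors Postgres's (table and sample rows here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE ohlcv (
        date TEXT UNIQUE NOT NULL,  -- ON CONFLICT (date) requires this constraint
        close REAL
    )
""")

rows = [("2021-01-01", 29374.15), ("2021-01-02", 32127.27)]
insert = "INSERT INTO ohlcv (date, close) VALUES (?, ?) ON CONFLICT (date) DO NOTHING"

conn.executemany(insert, rows)  # first load
conn.executemany(insert, rows)  # re-run: duplicate dates are skipped, not doubled

count = conn.execute("SELECT COUNT(*) FROM ohlcv").fetchone()[0]
print(count)  # 2
```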
"""
Bitcoin ETL Pipeline — Modern Rewrite (2024)
Original: TESTING_PSQL.py, analysis2.py (2021)
Dependencies (uv):
uv add psycopg2-binary sqlalchemy pandas plotly python-dotenv
Setup:
Create .env with: DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
Then: uv run bitcoin_etl_modern.py --load coin_Bitcoin.csv
"""
import argparse, logging, os, sys
from pathlib import Path
import pandas as pd
import plotly.graph_objects as go
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import statsmodels.api as sm
DATA_DIR = Path(__file__).parent
OUT_DIR = DATA_DIR / "output"
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def get_engine():
    """Build SQLAlchemy engine from .env — credentials never hardcoded."""
    load_dotenv(DATA_DIR / ".env")
    url = (
        f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
        f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
    )
    return create_engine(url, pool_pre_ping=True)
def load_csv_to_db(csv_path: Path, engine) -> int:
    """Bulk-load CSV into ohlcv table. Skips duplicate dates via ON CONFLICT."""
    df = pd.read_csv(csv_path, parse_dates=["Date"]).rename(columns={
        "Date": "date", "Open": "open", "High": "high", "Low": "low",
        "Close": "close", "Volume": "volume", "Marketcap": "weighted_price",
    })[["date", "open", "high", "low", "close", "volume", "weighted_price"]]
    with engine.begin() as conn:
        df.to_sql("ohlcv_staging", conn, if_exists="replace", index=False, chunksize=500)
        result = conn.execute(text("""
            INSERT INTO ohlcv (date, open, high, low, close, volume, weighted_price)
            SELECT date, open, high, low, close, volume, weighted_price
            FROM ohlcv_staging
            ON CONFLICT (date) DO NOTHING
        """))
        conn.execute(text("DROP TABLE IF EXISTS ohlcv_staging"))
    log.info("Inserted %d rows", result.rowcount)
    return result.rowcount
def query_monthly_summary(engine) -> pd.DataFrame:
    """Aggregate OHLCV to monthly level — reads directly into pandas, no fetchall()."""
    sql = text("""
        SELECT
            DATE_TRUNC('month', date) AS month,
            AVG(close) AS avg_close,
            MAX(high) AS max_high,
            MIN(low) AS min_low,
            SUM(volume) AS total_volume
        FROM ohlcv GROUP BY month ORDER BY month
    """)
    with engine.connect() as conn:
        return pd.read_sql(sql, conn, parse_dates=["month"])
def fit_ols_trend(df: pd.DataFrame) -> pd.DataFrame:
    X = sm.add_constant(range(len(df)))
    df = df.copy()
    # Postgres NUMERIC arrives as Decimal objects; cast to float before fitting
    df["trend"] = sm.OLS(df["avg_close"].astype(float).ffill(), X).fit().predict(X)
    return df
def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--load", metavar="CSV")
    args = parser.parse_args()
    OUT_DIR.mkdir(exist_ok=True)
    try:
        engine = get_engine()
        if args.load:
            load_csv_to_db(Path(args.load), engine)
        monthly = query_monthly_summary(engine)
        if monthly.empty:
            log.warning("No data. Run with --load <csv> first.")
            sys.exit(0)
        monthly = fit_ols_trend(monthly)
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["avg_close"],
                                 name="Monthly Avg Close", line={"color": "#60a5fa"}))
        fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["trend"],
                                 name="OLS Trend", line={"color": "#f87171", "dash": "dot"}))
        fig.update_layout(title="Bitcoin Monthly Close Price with OLS Trend",
                          template="plotly_dark")
        fig.write_html(OUT_DIR / "price_trend.html")
    except SQLAlchemyError as e:
        log.error("Database error: %s", e)
        sys.exit(1)

if __name__ == "__main__":
    main()