@recalde
Last active May 13, 2025 22:17
PostParq

Write a Python program that connects to a PostgreSQL database using host, port, user, password, database name, and schema name. The program should:

  • Loop over each day between a given BEGIN_DATE and END_DATE (formatted as YYYYMMDD). Each date represents a value for the calc_dt column.

  • For each day:

    • Identify all tables in the given schema that contain a column named calc_dt.
    • For each matching table:
      • Check if it has a primary key column that is a single bigint with an auto-increment default (PostgreSQL sequence; illustrated just before the script below). If so, sort exports by calc_dt and this primary key. If not, sort by calc_dt only.
      • Select all rows from the table where calc_dt equals the date being processed, and export them to a Parquet file named {table}_{calc_dt}.parquet.
      • Use snappy compression for a good balance of speed and file size.
      • Log for each table:
        • Table name
        • Number of rows exported
        • Number of columns
        • Estimated in-memory size in MB
        • Time taken to export
  • For each date processed, log a summary:

    • Number of tables exported
    • Total rows across all tables
    • Total in-memory data size
    • Total export duration for the day
  • Run table exports concurrently using asyncio and ThreadPoolExecutor.

  • Log all messages to both:

    • The console
    • A file named logs/log_{calc_dt}.txt

The program should create the folders parquet_exports/ and logs/ if they don't already exist.
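For reference, the primary key shape that the export step looks for (a single bigint column whose default comes from a sequence) is what PostgreSQL creates for a bigserial column. A minimal, purely illustrative example; the table name demo_calc is hypothetical:

# Illustrative only: a table whose bigserial primary key qualifies as the export tiebreaker.
# bigserial is a bigint with a nextval('...') default, which is what the script's
# primary-key detection query matches.
DEMO_DDL = """
CREATE TABLE your_schema.demo_calc (
    id      bigserial PRIMARY KEY,   -- bigint with a nextval(...) default
    calc_dt date NOT NULL,
    payload text
);
"""

The complete script follows.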

import psycopg2
import pandas as pd
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
import logging
import os
import time
# --- Configuration ---
HOST = 'your_host'
PORT = 'your_port'
USER = 'your_user'
PASSWORD = 'your_password'
DBNAME = 'your_dbname'
SCHEMA = 'your_schema'
BEGIN_DATE = '20240101'
END_DATE = '20240131'
OUTPUT_DIR = 'parquet_exports'
LOG_DIR = 'logs'
PARQUET_COMPRESSION = 'snappy' # Best balance of speed and compression
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
# --- Connect to PostgreSQL ---
def connect_to_db():
    try:
        return psycopg2.connect(
            host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME
        )
    except Exception as e:
        logging.error("❌ Failed to connect to DB: %s", e)
        raise
# --- Get Tables from Schema ---
def fetch_tables(conn):
    # Note: recent pandas versions warn when given a raw DBAPI connection instead
    # of a SQLAlchemy connectable, but the query still runs fine with psycopg2.
    query = """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = %s AND table_type = 'BASE TABLE';
    """
    return pd.read_sql(query, conn, params=(SCHEMA,))['table_name'].tolist()
# --- Check if Table Has calc_dt Column ---
def has_calc_dt_column(conn, table):
    query = """
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = 'calc_dt';
    """
    df = pd.read_sql(query, conn, params=(SCHEMA, table))
    return not df.empty
# --- Find Primary Key Column (bigint, auto-increment) ---
def get_bigint_pk(conn, table):
    # %% escapes the literal % so psycopg2 does not treat it as a placeholder;
    # indnkeyatts = 1 restricts the match to single-column primary keys (PostgreSQL 11+).
    query = """
        SELECT a.attname
        FROM pg_index i
        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        JOIN pg_class c ON c.oid = i.indrelid
        JOIN pg_namespace n ON n.oid = c.relnamespace
        JOIN information_schema.columns col
          ON col.table_schema = n.nspname
         AND col.table_name = c.relname
         AND col.column_name = a.attname
        WHERE i.indisprimary
          AND i.indnkeyatts = 1
          AND n.nspname = %s
          AND c.relname = %s
          AND col.data_type = 'bigint'
          AND col.column_default LIKE 'nextval(%%'
        LIMIT 1;
    """
    df = pd.read_sql(query, conn, params=(SCHEMA, table))
    return df['attname'].iloc[0] if not df.empty else None
# --- Export Data for Specific Table and calc_dt ---
def export_table_for_date(conn, table, calc_dt):
    start_time = time.time()
    pk_col = get_bigint_pk(conn, table)
    order_by = f"ORDER BY calc_dt, {pk_col}" if pk_col else "ORDER BY calc_dt"
    query = f"""
        SELECT * FROM {SCHEMA}.{table}
        WHERE calc_dt = %s
        {order_by};
    """
    df = pd.read_sql(query, conn, params=(calc_dt,))
    if df.empty:
        logging.info(f"⏭️ [{table}] Skipped: No records for {calc_dt}")
        return None
    filename = os.path.join(OUTPUT_DIR, f"{table}_{calc_dt}.parquet")
    df.to_parquet(filename, engine="pyarrow", compression=PARQUET_COMPRESSION)
    row_count = len(df)
    col_count = len(df.columns)
    size_mb = round(df.memory_usage(deep=True).sum() / 1024**2, 2)
    elapsed = round(time.time() - start_time, 2)
    logging.info(
        f"✅ [{table}] {row_count} rows, {col_count} cols, {size_mb} MB written in {elapsed}s → {filename}"
    )
    return {
        "table": table,
        "rows": row_count,
        "columns": col_count,
        "size_mb": size_mb,
        "filename": filename,
        "duration": elapsed
    }
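# --- Optional: identifier-safe query building (illustrative sketch; not called by the script) ---
# export_table_for_date interpolates SCHEMA and the table name directly into the SQL
# string. If object names ever need quoting, psycopg2's sql module can compose them
# safely. build_export_query below is a hypothetical helper shown for reference only.
from psycopg2 import sql

def build_export_query(conn, table, order_by):
    composed = sql.SQL("SELECT * FROM {}.{} WHERE calc_dt = %s {}").format(
        sql.Identifier(SCHEMA), sql.Identifier(table), sql.SQL(order_by)
    )
    # as_string() renders a plain SQL string that pd.read_sql can run with params=(calc_dt,)
    return composed.as_string(conn)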
# --- Set up logging to file and console for each calc_dt ---
def setup_logger(calc_dt):
    logging.shutdown()
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    log_file = os.path.join(LOG_DIR, f"log_{calc_dt}.txt")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(log_file, mode='w', encoding='utf-8')
        ]
    )
# --- Process One calc_dt Across All Tables ---
async def process_calc_date(calc_dt, tables, executor):
    setup_logger(calc_dt)
    logging.info(f"🚀 Starting export for calc_dt = {calc_dt}")
    conn = connect_to_db()
    stats = []
    start_time = time.time()
    try:
        loop = asyncio.get_running_loop()
        tasks = []
        for table in tables:
            if has_calc_dt_column(conn, table):
                task = loop.run_in_executor(executor, export_table_for_date, conn, table, calc_dt)
                tasks.append(task)
        results = await asyncio.gather(*tasks)
        for result in results:
            if result:
                stats.append(result)
    finally:
        conn.close()
    # Daily summary
    total_rows = sum(r["rows"] for r in stats)
    total_tables = len(stats)
    total_size = round(sum(r["size_mb"] for r in stats), 2)
    duration = round(time.time() - start_time, 2)
    logging.info(
        f"📊 Summary for {calc_dt}: {total_tables} tables, {total_rows} rows, {total_size} MB exported in {duration}s"
    )
# --- Main Async Runner ---
async def main():
    conn = connect_to_db()
    try:
        tables = fetch_tables(conn)
    finally:
        conn.close()
    start = datetime.strptime(BEGIN_DATE, "%Y%m%d")
    end = datetime.strptime(END_DATE, "%Y%m%d")
    with ThreadPoolExecutor(max_workers=5) as executor:
        current = start
        while current <= end:
            calc_dt = current.strftime("%Y%m%d")
            await process_calc_date(calc_dt, tables, executor)
            current += timedelta(days=1)

# --- Run Script ---
if __name__ == "__main__":
    asyncio.run(main())
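
One caveat on concurrency: every worker thread shares the single psycopg2 connection opened in process_calc_date, and commands sent through one connection are executed serially, so the thread pool mostly overlaps the Parquet writing rather than the queries themselves. A minimal sketch of a per-task connection variant, reusing connect_to_db and export_table_for_date from the script above:

# Sketch: give each export task its own short-lived connection so the per-table
# queries can overlap on the database side. In process_calc_date, submit this
# wrapper instead: loop.run_in_executor(executor, export_table_with_own_conn, table, calc_dt)
def export_table_with_own_conn(table, calc_dt):
    conn = connect_to_db()
    try:
        return export_table_for_date(conn, table, calc_dt)
    finally:
        conn.close()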