import psycopg2
import pandas as pd
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
import logging
import os
import time

# --- Configuration ---
HOST = 'your_host'
PORT = 'your_port'
USER = 'your_user'
PASSWORD = 'your_password'
DBNAME = 'your_dbname'
SCHEMA = 'your_schema'
BEGIN_DATE = '20240101'
END_DATE = '20240131'
OUTPUT_DIR = 'parquet_exports'
LOG_DIR = 'logs'
PARQUET_COMPRESSION = 'snappy'  # Best balance of speed and compression

os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

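# Note: calc_dt filter values are built as 'YYYYMMDD' strings in main(), so the
# calc_dt columns are assumed to store dates in that text format; if they are DATE
# columns, adjust the strftime format or cast accordingly.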
# --- Connect to PostgreSQL ---
def connect_to_db():
    try:
        return psycopg2.connect(
            host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME
        )
    except Exception as e:
        logging.error("❌ Failed to connect to DB: %s", e)
        raise

# --- Get Tables from Schema ---
def fetch_tables(conn):
    query = """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = %s AND table_type = 'BASE TABLE';
    """
    return pd.read_sql(query, conn, params=(SCHEMA,))['table_name'].tolist()

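# Note: passing a raw psycopg2 connection to pandas.read_sql works, but recent pandas
# versions emit a UserWarning because only SQLAlchemy connectables (and sqlite3) are
# officially supported; wrapping the DSN in an SQLAlchemy engine would silence it.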
# --- Check if Table Has calc_dt Column ---
def has_calc_dt_column(conn, table):
    query = """
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = 'calc_dt';
    """
    df = pd.read_sql(query, conn, params=(SCHEMA, table))
    return not df.empty

# --- Find Primary Key Column (bigint, auto-increment) ---
def get_bigint_pk(conn, table):
    # The literal % in the LIKE pattern is doubled (%%) so psycopg2 does not treat it
    # as the start of a parameter placeholder.
    query = """
        SELECT a.attname
        FROM pg_index i
        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        JOIN pg_class c ON c.oid = i.indrelid
        JOIN pg_namespace n ON n.oid = c.relnamespace
        JOIN information_schema.columns col
            ON col.table_schema = n.nspname
            AND col.table_name = c.relname
            AND col.column_name = a.attname
        WHERE i.indisprimary
            AND n.nspname = %s
            AND c.relname = %s
            AND col.data_type = 'bigint'
            AND col.column_default LIKE 'nextval(%%'
        LIMIT 1;
    """
    df = pd.read_sql(query, conn, params=(SCHEMA, table))
    return df['attname'].iloc[0] if not df.empty else None

# --- Export Data for Specific Table and calc_dt ---
def export_table_for_date(conn, table, calc_dt):
    start_time = time.time()
    pk_col = get_bigint_pk(conn, table)
    order_by = f"ORDER BY calc_dt, {pk_col}" if pk_col else "ORDER BY calc_dt"
    query = f"""
        SELECT * FROM {SCHEMA}.{table}
        WHERE calc_dt = %s
        {order_by};
    """
    df = pd.read_sql(query, conn, params=(calc_dt,))
    if df.empty:
        logging.info(f"⏭️ [{table}] Skipped: No records for {calc_dt}")
        return None

    filename = os.path.join(OUTPUT_DIR, f"{table}_{calc_dt}.parquet")
    df.to_parquet(filename, engine="pyarrow", compression=PARQUET_COMPRESSION)

    row_count = len(df)
    col_count = len(df.columns)
    # In-memory DataFrame size, not the Parquet file size on disk (use os.path.getsize for that).
    size_mb = round(df.memory_usage(deep=True).sum() / 1024**2, 2)
    elapsed = round(time.time() - start_time, 2)

    logging.info(
        f"✅ [{table}] {row_count} rows, {col_count} cols, {size_mb} MB written in {elapsed}s → {filename}"
    )
    return {
        "table": table,
        "rows": row_count,
        "columns": col_count,
        "size_mb": size_mb,
        "filename": filename,
        "duration": elapsed
    }

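# Note: DataFrame.to_parquet with engine="pyarrow" requires the pyarrow package to be
# installed; fastparquet is the usual alternative engine if pyarrow is not available.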
# --- Set up logging to file and console for each calc_dt ---
def setup_logger(calc_dt):
    logging.shutdown()
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    log_file = os.path.join(LOG_DIR, f"log_{calc_dt}.txt")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(log_file, mode='w', encoding='utf-8')
        ]
    )

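# Note: logging.basicConfig does nothing when the root logger already has handlers,
# which is why setup_logger removes them first; on Python 3.8+ passing force=True to
# basicConfig is an equivalent one-call reset.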
# --- Process One calc_dt Across All Tables ---
async def process_calc_date(calc_dt, tables, executor):
    setup_logger(calc_dt)
    logging.info(f"🚀 Starting export for calc_dt = {calc_dt}")
    conn = connect_to_db()
    stats = []
    start_time = time.time()
    try:
        loop = asyncio.get_running_loop()
        tasks = []
        for table in tables:
            if has_calc_dt_column(conn, table):
                task = loop.run_in_executor(executor, export_table_for_date, conn, table, calc_dt)
                tasks.append(task)
        results = await asyncio.gather(*tasks)
        for result in results:
            if result:
                stats.append(result)
    finally:
        conn.close()

    # Daily summary
    total_rows = sum(r["rows"] for r in stats)
    total_tables = len(stats)
    total_size = round(sum(r["size_mb"] for r in stats), 2)
    duration = round(time.time() - start_time, 2)

    logging.info(
        f"📊 Summary for {calc_dt}: {total_tables} tables, {total_rows} rows, {total_size} MB exported in {duration}s"
    )

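# Note: process_calc_date shares a single psycopg2 connection across the thread pool.
# psycopg2 connections may be shared between threads (cursors may not), but commands
# sent over one connection are serialized, so the per-table exports above do not truly
# overlap; giving each worker its own connection is one way to get real parallelism.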
# --- Main Async Runner ---
async def main():
    conn = connect_to_db()
    try:
        tables = fetch_tables(conn)
    finally:
        conn.close()

    start = datetime.strptime(BEGIN_DATE, "%Y%m%d")
    end = datetime.strptime(END_DATE, "%Y%m%d")

    with ThreadPoolExecutor(max_workers=5) as executor:
        current = start
        while current <= end:
            calc_dt = current.strftime("%Y%m%d")
            await process_calc_date(calc_dt, tables, executor)
            current += timedelta(days=1)

# --- Run Script ---
if __name__ == "__main__":
    asyncio.run(main())
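# Example run (assuming the placeholder connection settings above are filled in; the
# script name here is illustrative):
#   python export_parquet.py
# For each day in the range this writes parquet_exports/<table>_<YYYYMMDD>.parquet per
# exported table and a per-day log at logs/log_<YYYYMMDD>.txt.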