Skip to content

Instantly share code, notes, and snippets.

@raffaem
Created March 26, 2025 19:43
Show Gist options
  • Save raffaem/e7af56ffb425cceb4ccb3579806cecc0 to your computer and use it in GitHub Desktop.
Save raffaem/e7af56ffb425cceb4ccb3579806cecc0 to your computer and use it in GitHub Desktop.
pandas with logging
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2024-present Raffaele Mancuso <[email protected]>
# SPDX-License-Identifier: MIT
import pandas
import logging
class DataFrame(pandas.DataFrame):
    """A ``pandas.DataFrame`` that logs what common operations did to it.

    Dropping rows, renaming and ``[]`` selection report their effect on the
    ``"pandas-log"`` logger at INFO level while ``self.log`` is true. The
    frames returned by the overridden operations are instances of this class
    and carry the same ``log`` flag, so chained calls keep logging.

    Note that the ``log`` / ``log_backup`` attributes shadow attribute-style
    access to columns of the same name; use ``frame["log"]`` for such columns.
    """

    # Class-level defaults. Without them, the first ``self.log = ...`` on an
    # instance makes pandas' ``__setattr__`` probe ``getattr(self, "log")``;
    # when the data has a column called "log" that probe goes through
    # ``__getitem__`` below, which reads ``self.log`` again and recurses without
    # end. The defaults also make ``turn_on`` safe before any ``turn_off``.
    log = True
    log_backup = True

    # Every message of this class goes to this one named logger.
    _logger = logging.getLogger("pandas-log")

    def __init__(self, *args, **kwargs):
        """Build the frame exactly like ``pandas.DataFrame`` and enable logging."""
        # Diagnostic trace only, hence DEBUG on the named logger rather than an
        # INFO record on the root logger.
        self._logger.debug("Our init was called")
        super().__init__(*args, **kwargs)
        self.log = True

    def copy_from(self, df):
        """Copy the ``log`` flag from ``df`` onto this frame."""
        self.log = df.log

    def turn_off(self):
        """Disable logging, remembering the current flag for ``turn_on``."""
        self.log_backup = self.log
        self.log = False

    def turn_on(self):
        """Restore the flag saved by the most recent ``turn_off``."""
        self.log = self.log_backup

    def _as_logged(self, frame):
        """Turn ``frame`` into an instance of this class sharing our ``log`` flag."""
        frame.__class__ = DataFrame
        frame.copy_from(self)
        return frame

    def dfmvcols(self, start_cols=None, end_cols=None):
        """Return the frame with some columns moved to the front and/or back.

        Args:
            start_cols: Column label or list of labels to place first.
            end_cols: Column label or list of labels to place last.

        Labels that are not columns of the frame are ignored; all remaining
        columns keep their relative order between the two groups.
        """
        # Accept a single label as well as a list of labels.
        if start_cols is None:
            start_cols = []
        elif isinstance(start_cols, str):
            start_cols = [start_cols]
        if end_cols is None:
            end_cols = []
        elif isinstance(end_cols, str):
            end_cols = [end_cols]
        # Keep only the labels that exist in the frame.
        start_cols = [x for x in start_cols if x in self.columns]
        end_cols = [x for x in end_cols if x in self.columns]
        # Columns whose relative order is preserved.
        middle = [
            x for x in self.columns
            if x not in start_cols and x not in end_cols
        ]
        return self[start_cols + middle + end_cols]

    def _logged_row_drop(self, method_name, inplace, args, kwargs):
        """Run the pandas row-dropping method ``method_name`` and log the row loss.

        Shared implementation of ``drop_duplicates`` and ``dropna``.
        """
        should_log = self.log
        nrow0 = self.shape[0]
        # pandas may select from ``self`` internally; silence those selections
        # and restore the flag even when pandas raises.
        self.turn_off()
        try:
            result = getattr(super(), method_name)(*args, inplace=inplace, **kwargs)
        finally:
            self.turn_on()
        # With ``inplace=True`` pandas returns None and ``self`` holds the result.
        nrow1 = self.shape[0] if result is None else result.shape[0]
        if should_log:
            self._logger.info(f"Dropped {nrow0-nrow1}/{nrow0} rows")
        if result is not None:
            # Adopt after the flag was restored so the result does not inherit
            # the temporary ``log = False``.
            result = self._as_logged(result)
        return result

    def drop_duplicates(self, inplace=False, *args, **kwargs):
        """Drop duplicate rows like pandas does and log how many were removed.

        ``inplace`` is forwarded to pandas; any further arguments are passed
        through unchanged. Unlike in pandas, ``inplace`` is the first
        positional parameter here, so pass ``subset`` by keyword.

        Returns:
            The de-duplicated frame, or None when ``inplace`` is true.
        """
        return self._logged_row_drop("drop_duplicates", inplace, args, kwargs)

    def dropna(self, inplace=False, *args, **kwargs):
        """Drop rows/columns with missing values and log how many rows went.

        ``inplace`` is forwarded to pandas; any further arguments are passed
        through unchanged.

        Returns:
            The filtered frame, or None when ``inplace`` is true.
        """
        return self._logged_row_drop("dropna", inplace, args, kwargs)

    @staticmethod
    def _rename_pairs(before, after):
        """Describe the labels that differ as ``"old -> new, old2 -> new2"``."""
        return ", ".join(
            f"{old} -> {new}" for old, new in zip(before, after) if old != new
        )

    def rename(self, *args, **kwargs):
        """Rename labels like pandas does and log which labels changed.

        Returns:
            The renamed frame, or None when called with ``inplace=True``.
        """
        start_cols = self.columns
        start_ixs = self.index
        out_df = super().rename(*args, **kwargs)
        # With ``inplace=True`` pandas returns None and ``self`` was renamed.
        renamed = self if out_df is None else out_df
        if self.log:
            col_pairs = self._rename_pairs(start_cols, renamed.columns)
            ix_pairs = self._rename_pairs(start_ixs, renamed.index)
            msg = ""
            if col_pairs:
                msg += f"Renamed columns: {col_pairs}; "
            if ix_pairs:
                msg += f"Renamed indexes: {ix_pairs}"
            # Nothing changed: stay silent instead of emitting an empty record.
            if msg:
                self._logger.info(msg)
        if out_df is not None:
            out_df = self._as_logged(out_df)
        return out_df

    def __getitem__(self, key):
        """Select with ``[]`` like pandas does and log the size of the selection.

        A ``Series`` result is returned unchanged; a ``DataFrame`` result is
        converted to this class and inherits the current ``log`` flag.
        """
        out_df = super().__getitem__(key)
        if not isinstance(out_df, pandas.DataFrame):
            if self.log and isinstance(out_df, pandas.Series):
                self._logger.info("Selected a single column")
            return out_df
        out_df = self._as_logged(out_df)
        if self.log:
            self._logger.info(
                f"Selected {len(out_df.columns):,d}/{len(self.columns):,d} columns "
                f"and {len(out_df.index):,d}/{len(self.index):,d} indexes"
            )
        return out_df
def concat(dfs, *args, **kwargs):
    """Concatenate pandas objects and log the shapes that went in and came out.

    Args:
        dfs: Iterable (or mapping) of pandas objects, as for ``pandas.concat``.
        *args, **kwargs: Passed through to ``pandas.concat``.

    Returns:
        The concatenated data wrapped in this module's ``DataFrame``.
    """
    from collections.abc import Mapping

    # ``dfs`` is needed twice: once by pandas and once for the log message.
    # A one-shot iterable (e.g. a generator) would be empty the second time,
    # so materialise it first. pandas objects themselves are passed through so
    # pandas can reject them with its own error.
    if not isinstance(dfs, (Mapping, pandas.DataFrame, pandas.Series)):
        dfs = list(dfs)
    out_df = DataFrame(pandas.concat(dfs, *args, **kwargs))

    parts = dfs.values() if isinstance(dfs, Mapping) else dfs
    # ``None`` entries are skipped here because they have no shape to report.
    # Formatting the whole shape tuple also covers one-dimensional Series.
    shapes = [
        "(" + " x ".join(f"{n:,d}" for n in part.shape) + ")"
        for part in parts
        if part is not None
    ]
    ls = " + ".join(shapes)
    logging.getLogger("pandas-log").info(
        f"Concatenating {ls} -> ({out_df.shape[0]:,d} x {out_df.shape[1]:,d})"
    )
    return out_df
def merge(left, right, on=None, left_on=None, right_on=None, *args, **kwargs):
    """Merge two logging frames and log how many rows of each side were lost.

    A row counts as unmatched when its key combination does not occur among
    the keys of the merged result.

    Args:
        left, right: Frames of this module's ``DataFrame`` class.
        on, left_on, right_on: Join keys, as for ``pandas.merge``.
        *args, **kwargs: Passed through to ``pandas.merge`` (the first extra
            positional argument is pandas' ``how``).

    Returns:
        The merged frame as this module's ``DataFrame``, carrying ``left``'s
        ``log`` flag.
    """
    logger = logging.getLogger("pandas-log")
    # In a self-merge both names refer to one object; silencing it twice would
    # overwrite its saved flag with False and leave its logging off for good.
    frames = [left] if right is left else [left, right]
    for frame in frames:
        frame.turn_off()
    try:
        outdf = pandas.merge(
            left, right, *args,
            on=on, left_on=left_on, right_on=right_on, **kwargs
        )
        outdf.__class__ = DataFrame
        # Copied while ``left`` is silenced, so the key look-ups below are quiet.
        outdf.copy_from(left)

        if on is not None:
            left_on = on
            right_on = on
        how = kwargs.get("how", args[0] if args else "inner")
        uses_index = kwargs.get("left_index", False) or kwargs.get("right_index", False)
        if left_on is None and right_on is None and how != "cross" and not uses_index:
            # pandas' default when no key is given: the columns both frames share.
            shared = [c for c in left.columns if c in right.columns]
            left_on = shared
            right_on = shared
        if isinstance(left_on, str):
            left_on = [left_on]
        if isinstance(right_on, str):
            right_on = [right_on]

        logger.info("Merging:")
        if left_on is not None and right_on is not None:
            mask_left = left[left_on].set_index(left_on).index.isin(
                outdf[left_on].set_index(left_on).index
            )
            mask_right = right[right_on].set_index(right_on).index.isin(
                outdf[right_on].set_index(right_on).index
            )
            lost_left = len(mask_left) - int(mask_left.sum())
            lost_right = len(mask_right) - int(mask_right.sum())
            logger.info(f"\t{lost_left:,d}/{len(mask_left):,d} left rows unmatched")
            logger.info(f"\t{lost_right:,d}/{len(mask_right):,d} right rows unmatched")
        else:
            # Index and cross joins have no key columns on both sides to compare.
            logger.info("\tunmatched-row counts unavailable (no key columns)")
    finally:
        # Restore the inputs' flags even when pandas raised.
        for frame in frames:
            frame.turn_on()
    # ``left`` has its real flag back; hand it to the result, which until now
    # carried the temporary ``log = False``.
    outdf.copy_from(left)
    return outdf
def read_excel(file, *args, **kwargs):
    """Read an Excel file into this module's ``DataFrame`` and log its shape.

    All arguments are handed to ``pandas.read_excel`` unchanged.
    """
    frame = DataFrame(pandas.read_excel(file, *args, **kwargs))
    n_rows, n_cols = frame.shape
    logger = logging.getLogger("pandas-log")
    logger.info(f"Read Excel file '{file}' with shape ({n_rows:,d} x {n_cols:,d})")
    return frame
def read_csv(file, *args, **kwargs):
    """Read a CSV file into this module's ``DataFrame`` and log its shape.

    All arguments are handed to ``pandas.read_csv`` unchanged.
    """
    frame = DataFrame(pandas.read_csv(file, *args, **kwargs))
    n_rows, n_cols = frame.shape
    logger = logging.getLogger("pandas-log")
    logger.info(f"Read CSV file '{file}' with shape ({n_rows:,d} x {n_cols:,d})")
    return frame
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment