Some Python Pandas Utility Functions (Serialization, Data Type Casting)
""" | |
pip install \ | |
pandas \ | |
sqlalchemy \ | |
pyarrow | |
""" | |
import json
import os
import time
from functools import wraps
from pathlib import Path
from typing import Set
from typing import Type
from typing import Union

import numpy as np  # used below in place of the removed `pd.np` alias
import pandas as pd
from sqlalchemy import Table

ENGINE = "pyarrow"
PD_NULLABLE_INT_TYPE = "Int64"

def timeit():
    def outer(func):
        @wraps(func)
        def inner(*args, **kwargs):
            start_time = time.time()
            res = func(*args, **kwargs)
            interval = time.time() - start_time
            print("Time for '%s': %0.3f seconds" % (func.__qualname__, interval))
            return res

        return inner

    return outer
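
# Usage sketch (added for illustration; `slow_add` is hypothetical):
# `timeit` is a decorator *factory*, so it is applied with parentheses,
# as the `@timeit()` decorations further down this file also show.
def _demo_timeit() -> None:
    @timeit()
    def slow_add(a: int, b: int) -> int:
        time.sleep(0.1)
        return a + b

    slow_add(1, 2)  # prints e.g. "Time for '...slow_add': 0.100 seconds"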

def read_pd_dtypes(df_fp: Path) -> pd.Series:
    assert isinstance(df_fp, Path) and "dtypes.pkl" not in str(df_fp), df_fp
    return pd.read_pickle(str(df_fp) + ".dtypes.pkl")


def write_pd_dtypes(df_fp: Path, df: pd.DataFrame) -> None:
    assert isinstance(df_fp, Path) and "dtypes.pkl" not in str(df_fp), df_fp
    assert isinstance(df, pd.DataFrame), type(df)
    df.dtypes.to_pickle(str(df_fp) + ".dtypes.pkl")

class Dtypes:
    object = np.dtype("O")

def read_pickle(filepath: Path) -> pd.DataFrame:
    assert isinstance(filepath, Path), filepath
    return pd.read_pickle(str(filepath))


def to_pickle(df, filepath: Path):
    assert isinstance(filepath, Path), filepath
    if not filepath.parent.exists():
        os.makedirs(str(filepath.parent))
    df.to_pickle(filepath)
    print(f"Wrote to {filepath}")

def to_feather(df_: pd.DataFrame, filepath: Path) -> None:
    # Older pyarrow versions cannot serialize pandas' nullable Int64 extension
    # dtype, so cast those columns to float and record their names in a
    # ".nullints.json" sidecar file for the reader to restore.
    nullable_ints = get_nullable_int_cols(df_)
    df_.astype(dtype={c: float for c in nullable_ints}).to_feather(filepath)
    print(f"Wrote to {filepath}")
    nullable_ints__fout = str(filepath) + ".nullints.json"
    Path(nullable_ints__fout).write_text(
        json.dumps(nullable_ints, indent=4, sort_keys=True)
    )
    print(f"Wrote to {nullable_ints__fout}")


def read_feather(filepath: Path) -> pd.DataFrame:
    # Restore the nullable Int64 columns recorded by `to_feather`.
    nullable_ints__fin = Path(str(filepath) + ".nullints.json")
    nullable_ints = json.loads(nullable_ints__fin.read_text())
    assert isinstance(nullable_ints, list)
    return pd.read_feather(str(filepath)).astype(
        dtype={c: PD_NULLABLE_INT_TYPE for c in nullable_ints}
    )

def to_csv(df: pd.DataFrame, filepath: Path, opened_file=None) -> None:
    assert isinstance(filepath, Path), filepath
    # Pickle the column dtypes alongside the CSV so `read_csv` can restore them.
    write_pd_dtypes(df=df, df_fp=filepath)
    df.reset_index().to_csv(opened_file or str(filepath), index=False)


def append_to_csv(df: pd.DataFrame, filepath: Path, opened_file=None) -> None:
    assert isinstance(filepath, Path), filepath
    assert filepath.exists(), filepath
    df.reset_index().to_csv(
        opened_file or str(filepath), index=False, mode="a", header=False
    )


def read_csv(filepath: Path) -> pd.DataFrame:
    assert isinstance(filepath, Path), filepath
    dtype, parse_dates = split_out_parse_dates(read_pd_dtypes(filepath))
    return pd.read_csv(
        filepath, dtype=dtype, keep_default_na=True, parse_dates=parse_dates
    )
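
# Usage sketch (added for illustration; the frame and path are hypothetical).
# The dtypes pickle written by `to_csv` lets `read_csv` restore column types
# that a plain CSV round trip would lose.
def _demo_csv_roundtrip() -> None:
    df = pd.DataFrame({"k": [1, 2], "s": ["x", "y"]}).set_index("k")
    fp = Path("/tmp/demo.csv")  # hypothetical path
    to_csv(df, fp)  # also writes /tmp/demo.csv.dtypes.pkl
    restored = read_csv(fp)  # dtype of "s" comes from the pickle
    assert restored["s"].dtype == object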

def get_nullable_int_cols(cls: Union[pd.DataFrame, Type, Table]):
    # noinspection PyUnresolvedReferences
    return [
        colname
        for colname, coltype in cls.dtypes.items()
        if isinstance(coltype, pd.Int64Dtype)
    ]

def to_parquet(df_: pd.DataFrame, filepath: Path) -> None:
    # Same scheme as `to_feather`: cast nullable Int64 columns to float and
    # record their names in a ".nullints.json" sidecar.
    nullable_ints = get_nullable_int_cols(df_)
    df_.astype(dtype={c: float for c in nullable_ints}).to_parquet(
        filepath, index=False, engine=ENGINE
    )
    print(f"Wrote to {filepath}")
    nullable_ints__fout = str(filepath) + ".nullints.json"
    Path(nullable_ints__fout).write_text(
        json.dumps(nullable_ints, indent=4, sort_keys=True)
    )
    print(f"Wrote to {nullable_ints__fout}")


def read_parquet(filepath: Path) -> pd.DataFrame:
    # Restore the nullable Int64 columns recorded by `to_parquet`.
    nullable_ints__fin = Path(str(filepath) + ".nullints.json")
    nullable_ints = json.loads(nullable_ints__fin.read_text())
    assert isinstance(nullable_ints, list)
    return pd.read_parquet(path=str(filepath), engine=ENGINE).astype(
        dtype={c: PD_NULLABLE_INT_TYPE for c in nullable_ints}
    )
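
# Usage sketch (added for illustration; assumes pandas >= 0.24 for the
# nullable "Int64" extension dtype, and a hypothetical path). Nullable ints
# survive the float detour because `read_parquet` casts them back using the
# sidecar; `to_feather`/`read_feather` round-trip the same way.
def _demo_parquet_roundtrip() -> None:
    df = pd.DataFrame({"n": pd.array([1, None, 3], dtype=PD_NULLABLE_INT_TYPE)})
    fp = Path("/tmp/demo.parquet")  # hypothetical path
    to_parquet(df, fp)  # writes the file plus /tmp/demo.parquet.nullints.json
    restored = read_parquet(fp)
    assert str(restored["n"].dtype) == PD_NULLABLE_INT_TYPE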

def split_out_parse_dates(pd_dtypes: pd.Series):
    parse_dates = []
    new_pd_dtypes = {}
    for colname, coltype in pd_dtypes.items():
        try:
            is_dtt = np.issubdtype(coltype, np.dtype("datetime64[ns]"))
        except TypeError:
            is_dtt = False
        if is_dtt:
            parse_dates.append(colname)
        else:
            new_pd_dtypes[colname] = coltype
    return new_pd_dtypes, parse_dates
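
# Example (added for illustration): datetime columns are split out because
# `pd.read_csv` rejects datetime64 in `dtype=` and needs those columns passed
# via `parse_dates=` instead.
def _demo_split_out_parse_dates() -> None:
    dtypes = pd.Series({"ts": np.dtype("datetime64[ns]"), "n": np.dtype("int64")})
    dtype_map, parse_dates = split_out_parse_dates(dtypes)
    assert parse_dates == ["ts"]
    assert dtype_map == {"n": np.dtype("int64")}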

def get_tablename_from_filepath(fp: Path) -> str:
    # Expects filenames of the form "<prefix>.<tablename>.<extension>".
    fp_name_parts = fp.name.split(".")
    assert len(fp_name_parts) == 3, fp_name_parts
    return fp_name_parts[1]

def configure_csv_field_size_limit():
    """
    To avoid this error:
    --
    Traceback (most recent call last):
      File "$PROJECT_ROOT_DIR/.venv/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
        exec(code_obj, self.user_global_ns, self.user_ns)
      File "<ipython-input-5-263240bbee7e>", line 1, in <module>
        main()
      File "<ipython-input-4-1e6205fa58ad>", line 68, in main
        load_into_bigquery(fp)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/utils/timing.py", line 27, in inner
        return func(*args, **kwargs)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/bq_schema.py", line 321, in load_into_bigquery
        theschema = genschema2(fp)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/utils/timing.py", line 27, in inner
        return func(*args, **kwargs)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/bq_schema.py", line 106, in genschema2
        schema = run_schema_generator(sg, fp)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/utils/timing.py", line 27, in inner
        return func(*args, **kwargs)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/bq_schema.py", line 156, in run_schema_generator
        schema_map, error_logs = sg.deduce_schema(file=fin)
      File "$PROJECT_ROOT_DIR/.venv/lib/python3.7/site-packages/bigquery_schema_generator/generate_schema.py", line 168, in deduce_schema
        for json_object in reader:
      File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/csv.py", line 112, in __next__
        row = next(self.reader)
    _csv.Error: field larger than field limit (131072)
    """
    import sys
    import csv

    max_int = sys.maxsize
    while True:
        # Shrink max_int by a factor of 10 until csv accepts it; on some
        # platforms sys.maxsize overflows the C long that csv uses internally.
        try:
            csv.field_size_limit(max_int)
            break
        except OverflowError:
            max_int = int(max_int / 10)
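
# Usage sketch (added for illustration): call once at startup, before any
# csv-module-based parsing (such as the `deduce_schema` call in the traceback
# above), so oversized fields no longer raise `_csv.Error`:
#
#     configure_csv_field_size_limit()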

def downcast_numeric_dtypes(
    df: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
    if isinstance(df, pd.Series):
        if df.dtype == "int":
            return pd.to_numeric(df, downcast="integer")
        elif df.dtype == "float":
            return pd.to_numeric(df, downcast="float")
        else:
            return df
    else:
        # Note: the concat places non-numeric columns first, so the result's
        # column order can differ from the input's.
        return pd.concat(
            [
                df.select_dtypes(exclude=["int", "float"]),
                df.select_dtypes(include="int").apply(
                    pd.to_numeric, downcast="integer"
                ),
                df.select_dtypes(include="float").apply(
                    pd.to_numeric, downcast="float"
                ),
            ],
            axis="columns",
        )

@timeit()
def downcast_category_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    # Convert low-cardinality object columns (< 50% unique values) to
    # categoricals, and drop unused categories from existing categoricals.
    return pd.concat(
        [
            df.select_dtypes(exclude=["object", "category"]),
            df.select_dtypes(include="object").apply(
                lambda s: s.astype("category") if s.nunique() / s.shape[0] < 0.5 else s
            ),
            df.select_dtypes(include="category").apply(
                lambda s: s.cat.remove_unused_categories()
            ),
        ],
        axis="columns",
    )

@timeit()
def downcast_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    # Combines the numeric and category downcasts above in a single pass.
    return pd.concat(
        [
            df.select_dtypes(exclude=["int", "float", "object", "category"]),
            df.select_dtypes(include="int").apply(pd.to_numeric, downcast="integer"),
            df.select_dtypes(include="float").apply(pd.to_numeric, downcast="float"),
            df.select_dtypes(include="object").apply(
                lambda s: s.astype("category") if s.nunique() / s.shape[0] < 0.5 else s
            ),
            df.select_dtypes(include="category").apply(
                lambda s: s.cat.remove_unused_categories()
            ),
        ],
        axis="columns",
    )
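
# Usage sketch (added for illustration): downcasting picks the narrowest type
# that holds the observed values. The expected dtypes below assume the value
# range 0..999; this demo frame is hypothetical.
def _demo_downcast_dtypes() -> None:
    df = pd.DataFrame(
        {
            "i": pd.Series(range(1000), dtype="int64"),
            "f": pd.Series(range(1000), dtype="float64"),
        }
    )
    small = downcast_dtypes(df)
    assert str(small["i"].dtype) == "int16"  # int8 tops out at 127
    assert str(small["f"].dtype) == "float32"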

def get_dt_colnames(df: pd.DataFrame) -> Set[str]:
    return set(df.dtypes[df.dtypes == np.dtype("<M8[ns]")].index.values)


def assert_integer_dtype(dtype) -> None:
    assert np.issubdtype(dtype, np.integer), dtype

def boolcols2dtype(df: pd.DataFrame, dtype) -> pd.DataFrame:
    prefix = f"In {boolcols2dtype.__qualname__} --"
    # A column qualifies if every one of its unique values is boolean-like
    # (NaN, True, False, 0, or 1).
    mask = df.select_dtypes(["category", object, bool, int]).apply(
        lambda col: pd.Series(col.unique())
        .isin([np.nan, True, False, 0, 1, 0.0, 1.0])
        .all()
    )
    print(f"{prefix} mask={mask}")
    colnames = mask[mask].index
    return df.assign(**{colname: df[colname].astype(dtype) for colname in colnames})
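
# Usage sketch (added for illustration; the frame is hypothetical): only
# columns whose unique values all look boolean are cast; everything else is
# left untouched.
def _demo_boolcols2dtype() -> None:
    df = pd.DataFrame({"flag": [0, 1, 1], "name": ["a", "b", "c"]})
    out = boolcols2dtype(df, bool)
    assert out["flag"].dtype == bool  # unique values {0, 1} qualify
    assert out["name"].dtype == object  # strings do not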