Python Google BigQuery Utility Functions
""" | |
pip install \ | |
bigquery-schema-generator \ | |
google-api-python-client \ | |
pandas-gbq \ | |
pandas | |
""" | |
import logging
import os
import re
import time
from collections import OrderedDict
from contextlib import closing
from copy import deepcopy
from dataclasses import dataclass, astuple, asdict
from functools import lru_cache, wraps
from io import StringIO
from itertools import chain
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union

import numpy as np
import pandas as pd
from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.api_core.exceptions import NotFound as TableNotFound
from google.cloud import bigquery
from google.oauth2.service_account import Credentials
from pandas_gbq import to_gbq

MAX_COLUMN_NAME_LENGTH = 100
BAD_LETTER_PATT = re.compile(r"[^_\w]")
EXPECTED_EXC = {
    "FLOAT": "cannot safely cast non-equivalent float64 to int64",
    "STRING": "could not convert string to float",
}

PROJECT_ID = os.environ["GCP_PROJECT_ID"]
DATASET_ID = os.environ["GCP_DATASET_ID"]
CREDENTIALS_FILEPATH = os.environ["GCP_CREDENTIALS_FILEPATH"]

def creds():
    return Credentials.from_service_account_file(
        str(relative_to_project_root(CREDENTIALS_FILEPATH))
    )

def timeit():
    def outer(func):
        @wraps(func)
        def inner(*args, **kwargs):
            start_time = time.time()
            res = func(*args, **kwargs)
            interval = time.time() - start_time
            print("Time for '%s': %0.3f seconds" % (func.__qualname__, interval))
            return res

        return inner

    return outer

class ProjectRootNotFoundError(Exception):
    pass

def find_project_root() -> Path:
    # Walk upwards from this file (or from the cwd in a REPL/notebook) until
    # a ".projroot" marker file is found, or until a parent whose name matches
    # the child's package name (e.g. "my-project" containing "my_project").
    try:
        parent = Path(__file__).parent
    except NameError:
        parent = Path(os.getcwd())
    while True:
        if (parent / ".projroot").exists():
            break
        child, parent = parent, parent.parent
        if "-" in child.name:
            continue
        if child == parent:
            # Reached the filesystem root without finding a project root.
            raise ProjectRootNotFoundError()
        if parent.name.replace("-", "_") == child.name:
            break
    if not (parent / ".projroot").exists():
        # Leave a marker so future lookups stop here immediately.
        (parent / ".projroot").touch()
    return parent

def relative_to_project_root(path: str) -> Path:
    if os.environ.get("PROJECT_ROOT") is not None:
        projroot = Path(os.environ["PROJECT_ROOT"]).absolute()
    else:
        projroot = find_project_root()
    return projroot / path

def get_file_size_in_bytes(fp) -> int:
    fp = relative_to_project_root(str(fp))
    return fp.stat().st_size

@dataclass
class GbqField:
    name: str
    type: str
    mode: str

    def __post_init__(self):
        self.name = fix_column_name_for_gbq(self.name)

@dataclass
class GbqSchema:
    fields: List[GbqField]

    def __post_init__(self):
        self.fields = sorted(self.fields, key=lambda f: (f.name, f.type, f.mode))

    def __getitem__(self, item):
        field = next((f for f in self.fields if f.name == item), None)
        if field is None:
            raise KeyError(item)
        return field

    def as_series(self) -> pd.Series:
        index, values = zip(*map(astuple, self.fields))
        return pd.Series(values, index=index)

    def to_canonical(self) -> List[Dict[str, str]]:
        return [asdict(f) for f in self.fields]

@timeit()
def load_df(fp, allstrings, usecols=None):
    if isinstance(fp, (str, Path)):
        fp = relative_to_project_root(str(fp))
        size = get_file_size_in_bytes(fp)
        assert size > 0, size
        fp = str(fp)
    df_: pd.DataFrame = pd.read_csv(
        fp,
        dtype=str if allstrings else None,
        keep_default_na=not allstrings,
        usecols=usecols,
    )
    print(f"Loaded dataframe with shape {df_.shape}")
    return df_

def massage_filepath(fp: Union[str, Path]) -> Path:
    fp = str(fp)
    if ".csv" not in fp and "dbo." not in fp:
        fp = f"dbo.{fp}.csv"
    if "pdw/" not in fp:
        fp = f'pdw/{fp.lstrip("/")}'
    ret: Path = relative_to_project_root(str(fp))
    assert ret.exists(), ret
    return ret
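
# Illustrative only (not in the original gist): massage_filepath() expands a
# bare table name into the conventional export path under the project root,
# assuming such a file exists:
#   massage_filepath("v_ItemActionsDetails")
#   # -> <project root>/pdw/dbo.v_ItemActionsDetails.csv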

@timeit()
@lru_cache(1000)
def genschema2(fp) -> GbqSchema:
    # INFO-level logging lets SchemaGenerator report its progress.
    logging.basicConfig(level=logging.INFO)
    sg = SchemaGenerator(
        input_format="csv",
        infer_mode=True,
        keep_nulls=True,
        quoted_values_are_strings=False,  # TODO: ?
        debugging_interval=100_000,
    )
    schema = run_schema_generator(sg, fp)
    return GbqSchema(fields=[GbqField(**f) for f in schema])
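
# Sketch of the result shape (hypothetical field values):
#   genschema2(massage_filepath("v_ItemActionsDetails"))
#   # -> GbqSchema(fields=[GbqField(name="GroupID", type="INTEGER",
#   #                               mode="NULLABLE"), ...])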

@timeit()
@lru_cache(1)
def get_text(fp):
    return fp.read_text()

@timeit()
def deduce_schema(sg: SchemaGenerator, fin) -> tuple:
    return sg.deduce_schema(file=fin)

@timeit()
def run_schema_generator(sg: SchemaGenerator, fp: Path) -> list:
    # with closing(sample_lines_from(fp)) as fin:
    with closing(StringIO(get_text(fp))) as fin:
        schema_map, error_logs = deduce_schema(sg, fin)
    for error in error_logs:
        print("Problem on line %s: %s" % (error["line"], error["msg"]))
    starting_schema = sg.flatten_schema(schema_map)
    schema = deepcopy(starting_schema)
    for idx, schema_item in enumerate(starting_schema):
        mode = schema_item["mode"]
        name = schema_item["name"]
        type_ = schema_item["type"]
        assert mode in ("REQUIRED", "NULLABLE")
        assert type_ in (
            "DATE",
            "INTEGER",
            "STRING",
            "FLOAT",
            "TIMESTAMP",
            "BOOLEAN",
            "TIME",
        ), type_
        if type_ in ("STRING", "FLOAT"):
            # STRING and FLOAT are the widest inferences; check whether the
            # column's actual values fit a narrower type.
            downcasted = maybe_downcast_type(fp, name, type_)
            assert downcasted in ("STRING", "FLOAT", "INTEGER")
            schema[idx]["type"] = downcasted
    return schema

def maybe_downcast_type(fp, name, current_type):
    # Try to narrow a column's inferred type: STRING -> FLOAT -> INTEGER.
    # Each successful cast below tightens the type; the expected exception
    # messages in EXPECTED_EXC confirm why a cast legitimately failed.
    if isinstance(fp, str):
        fp = massage_filepath(fp)
    df_ = load_df(StringIO(get_text(fp)), usecols=[name], allstrings=True)
    assert current_type in ("STRING", "FLOAT"), current_type
    downcasted_type = "STRING"
    try:
        float_df = df_[name].replace("", np.nan).astype(float)
        downcasted_type = "FLOAT"
        # Int64Dtype raises TypeError if any value has a fractional part.
        float_df.astype(pd.Int64Dtype())
        downcasted_type = "INTEGER"
    except ValueError as e:
        # astype(float) failed: the column really is a STRING.
        assert current_type == "STRING", current_type
        _expexc = EXPECTED_EXC["STRING"]
        assert _expexc in repr(e), (name, current_type)
    except TypeError as e:
        # The Int64 cast failed: the column stays a FLOAT.
        _expexc = EXPECTED_EXC["FLOAT"]
        assert _expexc in repr(e), (name, current_type)
    if current_type == "FLOAT":
        assert downcasted_type in ("FLOAT", "INTEGER")
    return downcasted_type
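
# Worked examples (hypothetical data) of the downcast cascade above:
#   ["1", "2", ""]    -> astype(float) ok, Int64 cast ok    -> "INTEGER"
#   ["1.5", "2", ""]  -> astype(float) ok, Int64 cast fails -> "FLOAT"
#   ["a", "b"]        -> astype(float) fails                -> "STRING"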

def get_filepaths() -> List[Path]:
    return sorted(relative_to_project_root("pdw").glob("*.csv"))

def get_tablename(fp: Path) -> str:
    # Export files are named like "dbo.<TableName>.csv".
    fp_name_parts = fp.name.split(".")
    assert len(fp_name_parts) == 3, fp_name_parts
    return fp_name_parts[1]

def test__maybe_downcast_type():
    expecteds = [
        ("DecimalValue", "FLOAT"),
        ("ParentActionID", "INTEGER"),
        ("ItemTypeID", "INTEGER"),
        ("GroupID", "INTEGER"),
        ("TextValue", "STRING"),
        ("Y_LastModifiedUserID", "INTEGER"),
        ("StockDescription", "STRING"),
    ]
    fp_ = "pdw/dbo.v_ItemActionsDetails.csv"
    for colname, expected in expecteds:
        act = maybe_downcast_type(fp_, colname, "STRING")
        assert act == expected, (act, expected)

def fix_column_name_for_gbq(name: str) -> str:
    return BAD_LETTER_PATT.sub("_", name)[:MAX_COLUMN_NAME_LENGTH]
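
# For example (hypothetical column name):
#   fix_column_name_for_gbq("Order ID (new)")  # -> "Order_ID__new_"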

@timeit()
def fix_column_names_for_gbq(df_: pd.DataFrame) -> pd.DataFrame:
    return df_.rename(columns=fix_column_name_for_gbq)

def enable_verbose_logging():
    logger = logging.getLogger("pandas_gbq")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())

@timeit()
def send_to_bigquery(
    df_: pd.DataFrame,
    verbose: bool,
    destination_dataset: str,
    destination_table: str,
    chunksize=None,
    location="US",
    table_schema=None,
):
    df_ = fix_column_names_for_gbq(df_)
    if verbose:
        enable_verbose_logging()
    if table_schema:
        df_ = ensure_schema_types(df_, table_schema)

    @timeit()
    def _to_gbq():
        print("Starting to_gbq...")
        to_gbq(
            df_,
            destination_table=".".join((destination_dataset, destination_table)),
            project_id=PROJECT_ID,
            chunksize=chunksize,
            if_exists="replace",
            location=location,
            progress_bar=True,
            credentials=creds(),
            table_schema=table_schema,
        )

    _to_gbq()

@timeit()
def ensure_schema_types(df_, schema_):
    # Coerce the all-strings dataframe to match the generated schema, then
    # render values back to strings so pandas-gbq serializes them uniformly.
    for item in schema_:
        colname = item["name"]
        coltype = item["type"]
        if coltype in ("FLOAT", "INTEGER"):
            df_[colname] = df_[colname].replace("", np.nan).astype(float)
            if coltype == "INTEGER":
                df_[colname] = df_[colname].astype(pd.Int64Dtype())
            # Convert to string, with special case: np.nan --> ''
            df_[colname] = np.where(
                pd.isnull(df_[colname]), "", df_[colname].astype(str)
            )
    return df_

@timeit()
def load_into_bigquery(fp):
    fp = massage_filepath(fp)
    tablename = get_tablename(fp)
    theschema = genschema2(fp)
    print(theschema)
    send_to_bigquery(
        load_df(fp, allstrings=True),
        verbose=True,
        chunksize=50000,
        destination_dataset="pdw",
        destination_table=tablename,
        table_schema=theschema.to_canonical(),
    )

@timeit()
def gbq_fetch_table_schema(tablename):
    client = bigquery.Client(credentials=creds())
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{tablename}"
    table = client.get_table(table_id)
    print(f"Got table '{table.project}.{table.dataset_id}.{table.table_id}'")
    # View table properties
    print(f"Table schema: {table.schema}")
    print(f"Table description: {table.description}")
    print(f"Table has {table.num_rows} rows")
    return GbqSchema(
        fields=[
            GbqField(name=f.name, type=f.field_type, mode=f.mode) for f in table.schema
        ]
    )

def compare_schemas(
    s1: GbqSchema, s2: GbqSchema
) -> Dict[str, Tuple[Optional[GbqField], Optional[GbqField]]]:
    names = set(chain((f.name for f in s1.fields), (f.name for f in s2.fields)))
    differences = {}
    for name in names:
        # A field present in only one schema shows up as None on the other side.
        f1 = next((f for f in s1.fields if f.name == name), None)
        f2 = next((f for f in s2.fields if f.name == name), None)
        if f1 != f2:
            print(f1, f2, sep="\n", end="\n\n")
            differences[name] = (f1, f2)
    return differences

@timeit()
def genschema2_matches_existing_bq_schema(tablename) -> bool:
    print(f"Running genschema2_matches_existing_bq_schema for tablename={tablename}")
    try:
        act_schema = gbq_fetch_table_schema(tablename)
    except TableNotFound:
        return False
    exp_schema = genschema2(massage_filepath(tablename))
    diffs = compare_schemas(act_schema, exp_schema)
    return not diffs
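
# A minimal end-to-end sketch, not part of the original gist. It assumes the
# GCP_* environment variables are set, that the CSV exports live under pdw/
# relative to the project root, and that GCP_DATASET_ID names the same "pdw"
# dataset that load_into_bigquery() writes to.
if __name__ == "__main__":
    for csv_path in get_filepaths():
        load_into_bigquery(csv_path)
        tablename = get_tablename(csv_path)
        assert genschema2_matches_existing_bq_schema(tablename), tablename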