Created
June 26, 2024 01:40
-
-
Save parrotmac/dbd87b3826914e7647fcf36d0caa4436 to your computer and use it in GitHub Desktop.
Python script that generates pggen-compatible queries from a PostgreSQL schema scraped using psql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import re | |
import subprocess | |
from textwrap import dedent | |
from pathlib import Path | |
# Connection string of the local development database that gets introspected.
_PSQL_URL = "postgres://postgres:postgres@localhost:54339/postgres"

# Matches one entry of the "Foreign-key constraints:" footer, for example:
#   "project_owner_id_fkey" FOREIGN KEY (owner_id) REFERENCES organization(id)
_FOREIGN_KEY_RE = re.compile(
    r"\"(\w+)\" FOREIGN KEY \((\w+)\) REFERENCES (\w+)\((\w+)\)"
)


def _parse_table_description(description: str) -> dict:
    r"""Extract columns and foreign keys from the text printed by psql's ``\d <table>``.

    The expected input looks like this::

                                  Table "public.project"
           Column    |            Type             | Collation | Nullable |      Default
        -------------+-----------------------------+-----------+----------+--------------------
         id          | uuid                        |           | not null | uuid_generate_v4()
         name        | text                        |           | not null |
         owner_id    | uuid                        |           | not null |
        Indexes:
            "project_pkey" PRIMARY KEY, btree (id)
        Foreign-key constraints:
            "project_owner_id_fkey" FOREIGN KEY (owner_id) REFERENCES organization(id)
        Referenced by:
            TABLE "project_config" CONSTRAINT ... FOREIGN KEY (project_id) REFERENCES project(id)
    """
    columns = []
    foreign_keys = []
    section = None

    for raw_line in description.splitlines():
        line = raw_line.strip()
        if not line:
            # psql terminates its output with a blank line; nothing to parse.
            continue

        # Split into at most five cells so a "|" inside the Default expression
        # (e.g. the string concatenation operator "||") stays in the last cell.
        cells = [cell.strip() for cell in line.split("|", 4)]

        if cells[:2] == ["Column", "Type"]:
            section = "columns"
            continue
        if line.startswith("Foreign-key constraints:"):
            section = "foreign_keys"
            continue

        if section == "columns":
            if not line.strip("-+"):
                # The "-----+-----" rule between the header and the rows.
                continue
            if len(cells) != 5:
                # Not a grid row: a footer such as "Indexes:" or
                # "Check constraints:" has started, so the column list is over.
                section = None
                continue
            name, column_type, _collation, _nullable, default = cells
            columns.append({
                "name": name,
                "type": column_type,
                "default": default,
            })
        elif section == "foreign_keys":
            match = _FOREIGN_KEY_RE.match(line)
            if match:
                foreign_keys.append({
                    "constraint_name": match.group(1),
                    "column": match.group(2),
                    "references_table": match.group(3),
                    "references_column": match.group(4),
                })
            elif line.endswith(":"):
                # Next footer (e.g. "Referenced by:"); its entries are not
                # constraints owned by this table.
                section = None

    return {
        "columns": columns,
        "foreign_keys": foreign_keys,
    }


def inspect_table(table_name: str, description=None):
    r"""Describe a table's columns and single-column foreign keys.

    Args:
        table_name: Name of the relation passed to psql's ``\d`` meta-command.
        description: Optional, already captured ``\d`` output. When given,
            psql is not invoked and this text is parsed instead.

    Returns:
        A dict with two keys:
        ``"columns"`` - list of ``{"name", "type", "default"}`` dicts, where
        ``"default"`` is the empty string for columns without a default;
        ``"foreign_keys"`` - list of ``{"constraint_name", "column",
        "references_table", "references_column"}`` dicts.

    Raises:
        subprocess.CalledProcessError: If psql exits with a non-zero status.
    """
    if description is None:
        # Pass arguments as a list (no shell) so that quotes or spaces in the
        # table name cannot alter the command that is executed.
        description = subprocess.check_output(
            ["psql", _PSQL_URL, "-c", f"\\d {table_name}"]
        ).decode("utf-8")
    return _parse_table_description(description)
def camel_case(s: str) -> str:
    """Convert a snake_case identifier to PascalCase.

    Each underscore-separated word is passed through ``str.capitalize`` and
    the words are joined without a separator, e.g. ``"project_host"`` becomes
    ``"ProjectHost"``.
    """
    return "".join(word.capitalize() for word in s.split("_"))
def generate_object_id_lookup_with_profile_constraint(table_name: str, relation_name: str):
    """Build a ``:one`` query that fetches a row by ID, restricted by membership.

    The row is only returned when its ``relation_name`` column refers to an
    organization in which the profile identified by ``github_username`` holds
    at least one of the requested roles.

    Args:
        table_name: Table to select from.
        relation_name: Column of ``table_name`` that references ``organization.id``.

    Returns:
        The SQL text, including the ``-- name:`` header line.
    """
    query_name = f"Get{camel_case(table_name)}ConstrainedByGithubUser"
    # IN rather than "=": the subquery yields one row per matching membership,
    # and a scalar "= (subquery)" comparison errors out as soon as the user
    # belongs to more than one matching organization.
    return dedent(f"""
        -- {query_name} finds a single {table_name} record by ID while also checking that the user has the given roles
        -- name: {query_name} :one
        SELECT * FROM {table_name}
        WHERE
            id = pggen.arg('id')
            AND
            {relation_name} IN (
                SELECT o.id FROM organization as o
                LEFT JOIN organization_member om on o.id = om.organization_id
                LEFT JOIN profile p on om.profile_id = p.id
                WHERE
                    p.github_username = pggen.arg('github_username')
                    AND
                    om.roles && pggen.arg('roles')
            );
    """)
def generate_object_filter_with_profile_constraint(table_name: str, relation_name: str):
    """Build a ``:many`` query listing the rows visible to a GitHub user.

    A row is listed when its ``relation_name`` column refers to an
    organization in which the profile identified by ``github_username`` holds
    at least one of the requested roles.

    Args:
        table_name: Table to select from.
        relation_name: Column of ``table_name`` that references ``organization.id``.

    Returns:
        The SQL text, including the ``-- name:`` header line.
    """
    query_name = f"List{camel_case(table_name)}ConstrainedByGithubUser"
    # IN rather than "=": the subquery yields one row per matching membership,
    # and a scalar "= (subquery)" comparison errors out as soon as the user
    # belongs to more than one matching organization.
    return dedent(f"""
        -- {query_name} finds one or more {table_name} while checking that the user has the given roles
        -- name: {query_name} :many
        SELECT * FROM {table_name}
        WHERE
            {relation_name} IN (
                SELECT o.id FROM organization as o
                LEFT JOIN organization_member om on o.id = om.organization_id
                LEFT JOIN profile p on om.profile_id = p.id
                WHERE
                    p.github_username = pggen.arg('github_username')
                    AND
                    om.roles && pggen.arg('roles')
            );
    """)
def generate_create_statements(table_name: str, columns: list):
    """Build a ``:one`` INSERT query that takes an argument for every column.

    Args:
        table_name: Table to insert into.
        columns: Column dicts as returned under ``"columns"`` by
            ``inspect_table``; only the ``"name"`` key is read.

    Returns:
        The SQL text, including the ``-- name:`` header line.
    """
    query_name = f"CreateFull{camel_case(table_name)}"
    column_list = ", ".join(column["name"] for column in columns)
    value_list = ", ".join(f"pggen.arg('{column['name']}')" for column in columns)
    # The leading comment names the query exactly as the "-- name:" line does,
    # so the description stays attached to the right generated function.
    return dedent(f"""
        -- {query_name} creates a new {table_name} record
        -- name: {query_name} :one
        INSERT INTO {table_name} ({column_list})
        VALUES ({value_list})
        RETURNING *;
    """)
def generate_create_without_defaults_statements(table_name: str, columns: list):
    """Build a ``:one`` INSERT query that only takes the columns lacking a default.

    ``id``, ``created_at`` and ``updated_at`` are never taken as arguments,
    and neither is any column whose ``"default"`` is non-empty.

    Args:
        table_name: Table to insert into.
        columns: Column dicts as returned under ``"columns"`` by
            ``inspect_table``; the ``"name"`` and ``"default"`` keys are read.

    Returns:
        The SQL text, including the ``-- name:`` header line.
    """
    auto_managed = {"id", "created_at", "updated_at"}
    required_columns = [
        column
        for column in columns
        if column["name"] not in auto_managed and column["default"] == ""
    ]
    query_name = f"Create{camel_case(table_name)}"

    if not required_columns:
        # "INSERT INTO t () VALUES ()" is a syntax error in PostgreSQL; a row
        # made up entirely of defaults has to be spelled DEFAULT VALUES.
        return dedent(f"""
            -- {query_name} creates a new {table_name} record
            -- name: {query_name} :one
            INSERT INTO {table_name} DEFAULT VALUES
            RETURNING *;
        """)

    column_list = ", ".join(column["name"] for column in required_columns)
    value_list = ", ".join(
        f"pggen.arg('{column['name']}')" for column in required_columns
    )
    return dedent(f"""
        -- {query_name} creates a new {table_name} record
        -- name: {query_name} :one
        INSERT INTO {table_name} ({column_list})
        VALUES ({value_list})
        RETURNING *;
    """)
def generate_update_statements(table_name: str, columns: list):
    """Build a ``:one`` UPDATE query that overwrites every column of one row.

    The row is selected by ``id``; each column in ``columns`` is assigned from
    a pggen argument of the same name.

    Args:
        table_name: Table to update.
        columns: Column dicts as returned under ``"columns"`` by
            ``inspect_table``; only the ``"name"`` key is read.

    Returns:
        The SQL text, including the ``-- name:`` header line.
    """
    query_name = f"Update{camel_case(table_name)}"
    assignments = ", ".join(
        f"{column['name']} = pggen.arg('{column['name']}')" for column in columns
    )
    # The leading comment names the query exactly as the "-- name:" line does.
    return dedent(f"""
        -- {query_name} updates a {table_name} record
        -- name: {query_name} :one
        UPDATE {table_name}
        SET {assignments}
        WHERE id = pggen.arg('id')
        RETURNING *;
    """)
def generate_delete_statements(table_name: str):
    """Build a ``:one`` DELETE query that removes a row by ``id`` and returns it.

    Args:
        table_name: Table to delete from.

    Returns:
        The SQL text, including the ``-- name:`` header line.
    """
    query_name = f"Delete{camel_case(table_name)}"
    # The leading comment names the query exactly as the "-- name:" line does.
    return dedent(f"""
        -- {query_name} deletes a {table_name} record
        -- name: {query_name} :one
        DELETE FROM {table_name}
        WHERE id = pggen.arg('id')
        RETURNING *;
    """)
def main():
    r"""Generate ``models/queries-generated.sql`` from the live database.

    Lists the relations psql's ``\d`` reports in the ``public`` schema, and for
    each one emits a find-by-ID query, organization-constrained lookups for
    every foreign key that references ``organization(id)``, and the
    create/update/delete queries.

    Raises:
        subprocess.CalledProcessError: If a psql pipeline exits non-zero.
    """
    # Alternative that reads the table names from the schema file instead of
    # a running database:
    # "sh", "-c", "grep -Eio 'CREATE TABLE (\w+)' models/schema.sql | awk '{ print $3 }'"
    #
    # Raw string: "\d", "\s" and "\|" are meant for psql/grep, not for Python.
    listing = subprocess.check_output([
        "sh", "-c",
        r"psql 'postgres://postgres:postgres@localhost:54339/postgres' -c '\d' | grep -E '^\s+public\s+\|\s+' | awk '{ print $3 }'",
    ])
    tables = listing.decode("utf-8").splitlines()

    for table in tables:
        print(f"Generating for table {table}")

    # Collect the pieces and join once at the end instead of growing a string.
    sections = [
        "\n"
        "-- Auto-generated by pregen.py script\n"
        "-- DO NOT EDIT\n"
        "-- Path: models/pregen.py\n"
    ]

    for table in tables:
        # Use camel_case like every other generated query so multi-word tables
        # become e.g. FindProjectConfigByID rather than FindProject_configByID.
        model_name = camel_case(table)
        sections.append(dedent(f"""
            -- Find{model_name}ByID finds a single {table} record by ID
            -- name: Find{model_name}ByID :one
            SELECT * FROM {table} WHERE id = pggen.arg('id');
        """))

        structure = inspect_table(table)
        columns = structure["columns"]

        for foreign_key in structure["foreign_keys"]:
            references_organization = (
                foreign_key["references_table"] == "organization"
                and foreign_key["references_column"] == "id"
            )
            if references_organization:
                sections.append(
                    generate_object_id_lookup_with_profile_constraint(table, foreign_key["column"])
                )
                sections.append(
                    generate_object_filter_with_profile_constraint(table, foreign_key["column"])
                )

        sections.append(generate_create_statements(table, columns))
        sections.append(generate_create_without_defaults_statements(table, columns))
        sections.append(generate_update_statements(table, columns))
        sections.append(generate_delete_statements(table))

    Path("models/queries-generated.sql").write_text("".join(sections), encoding="utf-8")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment