Skip to content

Instantly share code, notes, and snippets.

@parrotmac
Created June 26, 2024 01:40
Show Gist options
  • Save parrotmac/dbd87b3826914e7647fcf36d0caa4436 to your computer and use it in GitHub Desktop.
Save parrotmac/dbd87b3826914e7647fcf36d0caa4436 to your computer and use it in GitHub Desktop.
Python Script to generate pggen-compatible queries from a PostgreSQL schema scraped using PSQL
#!/usr/bin/env python
import re
import subprocess
from textwrap import dedent
from pathlib import Path
def inspect_table(table_name: str):
output = subprocess.check_output([
"sh", "-c", f"psql 'postgres://postgres:postgres@localhost:54339/postgres' -c '\\d {table_name}'"
])
# output might look like this:
"""
Table "public.project"
Column | Type | Collation | Nullable | Default
-------------+-----------------------------+-----------+----------+--------------------
id | uuid | | not null | uuid_generate_v4()
created_at | timestamp without time zone | | not null | now()
updated_at | timestamp without time zone | | not null | now()
name | text | | not null |
owner_id | uuid | | not null |
beta_config | jsonb | | |
Indexes:
"project_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
"project_owner_id_fkey" FOREIGN KEY (owner_id) REFERENCES organization(id)
Referenced by:
TABLE "project_config" CONSTRAINT "project_config_project_id_fkey" FOREIGN KEY (project_id) REFERENCES project(id)
TABLE "project_github_repository" CONSTRAINT "project_github_repository_project_id_fkey" FOREIGN KEY (project_id) REFERENCES project(id)
TABLE "project_host" CONSTRAINT "project_host_project_id_fkey" FOREIGN KEY (project_id) REFERENCES project(id)
"""
# We want to extract the column names and types
# We also want to extract the foreign key constraints
columns = []
foreign_keys = []
current_section = None
for line in output.splitlines():
line = line.decode("utf-8").strip()
if line.startswith("Column"):
current_section = "columns"
continue
if line.startswith("Indexes:"):
current_section = "indexes"
continue
if line.startswith("Foreign-key constraints:"):
current_section = "foreign_keys"
continue
if current_section == "columns":
if line.startswith("--------"):
continue
column_name, column_type, collation, nullable, default = line.split("|")
column_name = column_name.strip()
column_type = column_type.strip()
column_default = default.strip()
columns.append({
"name": column_name,
"type": column_type,
"default": column_default,
})
elif current_section == "foreign_keys":
if "FOREIGN KEY" in line:
pattern = re.compile(r"\"(\w+)\" FOREIGN KEY \((\w+)\) REFERENCES (\w+)\((\w+)\)")
m = pattern.match(line)
if m:
foreign_keys.append({
"constraint_name": m.group(1),
"column": m.group(2),
"references_table": m.group(3),
"references_column": m.group(4),
})
return {
"columns": columns,
"foreign_keys": foreign_keys,
}
camel_case = lambda s: "".join([w.capitalize() for w in s.split("_")])
def generate_object_id_lookup_with_profile_constraint(table_name: str, relation_name: str):
return dedent(f"""
-- Get{camel_case(table_name)}ConstrainedByGithubUser finds one or more {table_name} records by ID while also checking that the user has the given roles
-- name: Get{camel_case(table_name)}ConstrainedByGithubUser :one
SELECT * FROM {table_name}
WHERE
id = pggen.arg('id')
AND
{relation_name} = (
SELECT o.id FROM organization as o
LEFT JOIN organization_member om on o.id = om.organization_id
LEFT JOIN profile p on om.profile_id = p.id
WHERE
p.github_username = pggen.arg('github_username')
AND
om.roles && pggen.arg('roles')
);
""")
def generate_object_filter_with_profile_constraint(table_name: str, relation_name: str):
return dedent(f"""
-- List{camel_case(table_name)}ConstrainedByGithubUser finds one or more {table_name} while checking that the user has the given roles
-- name: List{camel_case(table_name)}ConstrainedByGithubUser :many
SELECT * FROM {table_name}
WHERE
{relation_name} = (
SELECT o.id FROM organization as o
LEFT JOIN organization_member om on o.id = om.organization_id
LEFT JOIN profile p on om.profile_id = p.id
WHERE
p.github_username = pggen.arg('github_username')
AND
om.roles && pggen.arg('roles')
);
""")
def generate_create_statements(table_name: str, columns: list):
return dedent(f"""
-- Create{camel_case(table_name)} creates a new {table_name} record
-- name: CreateFull{camel_case(table_name)} :one
INSERT INTO {table_name} ({", ".join([c["name"] for c in columns])})
VALUES ({", ".join([f"pggen.arg('{c['name']}')" for c in columns])})
RETURNING *;
""")
def generate_create_without_defaults_statements(table_name: str, columns: list):
filtered_columns = []
for c in columns:
if c["name"] == "id":
continue
if c["name"] == "created_at":
continue
if c["name"] == "updated_at":
continue
if c["default"] == "":
filtered_columns.append(c)
return dedent(f"""
-- Create{camel_case(table_name)} creates a new {table_name} record
-- name: Create{camel_case(table_name)} :one
INSERT INTO {table_name} ({", ".join([c["name"] for c in filtered_columns])})
VALUES ({", ".join([f"pggen.arg('{c['name']}')" for c in filtered_columns])})
RETURNING *;
""")
def generate_update_statements(table_name: str, columns: list):
return dedent(f"""
-- UpdateFull{camel_case(table_name)} updates a {table_name} record
-- name: Update{camel_case(table_name)} :one
UPDATE {table_name}
SET {", ".join([f"{c['name']} = pggen.arg('{c['name']}')" for c in columns])}
WHERE id = pggen.arg('id')
RETURNING *;
""")
def generate_delete_statements(table_name: str):
return dedent(f"""
-- Delete{camel_case(table_name)}ByID deletes a {table_name} record
-- name: Delete{camel_case(table_name)} :one
DELETE FROM {table_name}
WHERE id = pggen.arg('id')
RETURNING *;
""")
output = subprocess.check_output([
# "sh", "-c", "grep -Eio 'CREATE TABLE (\w+)' models/schema.sql | awk '{ print $3 }'"
"sh", "-c", "psql 'postgres://postgres:postgres@localhost:54339/postgres' -c '\d' | grep -E '^\s+public\s+\|\s+' | awk '{ print $3 }'"
])
tables = [ l.decode("utf-8") for l in output.splitlines() ]
for table in tables:
print(f"Generating for table {table}")
file_buffer = """
-- Auto-generated by pregen.py script
-- DO NOT EDIT
-- Path: models/pregen.py
"""
for table in tables:
upper_table = table.capitalize()
file_buffer += dedent(f"""
-- Find{upper_table}ByID finds one or more {table} records by ID
-- name: Find{upper_table}ByID :one
SELECT * FROM {table} WHERE id = pggen.arg('id');
""")
structure = inspect_table(table)
columns = structure["columns"]
foreign_keys = structure["foreign_keys"]
for foreign_key in foreign_keys:
if foreign_key["references_table"] == "organization" and foreign_key["references_column"] == "id":
file_buffer += generate_object_id_lookup_with_profile_constraint(table, foreign_key["column"])
file_buffer += generate_object_filter_with_profile_constraint(table, foreign_key["column"])
file_buffer += generate_create_statements(table, columns)
file_buffer += generate_create_without_defaults_statements(table, columns)
file_buffer += generate_update_statements(table, columns)
file_buffer += generate_delete_statements(table)
Path("models/queries-generated.sql").write_text(file_buffer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment