Last active
September 29, 2023 08:55
-
-
Save Havoc24k/d646eb54925e8777a01c45b33e4a0f68 to your computer and use it in GitHub Desktop.
Import tables from one PostgreSQL database to another using the INFORMATION_SCHEMA
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3.6
"""Copy tables from one PostgreSQL database to another."""
import sys

import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
# Usage:
#     sync.py [-f]
#
# Options:
#     -f    Force import tables without asking for user input per table

# Connection settings for the source database (read from) and the
# destination database (written to). Fill these in before running.
SOURCE_USERNAME = ''
SOURCE_PASSWD = ''
SOURCE_URL = ''
SOURCE_PORT = 5432
SOURCE_DB = ''

DEST_USERNAME = ''
DEST_PASSWD = ''
DEST_URL = ''
DEST_PORT = 5432
DEST_DB = ''

# Tables to import, written as 'schema_name.table_name'.
# If no schema_name is provided it defaults to 'public'.
list_of_tables = [
    'schema_name.table_name',
]
def format_drop_create_query(table_name, columns, table_schema='public'):
    """Build the DROP TABLE + CREATE TABLE statements for one table.

    Args:
        table_name: Name of the table to (re)create.
        columns: Iterable of mappings, one per column and in column order,
            each providing the keys ``column_name``, ``data_type``,
            ``character_maximum_length`` and ``is_nullable``.
        table_schema: Schema the table is created in.

    Returns:
        A string holding a ``DROP TABLE IF EXISTS`` statement followed by
        the matching ``CREATE TABLE`` statement.
    """
    q_drop = 'DROP TABLE IF EXISTS "{}"."{}";'.format(table_schema, table_name)

    column_defs = []
    for c_data in columns:
        data_type = c_data['data_type']
        max_length = c_data['character_maximum_length']

        if data_type == 'character varying':
            data_type = 'varchar'
        # Only append a length when one was declared: an unbounded varchar
        # reports no maximum length, and formatting that value blindly
        # produced the invalid type "varchar(None)".
        if data_type in ('varchar', 'character') and max_length is not None:
            data_type = '{}({})'.format(data_type, max_length)

        parts = ['"{}"'.format(c_data['column_name']), data_type]
        if c_data['is_nullable'] == 'NO':
            parts.append('NOT NULL')
        column_defs.append(' '.join(parts))

    q_create_body = 'CREATE TABLE "{}"."{}" (\n{}\n);\n'.format(
        table_schema, table_name, ',\n'.join(column_defs))

    return q_drop + '\n' + q_create_body
if __name__ == '__main__':
    # Both connections start as None so the cleanup code can tell which of
    # them were actually opened; connect() itself may raise.
    src = None
    dest = None
    try:
        # The -f argument disables the per-table confirmation prompt.
        prompt = True
        if len(sys.argv) > 1 and sys.argv[1] == '-f':
            prompt = False

        # Query that reads a table's column definitions from the
        # INFORMATION_SCHEMA. Table and schema names are passed as bound
        # parameters rather than formatted into the SQL text.
        get_table_struct_query = '''
            SELECT
                table_schema,
                column_name,
                data_type,
                character_maximum_length,
                is_nullable
            FROM
                INFORMATION_SCHEMA.COLUMNS
            WHERE
                table_name = %s
                AND table_schema = %s
            ORDER BY ordinal_position
        '''

        # Connect to the source database and open a cursor.
        src = psycopg2.connect(
            database=SOURCE_DB,
            user=SOURCE_USERNAME,
            password=SOURCE_PASSWD,
            host=SOURCE_URL,
            port=SOURCE_PORT,
            cursor_factory=RealDictCursor
        )
        src_cur = src.cursor()

        # Connect to the destination database and open a cursor.
        dest = psycopg2.connect(
            database=DEST_DB,
            user=DEST_USERNAME,
            password=DEST_PASSWD,
            host=DEST_URL,
            port=DEST_PORT,
            cursor_factory=RealDictCursor
        )
        dest_cur = dest.cursor()

        print('================================\n')

        # Import every configured table in turn.
        for table_name in list_of_tables:
            print('Importing {} \nfrom: [SOURCE] {} \nto: [DEST] {}\n'.format(
                table_name, src.dsn, dest.dsn))

            # Without -f the user confirms each table individually.
            if prompt:
                user_resp = input('Do you want to continue? [N/y] ')
                if user_resp != 'y' and user_resp != 'Y':
                    print('Skiping table {}'.format(table_name))
                    continue

            print('Starting import...\n')
            print('Getting table data from SOURCE...', '\n')

            # Split 'schema.table' into its parts; a name without a schema
            # defaults to `public`.
            table_name_info = table_name.split('.')
            if len(table_name_info) == 2:
                table_schema, table_name = table_name_info
            else:
                table_schema = 'public'

            # Fetch all rows of the table from the source database.
            q_get = 'SELECT * FROM "{}"."{}";'.format(table_schema, table_name)
            src_cur.execute(q_get)
            table_data = src_cur.fetchall()

            # Fetch the table's column definitions from the source database.
            print('Create table to DEST...', '\n')
            src_cur.execute(get_table_struct_query, (table_name, table_schema))
            columns = src_cur.fetchall()

            # Recreate the table in the destination under the same schema
            # it has in the source; a non-default schema is created first
            # if it is missing.
            if table_schema != 'public':
                dest_cur.execute(
                    'CREATE SCHEMA IF NOT EXISTS "{}";'.format(table_schema))
            dest_cur.execute(format_drop_create_query(
                table_name, columns=columns, table_schema=table_schema))
            if dest.notices:
                print(dest.notices, '\n')

            # Copy the rows. An empty source table has nothing to insert,
            # so the column list comes from the metadata, not the first row.
            print('Insert data to DEST...', '\n')
            if table_data:
                column_names = [c['column_name'] for c in columns]
                # Quote column names the same way the CREATE statement does
                # so mixed-case and reserved names keep working.
                quoted_columns = ','.join(
                    '"{}"'.format(name) for name in column_names)
                q_insert = 'INSERT INTO "{}"."{}" ({}) VALUES %s'.format(
                    table_schema, table_name, quoted_columns)
                execute_values(
                    dest_cur,
                    q_insert,
                    argslist=[
                        [row[name] for name in column_names]
                        for row in table_data
                    ],
                    template=None,
                    page_size=100
                )

        # Commit everything at once and inform the user.
        dest.commit()
        print('...finished import\n')
        print('================================\n')
    except psycopg2.Error as e:
        # On a database error roll back everything done on the destination.
        # dest is still None when the failure happened while connecting.
        if dest is not None:
            dest.rollback()
        print('An error occured, ALL actions have been rollbacked.', '\n')
        print(e, '\n')
        print('================================\n')
    finally:
        # Close whichever connections were opened, on success and failure.
        for conn in (src, dest):
            if conn is not None:
                conn.close()
An error occured, ALL actions have been rollbacked.
syntax error at or near "None"
LINE 20: "remarks" varchar(None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Love it, thanks a lot! :D