Created
May 18, 2021 19:54
-
-
Save tombohub/0c666583c48c1686c736ae2eb76cb2ea to your computer and use it in GitHub Desktop.
How to save pandas dataframe to MySQL with INSERT IGNORE
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This example is for inside the class. | |
First establish the connection into `self.conn` | |
""" | |
def _table_column_names(self, table: str) -> str:
    """
    Get column names from database table

    Looks the table up in information_schema.columns through `self.conn`
    and renders the result as a comma-separated list of backtick-quoted
    identifiers.

    Parameters
    ----------
    table : str
        name of the table. It is interpolated directly into the SQL text,
        so it must be a trusted identifier, never raw user input.

    Returns
    -------
    str
        names of columns as a string so we can interpolate into the SQL queries,
        for example: `id`, `name`. Empty string when the lookup returns no rows.
    """
    # NOTE(review): the lookup filters on table_name only, so a table with the
    # same name in another schema would contribute its columns as well --
    # confirm whether a table_schema filter is needed.
    query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}'"
    rows = self.conn.execute(query)
    # A literal backtick inside a backtick-quoted MySQL identifier has to be
    # doubled, otherwise it would close the identifier early.
    quoted_names = ['`' + str(row[0]).replace('`', '``') + '`' for row in rows]
    return ', '.join(quoted_names)
def _insert_conflict_ignore(self, df: pd.DataFrame, table: str, index: bool):
    """
    Saves dataframe to the MySQL database with 'INSERT IGNORE' query.

    First it uses pandas.to_sql to save to temporary table.
    After that it uses SQL to transfer the data to destination table, matching the columns.
    Destination table needs to exist already.
    Final step is deleting the temporary table; this happens even when one
    of the earlier steps fails.

    Parameters
    ----------
    df : pd.DataFrame
        dataframe to save
    table : str
        destination table name. It is placed into the INSERT statement
        as-is (unquoted), so it must be a trusted identifier.
    index : bool
        forwarded to `df.to_sql` as its `index` argument

    Raises
    ------
    Exception
        Whatever staging the dataframe, reading the column names or running
        the INSERT raises is propagated to the caller once the temporary
        table has been dropped, so a failed save is never reported as success.
    """
    # generate random table name for concurrent writing
    temp_table = ''.join(random.choice(string.ascii_letters) for _ in range(10))
    try:
        df.to_sql(temp_table, self.conn, index=index)
        columns = self._table_column_names(table=temp_table)
        insert_query = f'INSERT IGNORE INTO {table}({columns}) SELECT {columns} FROM `{temp_table}`'
        self.conn.execute(insert_query)
    finally:
        # drop temp table, whether or not the transfer succeeded
        drop_query = f'DROP TABLE IF EXISTS `{temp_table}`'
        self.conn.execute(drop_query)
def save_dataframe(self, df: pd.DataFrame, table: str):
    '''
    Save dataframe to the database.

    Index is saved if it has name. If it's None it will not be saved.
    A multi-level index is saved only when every one of its levels is named.
    It implements INSERT IGNORE when inserting rows into the MySQL table.
    Table needs to exist before.

    Arguments:
        df {pd.DataFrame} -- dataframe to save
        table {str} -- name of the db table
    '''
    # `index.names` holds one entry for a plain index and one per level for a
    # MultiIndex; the latter reports `.name` as None even when its levels are
    # named, so checking `.name` alone would silently drop a named MultiIndex.
    save_index = all(level_name is not None for level_name in df.index.names)
    self._insert_conflict_ignore(df=df, table=table, index=save_index)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment