To quote from psycopg2's documentation:
Warning Never, never, NEVER use Python string concatenation (+) or string parameters interpolation (%) to pass variables to a SQL query string. Not even at gunpoint.
Now, for an upsert operation you can do this:
insert_sql = '''
INSERT INTO tablename (col1, col2, col3, col4)
VALUES (%s, %s, %s, %s)
ON CONFLICT (col1) DO UPDATE SET
(col2, col3, col4) = (EXCLUDED.col2, EXCLUDED.col3, EXCLUDED.col4);
'''
cur.execute(insert_sql, (val1, val2, val3, val4))
Notice that the parameters for the query are being passed as a tuple to the execute statement (this assures psycopg2 will take care of adapting them to SQL while shielding you from injection attacks).
The EXCLUDED bit allows you to reuse the values without the need to specify them twice in the data parameter.
To quote from psycopg2's documentation:
Warning Never, never, NEVER use Python string concatenation (+) or string parameters interpolation (%) to pass variables to a SQL query string. Not even at gunpoint.
Now, for an upsert operation you can do this:
insert_sql = '''
INSERT INTO tablename (col1, col2, col3, col4)
VALUES (%s, %s, %s, %s)
ON CONFLICT (col1) DO UPDATE SET
(col2, col3, col4) = (EXCLUDED.col2, EXCLUDED.col3, EXCLUDED.col4);
'''
cur.execute(insert_sql, (val1, val2, val3, val4))
Notice that the parameters for the query are being passed as a tuple to the execute statement (this assures psycopg2 will take care of adapting them to SQL while shielding you from injection attacks).
The EXCLUDED bit allows you to reuse the values without the need to specify them twice in the data parameter.
Using:
INSERT INTO members (member_id, customer_id, subscribed, customer_member_id, phone, cust_atts) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (customer_member_id) DO UPDATE SET (phone) = (EXCLUDED.phone);
I received the following error:
psycopg2.errors.FeatureNotSupported: source for a multiple-column UPDATE item must be a sub-SELECT or ROW() expression
LINE 1: ...ICT (customer_member_id) DO UPDATE SET (phone) = (EXCLUDED.p...
Changing to:
INSERT INTO members (member_id, customer_id, subscribed, customer_member_id, phone, cust_atts) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (customer_member_id) DO UPDATE SET (phone) = ROW(EXCLUDED.phone);
Solved the issue.
Here is my code for bulk insert & insert on conflict update query for postgresql from pandas dataframe:
Lets say id is unique key for both postgresql table and pandas df and you want to insert and update based on this id.
import pandas as pd
from sqlalchemy import create_engine, text
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(f"""
INSERT INTO schema.table(name, title, id)
VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
ON CONFLICT (id)
DO UPDATE SET name= excluded.name,
title= excluded.title
""")
engine.execute(query)
Make sure that your df columns must be same order with your table.
FYI, this is the solution I am using currently.
It seems to work fine for my purposes. I had to add a line to replace null (NaT) timestamps with None though, because I was getting an error when I was loading each row into the database.
def create_update_query(table):
"""This function creates an upsert query which replaces existing data based on primary key conflicts"""
columns = ', '.join([f'{col}' for col in DATABASE_COLUMNS])
constraint = ', '.join([f'{col}' for col in PRIMARY_KEY])
placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
query = f"""INSERT INTO {table} ({columns})
VALUES ({placeholder})
ON CONFLICT ({constraint})
DO UPDATE SET {updates};"""
query.split()
query = ' '.join(query.split())
return query
def load_updates(df, table, connection):
conn = connection.get_conn()
cursor = conn.cursor()
df1 = df.where((pd.notnull(df)), None)
insert_values = df1.to_dict(orient='records')
for row in insert_values:
cursor.execute(create_update_query(table=table), row)
conn.commit()
row_count = len(insert_values)
logging.info(f'Inserted {row_count} rows.')
cursor.close()
del cursor
conn.close()
Yeah, I would vote for COPY, providing you can write a file to the server's hard drive (not the drive the app is running on) as COPY will only read off the server.
There is a new psycopg2 manual containing examples for all the options.
The COPY option is the most efficient. Then the executemany. Then the execute with pyformat.