To quote from psycopg2's documentation:

Warning Never, never, NEVER use Python string concatenation (+) or string parameters interpolation (%) to pass variables to a SQL query string. Not even at gunpoint.

Now, for an upsert operation you can do this:

insert_sql = '''
    INSERT INTO tablename (col1, col2, col3, col4)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (col1) DO UPDATE SET
    (col2, col3, col4) = (EXCLUDED.col2, EXCLUDED.col3, EXCLUDED.col4);
'''
cur.execute(insert_sql, (val1, val2, val3, val4))

Notice that the parameters for the query are passed as a tuple to execute(); this ensures psycopg2 adapts them to SQL types while shielding you from injection attacks.

The EXCLUDED bit allows you to reuse the values without the need to specify them twice in the data parameter.
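If you want to try the shape of this statement without a PostgreSQL server handy, a minimal sketch of the same pattern runs against Python's stdlib sqlite3 driver (SQLite 3.24+ supports ON CONFLICT ... DO UPDATE; note the ? placeholders instead of %s, and that SQLite only accepts per-column SET assignments, not the parenthesized column-list form):

```python
import sqlite3

# Local sketch of the same upsert pattern using the stdlib sqlite3 driver.
# SQLite 3.24+ supports ON CONFLICT ... DO UPDATE with the EXCLUDED table;
# it uses "?" placeholders instead of psycopg2's "%s".
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE tablename (col1 INTEGER PRIMARY KEY, col2 TEXT, col3 TEXT)")

upsert_sql = """
    INSERT INTO tablename (col1, col2, col3)
    VALUES (?, ?, ?)
    ON CONFLICT (col1) DO UPDATE SET
    col2 = EXCLUDED.col2, col3 = EXCLUDED.col3
"""
cur.execute(upsert_sql, (1, "a", "b"))  # plain insert
cur.execute(upsert_sql, (1, "x", "y"))  # conflicts on col1 -> updates row
conn.commit()

print(cur.execute("SELECT * FROM tablename").fetchall())  # [(1, 'x', 'y')]
```

The parameters are still passed separately from the SQL string, so the driver does the quoting, exactly as in the psycopg2 version.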

Answer from Ionut Ticus on Stack Overflow
Top answer
1 of 5
25


2 of 5
8

Using:

INSERT INTO members (member_id, customer_id, subscribed, customer_member_id, phone, cust_atts) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (customer_member_id) DO UPDATE SET (phone) = (EXCLUDED.phone);

I received the following error:

psycopg2.errors.FeatureNotSupported: source for a multiple-column UPDATE item must be a sub-SELECT or ROW() expression
LINE 1: ...ICT (customer_member_id) DO UPDATE SET (phone) = (EXCLUDED.p...

Changing to:

INSERT INTO members (member_id, customer_id, subscribed, customer_member_id, phone, cust_atts) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (customer_member_id) DO UPDATE SET (phone) = ROW(EXCLUDED.phone);

Solved the issue.
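The error comes from the parenthesized form `(col) = (...)`, which on PostgreSQL 10+ requires a ROW() expression or sub-SELECT on the right-hand side. A hypothetical helper that always emits per-column assignments sidesteps the issue for one column or many:

```python
# Hypothetical helper: build the DO UPDATE SET clause as per-column
# assignments ("col = EXCLUDED.col, ..."), which is valid whether you
# update one column or several, avoiding the ROW() requirement that the
# parenthesized form "(col) = (...)" triggers on PostgreSQL 10+.
def excluded_set_clause(columns):
    return ", ".join(f"{col} = EXCLUDED.{col}" for col in columns)

print(excluded_set_clause(["phone"]))
# phone = EXCLUDED.phone
print(excluded_set_clause(["col2", "col3"]))
# col2 = EXCLUDED.col2, col3 = EXCLUDED.col3
```

Note the column names here are assumed to be trusted identifiers (e.g. from your own schema definition), not user input, since they are formatted into the SQL text.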

Top answer
1 of 3
2

Here is my code for a bulk insert-on-conflict-update (upsert) query for PostgreSQL from a pandas DataFrame:

Let's say id is the unique key for both the PostgreSQL table and the pandas df, and you want to insert and update based on this id.

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://username:pass@host:port/dbname")
# Caution: interpolating the DataFrame's values directly into the SQL
# string is vulnerable to SQL injection; only do this with trusted data.
query = text(f""" 
                INSERT INTO schema.table(name, title, id)
                VALUES {','.join([str(i) for i in df.to_records(index=False)])}
                ON CONFLICT (id)
                DO UPDATE SET name = excluded.name,
                              title = excluded.title
         """)
with engine.begin() as conn:
    conn.execute(query)

Make sure your DataFrame's columns are in the same order as the table's columns.
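Interpolating `str(record)` tuples into the SQL conflicts with the injection warning quoted earlier. A hypothetical, injection-safe alternative is to generate one placeholder group per row and pass the values separately for the driver to bind (function and column names here are illustrative):

```python
# Hypothetical injection-safe alternative: build "(%s, %s, %s)" groups for
# the VALUES clause and return the row values as a flat parameter list,
# so the database driver binds every value itself.
def build_values_clause(rows, cols_per_row):
    group = "(" + ", ".join(["%s"] * cols_per_row) + ")"
    placeholders = ", ".join([group] * len(rows))
    params = [value for row in rows for value in row]
    return placeholders, params

rows = [("alice", "dr", 1), ("bob", "mr", 2)]
placeholders, params = build_values_clause(rows, 3)
sql = f"""INSERT INTO schema.table(name, title, id)
          VALUES {placeholders}
          ON CONFLICT (id) DO UPDATE SET name = excluded.name,
                                         title = excluded.title"""
# cur.execute(sql, params)  # psycopg2 binds each value safely
print(placeholders)  # (%s, %s, %s), (%s, %s, %s)
```

Only placeholders are formatted into the SQL text; the data itself never is.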

2 of 3
0

FYI, this is the solution I am currently using.

It seems to work fine for my purposes. I had to add a line to replace null (NaT) timestamps with None, though, because I was getting an error when loading each row into the database.

def create_update_query(table):
    """Create an upsert query which replaces existing data based on primary-key conflicts."""
    columns = ', '.join(DATABASE_COLUMNS)
    constraint = ', '.join(PRIMARY_KEY)
    placeholder = ', '.join([f'%({col})s' for col in DATABASE_COLUMNS])
    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS])
    query = f"""INSERT INTO {table} ({columns}) 
                VALUES ({placeholder}) 
                ON CONFLICT ({constraint}) 
                DO UPDATE SET {updates};"""
    query = ' '.join(query.split())  # collapse the whitespace into one line
    return query


def load_updates(df, table, connection):
    conn = connection.get_conn()
    cursor = conn.cursor()
    df1 = df.where((pd.notnull(df)), None)
    insert_values = df1.to_dict(orient='records')
    for row in insert_values:
        cursor.execute(create_update_query(table=table), row)
        conn.commit()
    row_count = len(insert_values)
    logging.info(f'Inserted {row_count} rows.')
    cursor.close()
    del cursor
    conn.close()
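The NaT-to-None step above relies on pandas' df.where(pd.notnull(df), None). A stdlib-only sketch of the same idea, applied to plain row dicts, looks like this (helper name is hypothetical; it covers float NaN, while pandas NaT would need its own check):

```python
import math

# Stdlib sketch of the NaN -> None cleanup that pandas'
# df.where(pd.notnull(df), None) performs, for plain row dicts.
# psycopg2 adapts None to SQL NULL, whereas NaN raises an error.
def nulls_to_none(row):
    return {key: (None if isinstance(value, float) and math.isnan(value) else value)
            for key, value in row.items()}

row = {"id": 1, "price": float("nan"), "name": "widget"}
print(nulls_to_none(row))  # {'id': 1, 'price': None, 'name': 'widget'}
```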
Top answer
1 of 1
3

You need to use the special EXCLUDED table instead of value literals in your ON CONFLICT clause:

The SET and WHERE clauses in ON CONFLICT DO UPDATE have access to the existing row using the table's name (or an alias), and to the row proposed for insertion using the special excluded table.

You also don't need to re-set the conflicting values, only the rest.

INSERT INTO table (col1, col2, col3) 
VALUES 
    (value1, value2, value3), 
    (value4, value5, value6)
ON CONFLICT (col1) DO UPDATE 
SET (col2, col3) = (EXCLUDED.col2, EXCLUDED.col3);

For readability, you can format your inline SQL by triple-quoting your f-strings. I'm not sure which IDEs can detect inline SQL in Python and switch syntax highlighting, but I find the indentation helpful enough. (For real values, prefer bound parameters over f-string interpolation, per the warning quoted above.)

upsert_statement = f"""
    INSERT INTO table (col1, col2, col3) 
    VALUES 
        ({value1}, {value2}, {value3}), 
        ({value4}, {value5}, {value6})
    ON CONFLICT (col1) DO UPDATE 
    SET (col2, col3) = (EXCLUDED.col2, EXCLUDED.col3)"""

Here's a test at db<>fiddle:

drop table if exists test_70066823 cascade;
create table test_70066823 (
    id integer primary key, 
    text_column_1 text, 
    text_column_2 text);
insert into test_70066823 values
  (1,'first','first')
 ,(2,'second','second') returning *;
 id | text_column_1 | text_column_2
----+---------------+---------------
  1 | first         | first
  2 | second        | second
insert into test_70066823
values  (1, 'third','first'),
        (3, 'fourth','third'),
        (4, 'fifth','fourth'),
        (2, 'sixth','second')
on conflict (id) do update 
set text_column_1=EXCLUDED.text_column_1,
    text_column_2=EXCLUDED.text_column_2
returning *;
 id | text_column_1 | text_column_2
----+---------------+---------------
  1 | third         | first
  3 | fourth        | third
  4 | fifth         | fourth
  2 | sixth         | second

You can refer to this for improved insert performance. Inserts with a simple string-based execute() or executemany() are the two slowest approaches mentioned there.
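When the faster bulk approaches still produce statements that are too large, a common pattern is to split the rows into fixed-size batches and run one upsert per batch. A minimal sketch (chunk size is illustrative):

```python
# Minimal batching sketch: split rows into fixed-size chunks so each
# bulk upsert statement stays a manageable size.
def chunked(rows, size):
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

rows = list(range(10))
print([len(chunk) for chunk in chunked(rows, 4)])  # [4, 4, 2]
```

Each chunk would then be passed to one parameterized bulk insert, trading statement size against round trips.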

Top answer
1 of 1
8

More searching and experimenting, plus this answer on Stack Overflow, got me where I needed to be using EXCLUDED.

Code, reworked and working:

import datetime as dt
import logging
import os
import xml.etree.ElementTree as ET
import zipfile

from psycopg2.extras import execute_values

# conn (psycopg2 connection), temp (extraction directory) and logger
# are assumed to be set up earlier in the script.
for zip_file in os.listdir():
    if zipfile.is_zipfile(zip_file):
        logger.info("Processing %s", zip_file)
        for member in zipfile.ZipFile(zip_file).namelist():
            now = dt.datetime.now()
            prices_list = []
            zipfile.ZipFile(zip_file).extract(member, path=temp)
            local_xml = os.path.join(temp, member)
            tree = ET.parse(local_xml)
            root = tree.getroot()
            ns = root.tag[:-4]
            for finhist in root.findall("./%sFinancialHistory" % ns):
                asset_id = int(finhist.get("Id"))
                logger.debug("Processing %s", asset_id)
                for prices in finhist.findall("./%sPrices" % ns):
                    price_currency = prices.get("Currency")
                    for priceset in prices.findall("./%sPriceSet" % ns):
                        price_date = priceset.get("Date")
                        for price in priceset.findall("./%sPrice" % ns):
                            price_value = float(price.text)
                            price_type = price.get("Type")
                            prices_list.append((asset_id, price_date, price_type, price_value, price_currency, now, zip_file))
            try:
                os.remove(local_xml)
            except OSError:
                logger.error("File cannot be deleted", exc_info=True)
            cur = conn.cursor()
            try:
                execute_values(cur, '''
                    INSERT INTO "LTSF"."Prices"("Asset_ID", "Price_Date", "Price_Type", "Price_Value", "Price_Currency", "Mod_Date", "Zip_File")
                    VALUES %s
                    ON CONFLICT ("Asset_ID", "Price_Date", "Price_Type")
                    DO UPDATE SET
                    "Price_Value" = excluded."Price_Value", "Price_Currency" = excluded."Price_Currency", "Mod_Date" = excluded."Mod_Date", "Zip_File" = excluded."Zip_File"''', prices_list)
            except Exception:
                logger.error("Problem upserting Prices", exc_info=True)
            conn.commit()
            cur.close()

Thank you "a_horse_with_no_name"!
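The same batch-upsert shape can be exercised locally with the stdlib sqlite3 driver. executemany() simply runs the statement once per row (psycopg2's execute_values is faster because it sends a single multi-row statement), but the conflict handling behaves the same, including a later row in the batch overwriting an earlier conflicting one:

```python
import sqlite3

# Local analogue of the batched upsert above, using stdlib sqlite3.
# The composite primary key plays the role of the ON CONFLICT target.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""CREATE TABLE prices (
    asset_id INTEGER, price_date TEXT, value REAL,
    PRIMARY KEY (asset_id, price_date))""")

rows = [(1, "2024-01-01", 9.5),   # inserted
        (1, "2024-01-01", 10.0),  # conflicts with the row above -> updates it
        (2, "2024-01-01", 3.0)]   # inserted
cur.executemany("""
    INSERT INTO prices (asset_id, price_date, value)
    VALUES (?, ?, ?)
    ON CONFLICT (asset_id, price_date) DO UPDATE SET
    value = excluded.value""", rows)
conn.commit()

print(cur.execute("SELECT * FROM prices ORDER BY asset_id").fetchall())
# [(1, '2024-01-01', 10.0), (2, '2024-01-01', 3.0)]
```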