Answer from Shriram on Stack Overflow: https://stackoverflow.com/a/26018934/465974 (quoted in full below).
GitHub
github.com › sqlalchemy › sqlalchemy › discussions › 11494
What's the best way to perform bulk upserts? · sqlalchemy/sqlalchemy · Discussion #11494
June 14, 2024 - IMO, "the best way to perform bulk upserts" is to upload the source data to a temporary table, and then run the necessary DML statement(s) on the server, e.g.,
Author   sqlalchemy
Discussions
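The temp-table pattern the discussion describes can be sketched with the stdlib sqlite3 driver (the `keyword`/`staging` table and column names here are illustrative, not from the discussion): stage the incoming rows with executemany, then let a single server-side statement do the upsert.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE keyword (id INTEGER PRIMARY KEY, rank INTEGER)")
conn.execute("INSERT INTO keyword VALUES (1, 10), (2, 20)")

# Stage the source data in a temporary table...
conn.execute("CREATE TEMP TABLE staging (id INTEGER PRIMARY KEY, rank INTEGER)")
incoming = [(1, 99), (3, 30)]  # id 1 exists (update), id 3 is new (insert)
conn.executemany("INSERT INTO staging VALUES (?, ?)", incoming)

# ...then run one DML statement on the server. (SQLite needs the
# "WHERE true" to disambiguate the SELECT from the ON CONFLICT clause.)
conn.execute("""
    INSERT INTO keyword (id, rank)
    SELECT id, rank FROM staging WHERE true
    ON CONFLICT (id) DO UPDATE SET rank = excluded.rank
""")
conn.commit()
print(sorted(conn.execute("SELECT id, rank FROM keyword")))  # [(1, 99), (2, 20), (3, 30)]
```

On PostgreSQL or MySQL the same shape works with that backend's temp tables and upsert syntax; the point is that the number of round trips stays constant regardless of row count.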

Bulk upsert in Mysql dialect using on_duplicate_key_update
I've studied the doc and SO posts for clues on bulk upsert using the MySQL dialect. I have not yet figured it out, and could not find any answer on SO that explains it either. I hope I'm not missing something totally obvious. Please give more hints, tips or suggestions here. More on github.com
February 22, 2023
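For reference, the MySQL dialect does ship a bulk-capable construct for this: `sqlalchemy.dialects.mysql.insert()` with `on_duplicate_key_update()`. A minimal sketch (the `keyword` table and `rank` column are illustrative), compiled without a live server:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
keyword = Table(
    "keyword", metadata,
    Column("id", Integer, primary_key=True),
    Column("rank", Integer),
)

rows = [{"id": 1, "rank": 10}, {"id": 2, "rank": 20}]
stmt = insert(keyword).values(rows)
# `stmt.inserted` refers to the row that would have been inserted,
# i.e. MySQL's VALUES()/new-row alias
stmt = stmt.on_duplicate_key_update(rank=stmt.inserted.rank)

print(stmt.compile(dialect=mysql.dialect()))
```

Compiling against the MySQL dialect renders the multi-row INSERT with an ON DUPLICATE KEY UPDATE clause; pass the statement to Session.execute() or Connection.execute() to run it.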
r/flask on Reddit: How to bulk update 400.000 database entry's with sqlalchemy
February 8, 2021 (question and answers quoted in full below)
python - SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in postgresql - Stack Overflow
I am trying to write a bulk upsert in python using the SQLAlchemy module (not in SQL!). I am getting the following error on a SQLAlchemy add: sqlalchemy.exc.IntegrityError: (IntegrityError) dupli... More on stackoverflow.com
Bulk upsert in SQLite dialect using INSERT…ON CONFLICT fails on Nullable column
Please help me figure out a subtle problem with bulk upsert in SQLAlchemy and SQLite with an optional column. I've been following the documentation here: https://docs.sqlalchemy.org/en/20/diale... More on github.com
SQLAlchemy
ORM-Enabled INSERT, UPDATE, and DELETE statements — SQLAlchemy 2.0 Documentation
From the SQLAlchemy ORM’s point of view, upsert statements look like regular Insert constructs, which includes that Insert.returning() works with upsert statements in the same way as was demonstrated at ORM Bulk Insert with Per Row SQL Expressions, so that any column expression or relevant ORM entity class may be passed.
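As a sketch of that point, a PostgreSQL upsert with RETURNING can be compiled without a database (table and column names here are illustrative):

```python
from sqlalchemy import Column, Integer, MetaData, Table, Unicode
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
post = Table(
    "post", metadata,
    Column("id", Integer, primary_key=True),
    Column("title", Unicode),
)

stmt = insert(post).values([{"id": 1, "title": "t1"}, {"id": 2, "title": "t2"}])
stmt = stmt.on_conflict_do_update(
    index_elements=[post.c.id],
    set_={"title": stmt.excluded.title},
)
# RETURNING works on the upsert just as it does on a plain INSERT
stmt = stmt.returning(post.c.id)

print(stmt.compile(dialect=postgresql.dialect()))
```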
GitHub
Bulk upsert in Mysql dialect using on_duplicate_key_update · sqlalchemy/sqlalchemy · Discussion #9328
February 22, 2023 - People have been asking on SO for some years: https://stackoverflow.com/questions/59291434/python-sqlalchemy-on-duplicate-key-update-with-multiple-records https://stackoverflow.com/questions/6611563/sqlalchemy-on-duplicate-key-update/48373874#48373874 · Thanks in advance for your expert guidance! ... Thanks for educating me here, it works! I also revised my example code to add setup and demonstrate upsert.
Towards Data Science
How to Perform Bulk Insert/Update/Upsert Actions with SQLAlchemy ORM | Towards Data Science
January 19, 2025 - Finally, let’s check how to perform bulk inserts. As SQLAlchemy does not yet have a backend-agnostic upsert construct, we need to implement it using dialect-specific constructs.
Medium
How to Perform Bulk Insert/Update/Upsert Actions with SQLAlchemy ORM | by Lynn G. Kwong | TDS Archive | Medium
May 14, 2024 - When this happens, performance is an important issue. If not handled properly, it will be a bottleneck of your application and reduce efficiency and usability. In this post, we will introduce how to perform bulk insert, update, and upsert actions for large numbers of records with SQLAlchemy ORM.
Reddit
r/flask on Reddit: How to bulk update 400.000 database entry's with sqlalchemy
February 8, 2021

Hi,

Right now I have a function where I upload a CSV file with over 400.000 rows that contain the following data:

keyword | rank

What I try to do is a function where Python goes through all 400k entries and updates the data for those that exist in the database, or inserts them if they do not exist, using bulk_update_mappings or bulk_insert_mappings from SQLAlchemy.

The problem is that right now the process is very slow. This is how I do it:

csv_dicts = [dict(row) for row in csv.DictReader(file_contents.splitlines(), skipinitialspace=True)]

# make a list of the keywords present in the CSV file
csv_keywords = [keyword["Name"] for keyword in csv_dicts]

# make a dictionary mapping keyword name to rank
csv_rank_data = {keyword["Name"]: keyword["Rank"] for keyword in csv_dicts}

# get database keywords
db_keywords = Keyword.query.filter(Keyword.marketplace_id == marketplace_id).all()

# keywords to update: build (name, id, rank) tuples for all keywords
# that are present in the uploaded CSV file
keywords_to_update = [
    (keyword.name, keyword.id, csv_rank_data[keyword.name])
    for keyword in db_keywords
    if keyword.name in csv_keywords
]

# build the mappings used for the bulk update
update_data = [
    {"id": kw_id, "rank": int(rank) if rank.isdigit() else 0}
    for name, kw_id, rank in keywords_to_update
]
db.session.bulk_update_mappings(Keyword, update_data)
db.session.commit()

This is the process I use right now to identify all existing keywords and get their ids to do the bulk update.

This takes a lot of time. How would you improve my code?

---------------------------------------------------------------

Solution!

After reading all your comments I decided to remove the search in the list where possible, or replace the list with a set. It seems that Python is much faster when checking whether a string exists in a set.

For example:

csv_keywords = [keyword["Name"] for keyword in csv_dicts]
# replace with:
csv_keywords = set(keyword["Name"] for keyword in csv_dicts)

In this case, because I was checking whether each of 400.000+ objects already existed, the search took a lot of time (the full function needed 1.5-2 hours or even more to complete).

=================

Now it takes ~5-10 minutes to complete. While this might not be as fast as I was hoping for an insert or update, it is still OK considering that I have to update 400k entries for which I do not have the id.
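The speedup comes from membership tests: `x in some_list` scans every element (O(n)) while `x in some_set` does a hash lookup (average O(1)). A quick self-contained check of the difference (sizes and names are illustrative):

```python
import timeit

names = [f"keyword-{i}" for i in range(100_000)]
as_list = list(names)
as_set = set(names)

probe = "keyword-99999"  # worst case for the list: the last element

list_time = timeit.timeit(lambda: probe in as_list, number=100)
set_time = timeit.timeit(lambda: probe in as_set, number=100)

# The set lookup is typically orders of magnitude faster
print(f"list: {list_time:.4f}s  set: {set_time:.6f}s")
```

Doing such a membership test once per CSV row multiplies that per-lookup cost by 400k, which is exactly where the 1.5-2 hours went.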

Top answer (1 of 5):

First, I’d optimise the query. Right now you’re fetching a lot of data and filtering it in Python. Try to move that filtering to the db side.

Then, you’re iterating over all the results twice: first you build db_keywords, then update_data. Another thing to think about is that if you’re bulk updating 400.000 entries they’ll all be pending until you commit, so I’d flush and commit every few thousand entries or so. All of this can be done within a single for loop.

db_keywords = Keyword.query.filter_by(marketplace_id=marketplace_id).filter(Keyword.name.in_(csv_keywords)).all()

update_data = []
for keyword in db_keywords:
    # take the new rank from the uploaded CSV, not the stale db value
    update_data.append({"id": keyword.id, "rank": csv_rank_data[keyword.name]})
    if len(update_data) >= 2000:
        db.session.bulk_update_mappings(Keyword, update_data)
        db.session.commit()
        update_data = []
if update_data:  # flush the final partial batch
    db.session.bulk_update_mappings(Keyword, update_data)
db.session.commit()

If this is still too slow, you can omit the querying step completely by using SQLAlchemy Core and building your update query manually. Here is the relevant documentation.
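At the driver level, "building your update query manually" boils down to one parameterized UPDATE executed over a batch of parameter rows. A stdlib sqlite3 sketch of that shape (table and column names are illustrative; with SQLAlchemy Core the equivalent is executing an update() construct with a list of parameter dicts):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE keyword (id INTEGER PRIMARY KEY, name TEXT, rank INTEGER)")
conn.executemany(
    "INSERT INTO keyword VALUES (?, ?, ?)",
    [(1, "alpha", 10), (2, "beta", 20), (3, "gamma", 30)],
)

# One parameterized UPDATE, driven by (rank, id) pairs: the database does
# the row matching, with no per-row query from Python.
batch = [(99, 1), (88, 3)]
conn.executemany("UPDATE keyword SET rank = ? WHERE id = ?", batch)
conn.commit()
print(sorted(conn.execute("SELECT id, rank FROM keyword")))  # [(1, 99), (2, 20), (3, 88)]
```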


Top answer (1 of 7) on the Stack Overflow question:

There is an upsert-esque operation in SQLAlchemy:

db.session.merge()

After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".

The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids:

# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# we initialize a dict which maps id to the post object

my_new_posts = {1: post1, 5: post5, 1000: post1000} 

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database 
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()
Answer 2 of 7:
You can leverage the on_conflict_do_update variant. A simple example would be the following:

from sqlalchemy import Column, Integer, Unicode
from sqlalchemy.dialects.postgresql import insert

# assumes a declarative Base and a Session as usual
class Post(Base):
    """
    A simple class for demonstration
    """

    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

# Prepare all the values that should be "upserted" to the DB
values = [
    {"id": 1, "title": "mytitle 1"},
    {"id": 2, "title": "mytitle 2"},
    {"id": 3, "title": "mytitle 3"},
    {"id": 4, "title": "mytitle 4"},
]

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # Let's use the constraint name which was visible in the original posts error msg
    constraint="post_pkey",

    # The columns that should be updated on conflict
    set_={
        "title": stmt.excluded.title
    }
)
session.execute(stmt)

See the Postgres docs for more details about ON CONFLICT DO UPDATE.

See the SQLAlchemy docs for more details about on_conflict_do_update.

Side-Note on duplicated column names

The above code uses the column names as dict keys both in the values list and in the argument to set_. If a column name is changed in the class definition, it needs to be changed everywhere or the code will break. This can be avoided by accessing the column definitions, which makes the code a bit uglier but more robust:

coldefs = Post.__table__.c

values = [
    {coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
    ...
]

stmt = stmt.on_conflict_do_update(
    ...
    set_={
        coldefs.title.name: stmt.excluded.title
        ...
    }
)
Medium
How to create an SQL Table and perform UPSERT using SQLAlchemy ORM in 3 minutes | by Oleg Khomenko | Medium
April 27, 2023 -

def upsert(users: dict, engine: sqlalchemy.engine.base.Engine, update=True):
    entries_to_update = 0
    entries_to_put = []
    with sessionmaker(bind=engine)() as sess:
        # Find all rows that need to be updated, and merge
        for each in (
            sess.query(User.uuid)
            .filter(User.uuid.in_(users.keys()))
            .all()
        ):
            values = users.pop(each.uuid)
            entries_to_update += 1
            if update:
                sess.merge(User(**values))
        # Bulk mappings for everything that needs to be inserted
        for u in users.values():
            entries_to_put.append(u)
        sess.bulk_insert_mappings(User, entries_to_put)
        sess.commit()
    return (
        f" inserted:\t{len(entries_to_put)}\n"
        f" {'updated' if update else 'not updated'}:\t{entries_to_update}"
    )

Now you can test it
GitHub
SqlAlchemy postgres bulk upsert · GitHub
SqlAlchemy postgres bulk upsert. GitHub Gist: instantly share code, notes, and snippets.
SQLAlchemy
Working with Large Collections — SQLAlchemy 2.0 Documentation
To bulk insert rows into a collection of this type using WriteOnlyCollection, the new records may be bulk-inserted separately first, retrieved using RETURNING, and those records then passed to the WriteOnlyCollection.add_all() method where the unit of work process will proceed to persist them ...
GitHub
Bulk upsert in SQLite dialect using INSERT…ON CONFLICT fails on Nullable column · sqlalchemy/sqlalchemy · Discussion #9702
Here is the revised SSCCE demonstrating bulk insert and bulk upsert in SQLite on a table that has nullable columns.

import sqlalchemy as db
import sqlalchemy.dialects.sqlite as sqlite
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'user'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30), nullable=False)
    count: Mapped[int] = mapped_column(nullable=True)

engine = db.create_engine('sqlite:///:memory:')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base().metadata.create_all(
SQLAlchemy
examples.performance.bulk_updates — SQLAlchemy 2.1 Documentation
import Profiler

Base = declarative_base()
engine = None

class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, Identity(), primary_key=True)
    name = Column(String(255))
    description = Column(String(255))

Profiler.init("bulk_updates", num=100000)

@Profiler.setup
def setup_database(dburl, echo, num):
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    s = Session(engine)
    for chunk in range(0, num, 10000):
        s.bulk_insert_mappings(
            Customer,
            [
                {
                    "name": "customer name %d" % i,
                    "description": "customer description %d" %
SQLAlchemy
Working with Large Collections — SQLAlchemy 2.1 Documentation
To bulk insert rows into a collection of this type using WriteOnlyCollection, the new records may be bulk-inserted separately first, retrieved using RETURNING, and those records then passed to the WriteOnlyCollection.add_all() method where the unit of work process will proceed to persist them ...
Tutorials Technology
SQLAlchemy Bulk Insert 2026: 40x Faster with Core & ORM 2.0 Methods | Tech Tutorials
2 weeks ago - Long-running bulk inserts can time out. Commit in chunks to stay within transaction limits:

from sqlalchemy import insert

def insert_with_checkpoints(session, data, batch_size=1000):
    """Commit in chunks to avoid transaction timeouts (SQLAlchemy 2.0)."""
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        session.execute(insert(User), batch)
        session.commit()  # Commit each chunk
        print(f"Inserted {min(i + batch_size, len(data))}/{len(data)} rows")
Readthedocs
Quick Start Guide - sqlalchemy_upsert_kit 0.1.1 documentation
This guide demonstrates the three high-performance bulk upsert operations provided by sqlalchemy_upsert_kit using real examples with an in-memory SQLite database.
CSDN
SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in postgresql_python_Mangs-Python
August 23, 2022 - How can I upsert with Flask-SQLAlchemy based on primary key alone? Is there a simple solution? If there is not, I can always check for and delete any record with a matching id, and then insert the new record, but that seems expensive for my situation, where I do not expect many updates. ... After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".
Narkive
[sqlalchemy] Bulk upsert using on_conflict_do_update
In postgres there is apparently a special table called 'excluded' which contains all the rows to be inserted, so

INSERT INTO MYTABLE (col1, col2, col3)
VALUES (a, b, c), (d, e, f)
ON CONFLICT (col1) DO UPDATE
SET col2 = excluded.col2,
    col3 = excluded.col3

Does sqlalchemy support bulk upserts like this yet?
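SQLite (3.24+) supports the same `excluded` pseudo-table, so the semantics of that statement can be checked with the stdlib driver (table and column names follow the quoted example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mytable (col1 TEXT PRIMARY KEY, col2 TEXT, col3 TEXT)")
conn.execute("INSERT INTO mytable VALUES ('a', 'old2', 'old3')")

# `excluded` names the row that would have been inserted had there
# been no conflict.
conn.executemany(
    """
    INSERT INTO mytable (col1, col2, col3) VALUES (?, ?, ?)
    ON CONFLICT (col1) DO UPDATE
    SET col2 = excluded.col2,
        col3 = excluded.col3
    """,
    [("a", "new2", "new3"), ("d", "e", "f")],
)
conn.commit()
print(sorted(conn.execute("SELECT * FROM mytable")))  # [('a', 'new2', 'new3'), ('d', 'e', 'f')]
```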
Towards Data Science
How to Perform Bulk Inserts With SQLAlchemy Efficiently in Python | Towards Data Science
March 5, 2025 - If you are an old school who only wants to work with plain SQL queries and don’t want to deal with Core API or ORM at all, you can use Connection.exec_driver_sql() to perform bulk inserts, which utilizes underlying DBAPI directly and has the same performance as using the Core APIs shown above: In this post, different SQLAlchemy methods are introduced for bulk inserts.