from https://stackoverflow.com/a/26018934/465974
(Answer from Shriram on Stack Overflow, quoted in full below.)
How to bulk update 400,000 database entries with SQLAlchemy
Hi,
Right now I have a function where I upload a CSV file with over 400,000 rows containing the following data:
keyword | rank
What I'm trying to do is write a function where Python goes through all 400k entries and updates the data for those that exist in the database, or inserts them if they don't, using bulk_update_mappings or bulk_insert_mappings from SQLAlchemy.
The problem is that right now the process is very slow. This is how I do it:
csv_dicts = [{k: v for k, v in row.items()} for row in csv.DictReader(file_contents.splitlines(), skipinitialspace=True)]
# make a list of the keywords present in the CSV file
csv_keywords = [keyword["Name"] for keyword in csv_dicts]
# make a dictionary mapping keyword name to rank
csv_rank_data = {keyword["Name"]: keyword["Rank"] for keyword in csv_dicts}
# get database keywords
db_keywords = Keyword.query.filter(Keyword.marketplace_id == marketplace_id).all()
# keywords to update: a tuple of (name, id, rank) for every database keyword
# that is present in the uploaded CSV file
keyword_to_update = tuple(
    (keyword.name, keyword.id, csv_rank_data[keyword.name])
    for keyword in db_keywords if keyword.name in csv_keywords
)
# create the dictionaries that will be used for the bulk update
update_data = [
    dict(id=keyword[1], rank=keyword[2] if keyword[2].isdigit() else 0)
    for keyword in keyword_to_update
]
db.session.bulk_update_mappings(Keyword, update_data)
This is the process I currently use to identify all existing keywords and get their IDs for the bulk update.
It takes a lot of time. How would you improve my code?
---------------------------------------------------------------
Solution!
After reading all your comments I decided to remove the list searches where possible, or replace the list with a set. It seems that Python is much faster at checking whether a string exists in a set.
For example:
csv_keywords = [keyword["Name"] for keyword in csv_dicts]
# replace with:
csv_keywords = set(keyword["Name"] for keyword in csv_dicts)
Because I was checking whether each of 400,000+ objects existed, the searches took a long time (the full function needed 1.5-2 hours or more to complete).
=================
Now it takes ~5-10 minutes. While this is not as fast as I was hoping for an insert-or-update, it is still acceptable considering that I have to update 400k entries for which I do not have the ID.
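The speed-up comes from how membership testing works: `x in some_list` scans the list element by element, while `x in some_set` is a hash lookup. A quick self-contained sketch (with made-up keyword names, not the thread's real data) illustrates the difference:

```python
import timeit

# Hypothetical stand-in for the 400k keyword names parsed from the CSV.
names = [f"keyword-{i}" for i in range(400_000)]

as_list = names        # membership test scans up to all 400k items
as_set = set(names)    # membership test is a constant-time hash lookup

probe = "keyword-399999"  # worst case for the list scan

list_time = timeit.timeit(lambda: probe in as_list, number=10)
set_time = timeit.timeit(lambda: probe in as_set, number=10)

print(f"list: {list_time:.4f}s, set: {set_time:.6f}s")
```

With one such lookup per database keyword, the list version does on the order of 400k × 400k comparisons, which is where the hours went.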
First, I’d optimise the query. Right now you’re getting a lot of data and filtering it in Python. Try to move that to the DB side.
Then, you’re iterating over all the results twice: first you build db_keywords, then update_data. Another thing to think about is that if you’re bulk updating 400,000 entries they’ll all be pending until you commit, so I’d flush and commit every few thousand entries or so. All this can be done within a single for loop.
db_keywords = Keyword.query.filter_by(marketplace_id=marketplace_id).filter(Keyword.name.in_(csv_keywords)).all()
update_data = []
for keyword in db_keywords:
    # take the new rank from the CSV data, falling back to 0 for non-numeric values
    rank = csv_rank_data[keyword.name]
    update_data.append({"id": keyword.id, "rank": int(rank) if rank.isdigit() else 0})
    if len(update_data) >= 2000:
        db.session.bulk_update_mappings(Keyword, update_data)
        db.session.commit()
        update_data = []
# flush whatever is left over from the last batch
if update_data:
    db.session.bulk_update_mappings(Keyword, update_data)
db.session.commit()
If this is still too slow, you can omit the querying step completely by using SQLAlchemy core and building your update query manually. Here is the relevant documentation.
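The "build the update query manually" route can look like the following sketch: one Core UPDATE statement compiled once and executed with a list of parameter dictionaries (an executemany), so no ORM objects are loaded at all. The Keyword model, column names, and in-memory SQLite database here are assumptions for illustration, not the original poster's code:

```python
from sqlalchemy import Column, Integer, String, bindparam, create_engine, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Keyword(Base):
    # Hypothetical model mirroring the one in the thread.
    __tablename__ = "keyword"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    rank = Column(Integer)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Keyword(id=1, name="a", rank=0), Keyword(id=2, name="b", rank=0)])
    session.commit()

    # One UPDATE with bound parameters; bindparam names must differ
    # from the column names used in values().
    stmt = (
        update(Keyword)
        .where(Keyword.id == bindparam("b_id"))
        .values(rank=bindparam("b_rank"))
    )
    # Passing a list of parameter dicts triggers an executemany.
    session.execute(stmt, [{"b_id": 1, "b_rank": 5}, {"b_id": 2, "b_rank": 7}])
    session.commit()

    ranks = [k.rank for k in session.query(Keyword).order_by(Keyword.id)]
```

For 400k rows the parameter list would be built straight from the CSV dicts, batched in chunks if memory is a concern.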
There is an upsert-esque operation in SQLAlchemy:
db.session.merge()
After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".
The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids:
# Imagine that post1, post5, and post1000 are post objects with ids 1, 5 and 1000 respectively.
# The goal is to "upsert" these posts.
# We initialize a dict which maps id to the post object.
my_new_posts = {1: post1, 5: post5, 1000: post1000}

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()
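A self-contained sketch of this pattern, using an in-memory SQLite database and a hypothetical Post model (assumed names, not the answer's original code), where id 1 already exists and gets merged while id 2 is new and gets added:

```python
from sqlalchemy import Column, Integer, Unicode, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Post(Base):
    # Hypothetical model for illustration.
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Post(id=1, title="existing"))
    session.commit()

    # id 1 exists (will be merged/updated), id 2 does not (will be inserted)
    my_new_posts = {1: Post(id=1, title="updated"), 2: Post(id=2, title="brand new")}

    for each in session.query(Post).filter(Post.id.in_(my_new_posts.keys())).all():
        session.merge(my_new_posts.pop(each.id))  # merge rows that already exist

    session.add_all(my_new_posts.values())  # insert the rest
    session.commit()

    titles = [p.title for p in session.query(Post).order_by(Post.id)]
```

Note that each merge() still issues its own lookup against the session, which is why this approach stays slow for large batches.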
You can leverage the on_conflict_do_update variant. A simple example would be the following:
from sqlalchemy import Column, Integer, Unicode
from sqlalchemy.dialects.postgresql import insert

class Post(Base):
    """
    A simple class for demonstration
    """
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

# Prepare all the values that should be "upserted" to the DB
values = [
    {"id": 1, "title": "mytitle 1"},
    {"id": 2, "title": "mytitle 2"},
    {"id": 3, "title": "mytitle 3"},
    {"id": 4, "title": "mytitle 4"},
]

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # Let's use the constraint name which was visible in the original post's error msg
    constraint="post_pkey",
    # The columns that should be updated on conflict
    set_={
        "title": stmt.excluded.title
    }
)
session.execute(stmt)
See the Postgres docs for more details about ON CONFLICT DO UPDATE.
See the SQLAlchemy docs for more details about on_conflict_do_update.
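The same construct exists in SQLAlchemy's SQLite dialect (it needs SQLite 3.24+), which makes the pattern easy to try locally. A minimal runnable sketch under those assumptions, using index_elements as the conflict target instead of a named constraint (SQLite primary-key constraints are typically unnamed):

```python
from sqlalchemy import Column, Integer, Unicode, create_engine
from sqlalchemy.dialects.sqlite import insert
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Post(Base):
    # Hypothetical model for illustration.
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Post(id=1, title="old title"))
    session.commit()

    # id 1 conflicts (title gets updated), id 2 is new (plain insert)
    values = [{"id": 1, "title": "new title"}, {"id": 2, "title": "mytitle 2"}]
    stmt = insert(Post).values(values)
    stmt = stmt.on_conflict_do_update(
        index_elements=[Post.id],  # conflict target: the primary key column
        set_={"title": stmt.excluded.title},
    )
    session.execute(stmt)
    session.commit()

    titles = [p.title for p in session.query(Post).order_by(Post.id)]
```

The single statement performs the whole upsert in one round trip per batch, which is what makes this the fast option for 400k rows.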
Side-note on duplicated column names
The above code uses the column names as dict keys both in the values list and in the argument to set_. If a column name is changed in the class definition, it needs to be changed everywhere or the code will break. This can be avoided by accessing the column definitions, which makes the code a bit uglier but more robust:
coldefs = Post.__table__.c
values = [
    {coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
    ...
]
stmt = stmt.on_conflict_do_update(
    ...
    set_={
        coldefs.title.name: stmt.excluded.title,
        ...
    }
)