From version 1.2 SQLAlchemy supports on_duplicate_key_update for MySQL.
There are also examples of how to use it:

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value'
)

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,  # refers to the value from the INSERT's VALUES clause
    status='U'
)

conn.execute(on_duplicate_key_stmt)
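You can inspect the SQL this produces without a live MySQL server by compiling the statement against the MySQL dialect. A minimal sketch (the table and column names here are made up for illustration):

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
my_table = Table(
    "my_table", metadata,
    Column("id", String(32), primary_key=True),
    Column("data", String(64)),
    Column("status", String(1)),
)

insert_stmt = insert(my_table).values(id="some_existing_id", data="inserted value")
stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,  # the VALUES-clause value, not the existing row
    status="U",
)

# Compile against the MySQL dialect to see the generated SQL.
sql = str(stmt.compile(dialect=mysql.dialect()))
print(sql)  # contains "ON DUPLICATE KEY UPDATE"
```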
From version 1.1 SQLAlchemy supports on_conflict_do_update for PostgreSQL.
Examples (answer from vishes_shell on Stack Overflow):

from sqlalchemy.dialects.postgresql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value'
)

do_update_stmt = insert_stmt.on_conflict_do_update(
    constraint='pk_my_table',
    set_=dict(data='updated value')
)

conn.execute(do_update_stmt)
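The same on_conflict_do_update API shape is also exposed by SQLAlchemy's SQLite dialect (1.4+), which makes it easy to exercise the pattern end to end in memory. A runnable sketch with illustrative table and column names (SQLite identifies the conflict by index_elements rather than a named constraint):

```python
from sqlalchemy import Column, MetaData, String, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert  # same on_conflict_* API shape as postgresql

engine = create_engine("sqlite://")
metadata = MetaData()
my_table = Table(
    "my_table", metadata,
    Column("id", String(32), primary_key=True),
    Column("data", String(64)),
)
metadata.create_all(engine)

with engine.begin() as conn:
    # Seed an existing row, then upsert over it.
    conn.execute(my_table.insert().values(id="some_existing_id", data="original value"))

    insert_stmt = insert(my_table).values(id="some_existing_id", data="inserted value")
    do_update_stmt = insert_stmt.on_conflict_do_update(
        index_elements=["id"],           # conflict target: the primary-key column
        set_=dict(data="updated value"),
    )
    conn.execute(do_update_stmt)

    rows = conn.execute(select(my_table)).fetchall()
# rows now holds a single row whose data column was updated in place
```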
You can try this:

def get_or_increase_tag(tag_name):
    tag = session.query(Tag).filter_by(tag=tag_name).first()
    if not tag:
        tag = Tag(tag_name)
        session.add(tag)  # new tags must be added to the session before commit
    else:
        tag.cnt += 1
    return tag
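A self-contained version of this select-then-insert/update pattern, with a hypothetical Tag model sketched in for demonstration (the original post does not show the model; note the pattern is not safe under concurrent writers without locking or a unique constraint plus retry):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Tag(Base):
    # Hypothetical model; the original post's Tag definition is not shown.
    __tablename__ = "tag"
    id = Column(Integer, primary_key=True)
    tag = Column(String(64), unique=True)
    cnt = Column(Integer, default=1)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

def get_or_increase_tag(tag_name):
    # Look up the tag; insert it if missing, otherwise bump its counter.
    tag = session.query(Tag).filter_by(tag=tag_name).first()
    if not tag:
        tag = Tag(tag=tag_name)
        session.add(tag)
    else:
        tag.cnt += 1
    return tag

get_or_increase_tag("python")
get_or_increase_tag("python")  # second call finds the row and increments cnt
session.commit()
result = session.query(Tag).filter_by(tag="python").one()
```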
You can also check this Stack Overflow search for more examples: https://stackoverflow.com/search?q=Insert+on+duplicate+update+sqlalchemy
Assuming certain column names:

INSERT one:

newToner = Toner(toner_id=1,
                 toner_color='blue',
                 toner_hex='#0F85FF')
dbsession.add(newToner)
dbsession.commit()

INSERT multiple:

newToner1 = Toner(toner_id=1,
                  toner_color='blue',
                  toner_hex='#0F85FF')
newToner2 = Toner(toner_id=2,
                  toner_color='red',
                  toner_hex='#F01731')
dbsession.add_all([newToner1, newToner2])
dbsession.commit()

UPDATE:

q = dbsession.query(Toner)
q = q.filter(Toner.toner_id == 1)
record = q.one()
record.toner_color = 'Azure Radiance'
dbsession.commit()
Or, as a one-liner, use Session.merge, which inserts or updates by primary key:

record = dbsession.merge(Toner(**kwargs))
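A quick runnable sketch of merge acting as an upsert, with the Toner model recreated under assumed column types (the original post does not show the model definition):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Toner(Base):
    # Assumed model; column types are guesses for demonstration.
    __tablename__ = "toner"
    toner_id = Column(Integer, primary_key=True)
    toner_color = Column(String(64))
    toner_hex = Column(String(7))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
dbsession = Session(engine)

# First merge inserts, since no row with toner_id=1 exists yet ...
dbsession.merge(Toner(toner_id=1, toner_color="blue", toner_hex="#0F85FF"))
dbsession.commit()

# ... a second merge with the same primary key updates the row in place.
dbsession.merge(Toner(toner_id=1, toner_color="Azure Radiance", toner_hex="#0F85FF"))
dbsession.commit()

count = dbsession.query(Toner).count()
color = dbsession.get(Toner, 1).toner_color
```

Note that merge issues a SELECT per object to decide between insert and update, which is why it is slow for bulk upserts.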
I tried lots of ways and finally settled on this:

def db_persist(func):
    def persist(*args, **kwargs):
        func(*args, **kwargs)
        try:
            session.commit()
            logger.info("success calling db func: " + func.__name__)
            return True
        except SQLAlchemyError as e:
            logger.error(e.args)
            session.rollback()
            return False
        finally:
            session.close()
    return persist

and:

@db_persist
def insert_or_update(table_object):
    return session.merge(table_object)
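A self-contained sketch of this decorator pattern, with a hypothetical Item model filled in (the session.close() in the finally block is omitted here for brevity; also note the wrapped function's return value, the merged instance, is discarded by the decorator, which reports only True/False):

```python
import logging

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, declarative_base

logger = logging.getLogger(__name__)
Base = declarative_base()

class Item(Base):
    # Hypothetical model for demonstration.
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String(64))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

def db_persist(func):
    def persist(*args, **kwargs):
        func(*args, **kwargs)
        try:
            session.commit()
            logger.info("success calling db func: " + func.__name__)
            return True
        except SQLAlchemyError as e:
            logger.error(e.args)
            session.rollback()
            return False
    return persist

@db_persist
def insert_or_update(table_object):
    return session.merge(table_object)

ok1 = insert_or_update(Item(id=1, name="first"))
ok2 = insert_or_update(Item(id=1, name="second"))  # same pk -> merge updates
count = session.query(Item).count()
```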
I'm having trouble with the updating part. I have figured out how to use SQLAlchemy and pandas with df.to_sql('example_table', engine, if_exists='append'), but this does as you would expect and appends to the database, creating duplicates. This causes issues when I want to have unique keys: if the unique key is already in the database, I want to just update the row with the new data. Is there a way to do this?
There is an upsert-esque operation in SQLAlchemy:
db.session.merge()
After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert".
The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids:
# Imagine that post1, post5, and post1000 are post objects with ids 1, 5 and 1000 respectively.
# The goal is to "upsert" these posts.
# We initialize a dict which maps id to the post object.
my_new_posts = {1: post1, 5: post5, 1000: post1000}

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()
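The same split can be exercised end to end with a plain Session instead of the Flask-SQLAlchemy `posts.query` style used above. A runnable sketch with a hypothetical Post model and illustrative ids:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Post(Base):
    # Hypothetical model for demonstration.
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(64))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
db = Session(engine)

# Seed: ids 1 and 5 already exist; 1000 does not.
db.add_all([Post(id=1, title="old 1"), Post(id=5, title="old 5")])
db.commit()

my_new_posts = {
    1: Post(id=1, title="new 1"),
    5: Post(id=5, title="new 5"),
    1000: Post(id=1000, title="new 1000"),
}

# Merge rows that already exist, popping them out of the dict ...
for each in db.query(Post).filter(Post.id.in_(my_new_posts.keys())).all():
    db.merge(my_new_posts.pop(each.id))

# ... and plain-add whatever is left (rows not yet in the database).
db.add_all(my_new_posts.values())
db.commit()

titles = {p.id: p.title for p in db.query(Post).all()}
```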
You can leverage the on_conflict_do_update variant. A simple example would be the following:
from sqlalchemy.dialects.postgresql import insert
class Post(Base):
    """
    A simple class for demonstration
    """
    __tablename__ = "post"  # required for declarative models
    id = Column(Integer, primary_key=True)
    title = Column(Unicode)

# Prepare all the values that should be "upserted" into the DB
values = [
    {"id": 1, "title": "mytitle 1"},
    {"id": 2, "title": "mytitle 2"},
    {"id": 3, "title": "mytitle 3"},
    {"id": 4, "title": "mytitle 4"},
]

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # Let's use the constraint name which was visible in the original post's error msg
    constraint="post_pkey",
    # The columns that should be updated on conflict
    set_={"title": stmt.excluded.title},
)
session.execute(stmt)
See the Postgres docs for more details about ON CONFLICT DO UPDATE.
See the SQLAlchemy docs for more details about on_conflict_do_update.
Side-Note on duplicated column names
The above code uses the column names as dict keys both in the values list and the argument to set_. If the column-name is changed in the class-definition this needs to be changed everywhere or it will break. This can be avoided by accessing the column definitions, making the code a bit uglier, but more robust:
coldefs = Post.__table__.c
values = [
{coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
...
]
stmt = stmt.on_conflict_do_update(
...
set_={
coldefs.title.name: stmt.excluded.title
...
}
)
...

art = models.Articles(title=art_title, date=datum, description=words, url=adresse, status=1)
db.session.add(art)
db.session.commit()

Hi there,
My code adds my entries, but duplicates everything if I reload my page. I would like an IF NOT EXISTS condition. Sounds very simple, but I can't find a solution :(
Obi-Wan Kenobi, you're my only hope...
Thanks
Unfortunately I am not sure about the performance of merge, but according to this post (which I highly recommend) the best option would be to upsert and then just read from the table:

from sqlalchemy.dialects.postgresql import insert

insert_stmt = insert(my_table).values(id='id', data='data')
do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['id'])
db.execute(do_nothing_stmt)  # the statement must actually be executed

db.query(table).get("id")
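The on_conflict_do_nothing variant can likewise be tried out in memory via the SQLite dialect, which exposes the same API shape (table and values here are illustrative):

```python
from sqlalchemy import Column, MetaData, String, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert  # same on_conflict_* API shape as postgresql

engine = create_engine("sqlite://")
metadata = MetaData()
my_table = Table(
    "my_table", metadata,
    Column("id", String(32), primary_key=True),
    Column("data", String(64)),
)
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(my_table.insert().values(id="id", data="data"))

    # The conflicting insert is silently skipped instead of raising.
    insert_stmt = insert(my_table).values(id="id", data="other data")
    conn.execute(insert_stmt.on_conflict_do_nothing(index_elements=["id"]))

    row = conn.execute(select(my_table).where(my_table.c.id == "id")).one()
# row still holds the original data; the second insert was a no-op
```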
It is possible to do it in one call. It wasn't easy, because insert.values(**items) doesn't allow a where clause combined with NOT EXISTS. We need a subquery, but then we also need literals for every column and the keys for the insert. The literal for datetime also caused an insert issue (formatting, maybe just Postgres). I am wondering if there is a better way to handle the date issue.
In the example below, item_dict is a dictionary with field: value entries. The row is only inserted if the invoice_number AND invoice_line_number don't already exist.

# Create literals for the select. Datetime conversion was needed because the
# insert failed without it due to formatting issues.
literals = [literal(val) if key != 'invoice_date' else literal(val.strftime("%Y-%m-%d"))
            for key, val in item_dict.items()]

# NOT EXISTS subquery providing the literals.
insert_select = select(literals).where(
    ~exists(literal(1)).where(
        (performance_row_table.c.invoice_number == item_dict['invoice_number']) &
        (performance_row_table.c.invoice_line_number == item_dict['invoice_line_number'])
    )
)

keys = [key for key in item_dict.keys()]

# Insert using the subquery
insert_statement = performance_row_table.insert().from_select(keys, insert_select).returning(performance_row_table)
compiled = insert_statement.compile(compile_kwargs={"literal_binds": True})
print(compiled)
insert_result = session.execute(insert_statement).fetchone()
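The INSERT ... SELECT ... WHERE NOT EXISTS shape can be demonstrated self-contained on SQLite (RETURNING and the datetime quirk are left out; the table and item are made up, and the duplicate check uses Select.exists() from SQLAlchemy 1.4+):

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, literal, select)

engine = create_engine("sqlite://")
metadata = MetaData()
invoices = Table(
    "invoices", metadata,
    Column("invoice_number", String(32)),
    Column("invoice_line_number", Integer),
    Column("description", String(64)),
)
metadata.create_all(engine)

def insert_if_absent(conn, item_dict):
    # SELECT <literals> WHERE NOT EXISTS (SELECT 1 FROM invoices WHERE keys match)
    literals = [literal(v) for v in item_dict.values()]
    dup_check = select(literal(1)).where(
        (invoices.c.invoice_number == item_dict["invoice_number"])
        & (invoices.c.invoice_line_number == item_dict["invoice_line_number"])
    )
    insert_select = select(*literals).where(~dup_check.exists())
    conn.execute(invoices.insert().from_select(list(item_dict.keys()), insert_select))

with engine.begin() as conn:
    item = {"invoice_number": "INV-1", "invoice_line_number": 1, "description": "widget"}
    insert_if_absent(conn, item)
    insert_if_absent(conn, item)  # second call selects zero rows, inserts nothing
    rows = conn.execute(select(invoices)).fetchall()
```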