DataFrames in Apache Spark are immutable, so you cannot modify one in place. To delete rows from a DataFrame, filter out the rows you do not want and save the result as a new DataFrame.
Answer from koiralo on Stack Overflow: "How to delete rows efficiently in sparksql?"
You cannot delete rows from a DataFrame, but you can create a new DataFrame that excludes the unwanted records.
sql = """
Select a.* FROM adsquare a
INNER JOIN codepoint c ON a.grid_id = c.grid_explode
WHERE dis2 <= 1 """
sq.sql(sql)
In this way you create a new DataFrame. Note the inverted condition: instead of deleting rows where dis2 > 1, I keep the rows where dis2 <= 1.
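The inverted-condition idea (keep the complement of what you would delete) can be sketched in plain Python, independent of Spark; the rows and dis2 values here are made up for illustration:

```python
# Hypothetical rows; in Spark these would live in a DataFrame.
rows = [
    {"grid_id": 1, "dis2": 0.5},
    {"grid_id": 2, "dis2": 3.0},
    {"grid_id": 3, "dis2": 1.0},
]

# To "delete" rows where dis2 > 1, keep the rows where dis2 <= 1.
kept = [r for r in rows if r["dis2"] <= 1]

print([r["grid_id"] for r in kept])  # [1, 3]
```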
Instead of deleting the data in the SQL Server table before writing your dataframe, you can write the dataframe directly with .mode("overwrite") and .option("truncate", "true").
https://learn.microsoft.com/en-us/sql/big-data-cluster/spark-mssql-connector?view=sql-server-ver15
The Spark documentation says that dbtable is used to pass the table that should be read from or written to. A FROM clause can be used only when reading data with the JDBC connector (resource: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html).
My suggestion would be either to use overwrite write mode or to open a separate connection for the deletion. Spark is not required to delete data from a MySQL server; it is enough to use a Python MySQL connector or to open a separate JDBC connection.
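The "separate connection for deletion" pattern looks like the sketch below. To keep it runnable without a database server, sqlite3 from the standard library stands in for a MySQL connector; with MySQL you would obtain the connection from e.g. mysql-connector-python or pymysql instead, and the cursor/execute/commit pattern is the same:

```python
import sqlite3

# sqlite3 stands in for a MySQL connector here; the table and rows
# are made up for illustration.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE users (id INTEGER, email TEXT)")
cur.executemany("INSERT INTO users VALUES (?, ?)",
                [(1, "a@example.com"), (2, "b@example.com")])

# Delete unwanted rows directly in the database, outside Spark.
cur.execute("DELETE FROM users WHERE id = ?", (2,))
conn.commit()

remaining = cur.execute("SELECT id FROM users").fetchall()
print(remaining)  # [(1,)]
```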
I think the best way would be to simply use "filter".
df_filtered = df.filter(df.col1 > df.col2)
df_filtered.show()
+----+----+
|col1|col2|
+----+----+
|  22|12.2|
|  77|33.3|
+----+----+
Another possible way is to use the where function of the DataFrame. For example, this:
val output = df.where("col1>col2")
will give you the expected result:
+----+----+
|col1|col2|
+----+----+
| 22|12.2|
| 77|33.3|
+----+----+
You can load the dataframe and filter it:
import pyspark.sql.functions as f
df = spark.sql("SELECT * from users_by_email")
df_filtered = df.filter(f.col("email_address") == "[email protected]")
Then you can save the dataframe with the overwrite option, or save it to a new table.
Spark does not allow UPDATE or DELETE queries over a dataframe. You need to use an external Python API in your code for the deletion.
You can check the Python API below, which provides a .delete() function:
https://docs.datastax.com/en/developer/python-driver/3.18/api/cassandra/cqlengine/models/#cassandra.cqlengine.models.Model-methods
Using Spark SQL functions
from delta.tables import DeltaTable
from pyspark.sql.functions import col

dt_path = "/any/path/"
my_dt = DeltaTable.forPath(spark, dt_path)
seq_keys = ["id1", "id2", "id3"]
my_dt.delete(col("key_col_name").isin(seq_keys))
And in Scala:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

val dt_path = "/any/path/"
val my_dt: DeltaTable = DeltaTable.forPath(spark, dt_path)
val seq_keys = Seq("id1", "id2", "id3")
my_dt.delete(col("key_col_name").isin(seq_keys: _*))
Let's say you have two dataframes: one with your data, and one with just a column holding the IDs of the rows to delete. A left anti join can filter out the rows you want to delete.
df = df.join(dfWithIdsToDelete, "<idColumnName>", "left_anti")
This join gives you all the rows of df whose ID does not exist in dfWithIdsToDelete, thereby filtering out all the rows you want to delete.
If your list of IDs to delete is a python list, you can just convert it to a dataframe.
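The left anti join keeps exactly the rows whose ID is absent from the delete list. The same set-difference semantics in plain Python (a sketch of what the join computes, not Spark code; in PySpark you would build the second dataframe with spark.createDataFrame and use the join shown above):

```python
# Hypothetical data; in Spark this would be the df dataframe.
data = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b"},
    {"id": 3, "name": "c"},
]
ids_to_delete = [2]  # would become dfWithIdsToDelete in Spark

# Left anti join semantics: keep rows whose id is NOT in the other side.
delete_set = set(ids_to_delete)
result = [row for row in data if row["id"] not in delete_set]

print([row["id"] for row in result])  # [1, 3]
```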
Take a look at this code:
from cassandra.cluster import Cluster
from pyspark.sql import SQLContext

def main_function():
    sql = SQLContext(sc)
    tests = sql.read.format("org.apache.spark.sql.cassandra") \
        .load(keyspace="your keyspace", table="test").where(...)
    # Spark SQL cannot run DELETE against Cassandra, so open a separate
    # Cassandra session (python cassandra-driver) for the deletes:
    session = Cluster().connect("your keyspace")
    for test in tests.collect():
        session.execute("DELETE FROM test_event WHERE id = %s", (test["id"],))
Be aware that deleting one row at a time is not a best practice in Spark; the code above is just an example to help you figure out your implementation.
The Spark Cassandra Connector (SCC) itself provides only a Dataframe API for Python, but there is a pyspark-cassandra package that provides an RDD API on top of the SCC, so deletion can be performed as follows.
Start pyspark shell with (I've tried with Spark 2.4.3):
bin/pyspark --conf spark.cassandra.connection.host=IPs \
  --packages anguenot:pyspark-cassandra:2.4.0
and inside it, read data from one table and perform the delete. The source data needs to have the columns corresponding to the primary key. It can be the full primary key, a partial primary key, or only the partition key; depending on which, Cassandra will use the corresponding tombstone type (row/range/partition tombstone).
In my example, the table's primary key consists of one column, which is why I specified only one element in the list:
rdd = sc.cassandraTable("test", "m1")
rdd.deleteFromCassandra("test","m1", keyColumns = ["id"])