Whether you are on an older or the latest Spark, you can use df.unpersist() to achieve this. However, older versions have a bug, fixed in the latest version (2.3.2) of Spark, where the storage memory stats are not updated: the unpersist itself works, but the stats do not reflect it. So I request you to run it on the latest Spark to see the stats difference.
Refer to the links below to know more about this:
unpersist() issue
Release notes for 2.3.2
Answer from Sundeep on Stack Overflow: Drop spark dataframe from cache - Stack Overflow
python - How to release a dataframe in spark? - Stack Overflow
python - How to immediately release memory after removing columns in PySpark? - Stack Overflow
python - Un-persisting all dataframes in (py)spark - Stack Overflow
No, del thisRDD is not enough, it would just delete the pointer to the RDD. You should call thisRDD.unpersist() to remove the cached data.
For your information, Spark uses a lazy computation model, which means that when you run this code:
>>> thisRDD = sc.parallelize(range(10), 2).cache()
no data is actually cached yet; it is only marked as 'to be cached' in the RDD execution plan. You can check it this way:
>>> print(thisRDD.toDebugString().decode())
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]
But when you call an action on top of this RDD at least once, it would become cached:
>>> thisRDD.count()
10
>>> print(thisRDD.toDebugString().decode())
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 174.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]
You can easily check the persisted data and the level of persistence in the Spark UI at http://<driver_node>:4040/storage. There you will see that del thisRDD does not change the persistence of this RDD, while thisRDD.unpersist() does unpersist it. You can still use thisRDD in your code afterwards; it just no longer persists in memory and is recomputed each time it is queried.
Short answer: The following code should do the trick:
import gc
del thisRDD
gc.collect()
Explanation:
Even if you are using PySpark, your RDD's data is managed on the Java side, so first let's ask the same question, but for Java instead of Python:
If I'm using Java, and I simply release all references to my RDD, is that sufficient to automatically unpersist it?
For Java, the answer is YES, the RDD will be automatically unpersisted when it is garbage collected, according to this answer. (Apparently that functionality was added to Spark in this PR.)
OK, what happens in Python? If I remove all references to my RDD in Python, does that cause them to be removed on the Java side?
PySpark uses Py4J to send objects from Python to Java and vice-versa. According to the Py4J Memory Model Docs:
Once the object is garbage collected on the Python VM (reference count == 0), the reference is removed on the Java VM
But take note: Removing the Python references to your RDD won't cause it to be immediately deleted. You have to wait for the Python garbage collector to clean up the references. You can read the Py4J explanation for details, where they recommend the following:
A call to gc.collect() also usually works.
OK, now back to your original question:
Would the following be enough to get this done:
del thisRDD
Almost. You should remove the last reference to it (i.e. del thisRDD), and then, if you really need the RDD to be unpersisted immediately**, call gc.collect().
**Well, technically, this will immediately delete the reference on the Java side, but there will be a slight delay until Java's garbage collector actually executes the RDD's finalizer and thereby unpersists the data.
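The reason gc.collect() is needed at all can be shown without Spark: Py4J releases the Java-side object when the Python reference count hits zero, but a reference cycle keeps the count above zero until the cycle collector runs. A small stdlib-only sketch (Holder is a hypothetical stand-in for an object whose death releases a remote resource):

```python
import gc
import weakref

class Holder:
    """Stand-in for a Python object whose destruction releases a remote resource."""
    pass

obj = Holder()
obj.self_ref = obj          # a reference cycle: the refcount never reaches zero
alive = weakref.ref(obj)    # lets us observe whether the object is still alive

del obj                     # drops our reference, but the cycle keeps it alive
print(alive() is not None)  # True: not collected yet

gc.collect()                # the cycle collector finally reclaims it
print(alive() is None)      # True: now it is gone
```

This is why `del thisRDD` alone can leave the Java-side RDD reference (and the cached data) around for a while, and why following it with gc.collect() makes the release prompt.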
Just do the following:
df1.unpersist()
df2.unpersist()
Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
If the DataFrame is registered as a table for SQL operations, like
df.createGlobalTempView(tableName)  # or some other way, depending on the Spark version
then the cache can be dropped with the following commands. Of course, Spark also does this automatically.
Spark >= 2.x
Here spark is an object of SparkSession
Drop a specific table/df from cache
spark.catalog.uncacheTable(tableName)
Drop all tables/dfs from cache
spark.catalog.clearCache()
Spark <= 1.6.x
Drop a specific table/df from cache
sqlContext.uncacheTable(tableName)
Drop all tables/dfs from cache
sqlContext.clearCache()
Spark 2.x
You can use Catalog.clearCache:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()
Spark 1.x
You can use SQLContext.clearCache method which
Removes all cached tables from the in-memory cache.
from pyspark.sql import SQLContext
from pyspark import SparkContext
sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
We use this quite often
for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))
where sc is a SparkContext variable.
There are 2 scenarios that I feel cause memory leaks that I struggle to know how to avoid.
Scenario 1:
There is a need to perform multiple edits to a df like below:
df = method1()
df = method2(df)
df = method3(df)
If I'm not mistaken, this approach is discouraged because each reassigned df grows the memory footprint. How do you get around this?
Scenario 2:
There is a need to execute looping in PySpark. For example, let's say I have 400 files I need to execute a transformation on, and I loop through 10 at a time --> read in 10 files, transform data, write back out to file... loop again. This feels like it is also causing a memory leak.
Should we be persisting data in both scenarios? How do we prevent memory build-up? Is there a way to refresh/kill the Spark context but maintain the looping, to force release of any memory usage?