Whether you are on an older or the latest Spark release, df.unpersist() achieves the same thing. Note, however, that older versions have a bug, fixed in Spark 2.3.2, where the storage memory stats are not updated: unpersist() works, but the stats do not reflect it. I recommend running on the latest Spark to see the difference in the stats.

Refer to the links below for more details:

unpersist() issue

ReleaseNote for 2.3.2


Answer from Sundeep on Stack Overflow
Removing a DataFrame from cache | Spark — DataCamp
You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.
Discussions

Drop spark dataframe from cache — Stack Overflow
I am using Spark 1.3.0 with python api. While transforming huge dataframes, I cache many DFs for faster execution; df1.cache() df2.cache() Once use of certain dataframe is over and is no longer ne...
How to release a dataframe in spark? — Stack Overflow (May 22, 2017)
I am using spark do some calculation. every 5 minutes, I got a new data frame. I put it into a dict called dict_1_hour like this dict_1_hour[timestamp] = dataframe New data frame comes in to the ...
How to immediately release memory after removing columns in PySpark? — Stack Overflow
I want to remove specific columns from a DataFrame in PySpark and immediately release the memory occupied by these columns to optimize resource usage due to limited RAM. What is the best way to ach...
[PySpark] Releasing memory after a spark job is finished ... — lists.apache.org
Top answer (1 of 4) — score 16

No, del thisRDD is not enough, it would just delete the pointer to the RDD. You should call thisRDD.unpersist() to remove the cached data.

For your information, Spark uses a model of lazy computation, which means that when you run this code:

>>> thisRDD = sc.parallelize(range(10), 2).cache()

no data is actually cached yet; the RDD is only marked as 'to be cached' in its execution plan. You can check this as follows:

>>> print(thisRDD.toDebugString())
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]

But when you call an action on top of this RDD at least once, it would become cached:

>>> thisRDD.count()
10
>>> print(thisRDD.toDebugString())
(2) PythonRDD[6] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 174.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:364 [Memory Serialized 1x Replicated]

You can easily check the persisted data and the level of persistence in the Spark UI at http://<driver_node>:4040/storage. There you will see that del thisRDD does not change the persistence of the RDD, while thisRDD.unpersist() removes the cached data. You can still use thisRDD in your code after unpersisting; it simply no longer persists in memory and is recomputed each time it is queried.
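The same pattern applies to the DataFrame API. Below is a minimal sketch, assuming a local Spark session (the app name and row count are illustrative); it needs a working Spark installation to run:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("unpersist-demo").getOrCreate()

df = spark.range(1_000_000).cache()  # only marked "to be cached" so far
df.count()                           # first action materializes the cache

# The cached blocks now appear under http://<driver_node>:4040/storage.
# `del df` alone would not free them; unpersist() does:
df.unpersist(blocking=True)          # blocking=True waits until all blocks are removed

# df is still usable -- it is simply recomputed on the next action.
df.count()

spark.stop()
```

Passing blocking=True makes the call synchronous, which is useful when you want the memory back before the next step starts.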

Answer 2 of 4 — score 15

Short answer: The following code should do the trick:

import gc
del thisRDD
gc.collect()

Explanation:

Even if you are using PySpark, your RDD's data is managed on the Java side, so first let's ask the same question, but for Java instead of Python:

If I'm using Java, and I simply release all references to my RDD, is that sufficient to automatically unpersist it?

For Java, the answer is YES, the RDD will be automatically unpersisted when it is garbage collected, according to this answer. (Apparently that functionality was added to Spark in this PR.)

OK, what happens in Python? If I remove all references to my RDD in Python, does that cause them to be removed on the Java side?

PySpark uses Py4J to send objects from Python to Java and vice-versa. According to the Py4J Memory Model Docs:

Once the object is garbage collected on the Python VM (reference count == 0), the reference is removed on the Java VM

But take note: Removing the Python references to your RDD won't cause it to be immediately deleted. You have to wait for the Python garbage collector to clean up the references. You can read the Py4J explanation for details, where they recommend the following:

A call to gc.collect() also usually works.

OK, now back to your original question:

Would the following be enough to get this done:

del thisRDD

Almost. You should remove the last reference to it (i.e. del thisRDD), and then, if you really need the RDD to be unpersisted immediately**, call gc.collect().

**Well, technically, this will immediately delete the reference on the Java side, but there will be a slight delay until Java's garbage collector actually executes the RDD's finalizer and thereby unpersists the data.
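The Py4J behavior described above can be modeled in plain Python. The sketch below is not Py4J itself; FakeJavaProxy and the weakref.finalize callback merely stand in for the JVM-side reference that Py4J drops once the Python proxy is garbage collected:

```python
import gc
import weakref

class FakeJavaProxy:
    """Stand-in for a Py4J proxy object wrapping a JVM-side reference."""
    pass

released = []

proxy = FakeJavaProxy()
# Py4J removes the JVM-side reference when the Python proxy is collected;
# weakref.finalize models that cleanup callback here.
weakref.finalize(proxy, released.append, "jvm reference dropped")

del proxy     # drop the last Python reference (refcount hits 0)
gc.collect()  # ensure the collector has run, as the Py4J docs recommend

print(released)  # → ['jvm reference dropped']
```

In CPython the refcount drop from del alone usually fires the finalizer immediately; gc.collect() covers the case where reference cycles keep the object alive.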

pyspark.sql.DataFrame.unpersist — PySpark 4.1.1 documentation (spark.apache.org)
Marks the DataFrame as non-persistent, and remove all blocks for it from memory and disk. New in version 1.3.0. Changed in version 3.4.0: Supports Spark Connect. ... Whether to block until all blocks are deleted.
How to delete a data frame in Spark — Quora
So we can't delete it. But we can remove a dataframe from memory if it is persisted or cached. We can remove a PySpark DataFrame by calling the unpersist() method
Unable to clear cache using a pyspark session — Databricks Community (April 2, 2023)
The unpersist() method is used to remove the data from memory after it is no longer needed. ... It's possible that you are using the wrong Spark context to access the cached RDD. If you cache an RDD using the SparkContext object, you need to use the same object to retrieve the cached RDD later.
unpersist doesn't clear — Databricks Community (December 1, 2023)
from pyspark.sql import SparkSession from pyspark import SparkContext, SparkConf from pyspark.storagelevel import StorageLevel spark = SparkSession.builder.appName('TEST').config('spark.ui.port','4098').enableHiveSupport().getOrCreate() df4 = spark.sql(' \ select * from hive_schema.table_name limit 1') print("query completed " ) df4.unpersist() df4.count() ... I have execute above code to clear the dataframe release the memory.
Spark Drop DataFrame from Cache — Spark By {Examples} (March 27, 2024)
You can also manually remove DataFrame from the cache using unpersist() method in Spark/PySpark. unpersist() marks the DataFrame as non-persistent, and removes all blocks for it from memory and disk. unpersist(Boolean) with argument blocks until ...
Managing Memory in PySpark: How to Drop DataFrames Effectively — Saturn Cloud Blog (August 25, 2023)
In the code above, del df deletes the reference to the DataFrame, and gc.collect() calls the garbage collector to free up the memory. Another way to manage memory in PySpark is by unpersisting DataFrames.
Spark Concepts: pyspark.sql.Catalog.clearCache Simplified — Orchestra (September 9, 2024)
It provides a simple and efficient way to release memory and ensure the smooth operation of your Spark application. Here's the basic syntax of how to use pyspark.sql.Catalog.clearCache: from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("ClearCacheExample").getOrCreate() # Perform some operations and cache data # ... # Clear the cache spark.catalog.clearCache() In this example, we first create a SparkSession, perform some operations on dataframes, and then clear the cache using spark.catalog.clearCache().
Managing Memory and Disk Resources in PySpark with Cache and Persist — Ahmed Uz Zaman, ILLUMINATION, Medium (February 21, 2023)
This means that the data will be stored in memory and can be quickly accessed during the reduce operation, which should improve performance. Note that caching a DataFrame can be especially useful if you plan to reuse it multiple times in your PySpark application.
How to Release Memory Used by a Pandas DataFrame — Saturn Cloud Blog (December 7, 2023)
One simple way to release memory used by a Pandas DataFrame is to use the del statement to delete the DataFrame object.
How do you avoid memory leaks in Spark/Pyspark for multiple dataframe edits and loops? — r/apachespark, Reddit (March 8, 2023)

There are 2 scenarios that I feel cause memory leaks that I struggle to know how to avoid.

Scenario 1:

There is a need to perform multiple edits to a df like below:

df = method1()
df = method2(df)
df = method3(df)

If I'm not mistaken, this approach is discouraged because each df is lengthening the memory footprint. How do you get around this?

Scenario 2:

There is a need to execute looping in pyspark. For example, Let's say I have 400 files I need to execute a transformation on and I loop through 10 at a time --> read in 10 files, transform data, write back out to file...loop again. This feels like it is also causing a memory leak.

Should we be persisting data in both scenarios? How do we prevent memory build up? Is there a way to refresh/kill the spark context but maintain the looping so force release any memory usage?
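One common answer to Scenario 2, consistent with the answers above, is to scope each batch tightly and unpersist its DataFrame before reading the next one, rather than restarting the Spark context. The sketch below assumes a Spark environment; the file paths, output location, and transform function are all placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-loop").getOrCreate()

def transform(df):
    # placeholder for the real per-batch transformation
    return df

# 400 hypothetical input files, processed 10 at a time
batches = [[f"input/file_{i + j}.parquet" for j in range(10)]
           for i in range(0, 400, 10)]

for paths in batches:
    df = transform(spark.read.parquet(*paths))
    df.cache()                     # cache only if the batch's df is reused
    df.write.mode("append").parquet("output/")
    df.unpersist(blocking=True)    # release this batch's blocks before the next one
    del df                         # drop the Python reference as well
```

Because each iteration rebinds and then unpersists df, cached blocks do not accumulate across the loop; chained reassignments as in Scenario 1 are likewise fine as long as intermediate DataFrames are not cached.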