Understanding common Performance Issues in Apache Spark - Deep Dive: Data Spill

Michael Heil
12 min read · May 8, 2021

In the introductory article Understanding common Performance Issues in Apache Spark, we defined Data Spill as follows:

Spill refers to the step of moving data from in-memory to disk and vice versa.

Spark spills data when a given partition is too large to fit into the RAM of the Executor. A spill is sometimes Spark’s last resort to avoid an OutOfMemory Exception. However, this comes at the cost of potentially expensive disk reads and writes.

Executor Memory Layout

To understand when data spill happens, we need a clear picture of the Executor’s memory layout. The amount of memory available to each Executor is controlled by the configuration spark.executor.memory. As the Executor process runs on a Java virtual machine, Spark determines the maximum available memory via java.lang.Runtime.getRuntime.maxMemory().

The return value of maxMemory() sets the limit of available memory in Spark’s internal Memory Manager class and is the reference to quantify the amount of memory available for execution and storage purposes within the application. The relevant code within the Memory Manager boils down to

// Return the total amount of memory shared between
// execution and storage, in bytes.
def getMaxMemory: Long = {
  val systemMemory = java.lang.Runtime.getRuntime.maxMemory()

  // Fixed amount of memory for non-storage, non-execution purposes.
  val reservedMemory = 300L * 1024 * 1024
  val usableMemory = systemMemory - reservedMemory
  // Fraction configured by spark.memory.fraction (default 0.6).
  val memoryFraction = 0.6
  (usableMemory * memoryFraction).toLong
}

The value for “reservedMemory” is hard coded to 300MB. The value of the variable “memoryFraction” is configured by spark.memory.fraction which defaults to 0.6. Another relevant configuration is spark.memory.storageFraction which defaults to 0.5. Both configurations are described in the Memory Management section within the Spark Configuration documentation:

  • spark.memory.fraction: “Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur.”
  • spark.memory.storageFraction: “Amount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory may be available to execution and tasks may spill to disk more often.”

To summarize, the following diagram depicts the four parts of the Executor’s memory layout: Reserved Memory, User Memory, Storage Memory, and Execution Memory.

Executor Memory Layout

Spark can acquire storage memory if not required for execution purposes, and vice versa.
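As a rough illustration of how the two fractions divide the heap, the following minimal sketch computes the four regions for a given heap size. The object and method names (ExecutorMemoryLayout, layout) are made up for this example and are not part of Spark's API. The storage and execution sizes are only the initial split, since the boundary between them can move at runtime.

```scala
// Illustrative sketch only: names are hypothetical, not Spark internals.
object ExecutorMemoryLayout {
  // Fixed reserved memory of 300MB, as described above.
  val ReservedBytes: Long = 300L * 1024 * 1024

  final case class Layout(reserved: Long, user: Long, storage: Long, execution: Long)

  def layout(systemMemory: Long,
             memoryFraction: Double = 0.6,  // spark.memory.fraction
             storageFraction: Double = 0.5  // spark.memory.storageFraction
            ): Layout = {
    val usable = systemMemory - ReservedBytes
    // Region shared by execution and storage.
    val unified = (usable * memoryFraction).toLong
    // Initial storage share; the boundary to execution is soft.
    val storage = (unified * storageFraction).toLong
    Layout(ReservedBytes, usable - unified, storage, unified - storage)
  }

  def main(args: Array[String]): Unit = {
    // Prints the layout for the JVM this sketch runs in.
    println(layout(Runtime.getRuntime.maxMemory()))
  }
}
```

The four values always add up to the heap size passed in, which makes it easy to sanity-check the numbers shown in the Web UI against the configuration.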

How Does Data Spill Happen?

There are multiple scenarios in which data spill occurs. Some of them are obvious, but others are a bit hidden and difficult to anticipate. In this deep dive we will look into the following four scenarios:

  • Experiment 1: Data as-is exceeds Storage Memory limits
  • Experiment 2: Apply explode() of an array before a wide transformation
  • Experiment 3: Perform a join of two tables
  • Experiment 4: Aggregating results by a skewed feature

How to identify Data Spill

Information about a data spill is visible in the Spark Web UI under the “Stages” tab. Two different values are presented, which are described in the documentation on Stage Detail:

  • Shuffle spill (memory) is the size of the deserialized form of the shuffled data in memory.
  • Shuffle spill (disk) is the size of the serialized form of the data on disk.

Both values are always presented as a pair. If no spill is happening those columns/rows in the Spark UI will not even show up. The picture below highlights the occurrences of the data spills:

Tab “Stage” in Spark Web UI showing data spill

You will also see the amount of memory data spill in the Query Details under the SQL tab, as shown in the screenshot below. Here, the disk data spill might not be visible.

Another, programmatic way to get hold of the spill information is to extend the Spark Listener class and add the listener to the Spark Context. The Spark Listener keeps track of the internals of the application and is itself used to populate the Spark Web UI.

The following code is based on the Test Utils within the Apache Spark code base.

SpillListener
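A minimal sketch of such a listener is shown below. It is modeled on the SpillListener in Spark's TestUtils rather than copied from it, and the accumulation per stage and the method name spilledStages are choices made for this example.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import scala.collection.mutable

// Sketch: accumulates spill metrics per stage from finished tasks.
class SpillListener extends SparkListener {
  // stageId -> (memory bytes spilled, disk bytes spilled)
  private val spillByStage = mutable.Map.empty[Int, (Long, Long)]

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    // taskMetrics may be null, for example when a task failed.
    Option(taskEnd.taskMetrics).foreach { m =>
      val (mem, disk) = spillByStage.getOrElse(taskEnd.stageId, (0L, 0L))
      spillByStage(taskEnd.stageId) =
        (mem + m.memoryBytesSpilled, disk + m.diskBytesSpilled)
    }
  }

  // Only the stages in which at least one task spilled.
  def spilledStages: Map[Int, (Long, Long)] = synchronized {
    spillByStage.filter { case (_, (mem, disk)) => mem > 0 || disk > 0 }.toMap
  }
}
```

The listener would be registered with spark.sparkContext.addSparkListener(new SpillListener) before running the job. Listener events are delivered asynchronously, so the collected values may lag slightly behind the job's completion.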

Overview of Test Data

Before we investigate the above listed experiments on data spill, let us understand what data we are dealing with throughout this article.

In this article we will work with data from the U.S. Centers for Disease Control and Prevention. To be more precise, we will process the data set COVID-19 Case Surveillance Public Use Data with Geography Data Profile.

Here are some graphs to get an initial feeling of the data. According to the pie and bar chart the feature “age_group” seems to be skewed which makes it a perfect candidate for some of our experiments.

Statistics on Covid19 Data Set

Below are some more statistics about the data set which was downloaded at the end of April 2021.

  • File format: csv
  • Number of Rows: 22,507,140 (including header)
  • Number of Columns: 19
  • Size in file system: ~3.2GB
  • Size in Spark memory: ~421MB

Note the difference between the data size in the file system and the size in Spark memory. This is caused by Spark’s storage format (“Vectorized Parquet”) and its default compression (“lz4”).

Experiment 1: Data as-is exceeds Storage Memory limits

Through this experiment we can observe the obvious scenario where the raw data is too large to fit into the available memory when trying to cache the data. This is indeed not surprising, and frankly speaking, I would not even call it a data spill in its pure sense. However, this experiment helps to validate the numbers of the Executor’s Memory Layout we have seen above.

Experiment 1 Theory

Within Spark’s memory, our data set has a size of 421MB. If we cap the Executor Memory at 1GB and keep the default setting of 0.6 for spark.memory.fraction, we will run out of storage memory when trying to cache the data. Let’s do the math:

// Setting Executor Memory to 1GB
spark.executor.memory = 1g
// Maximum Memory available to application within JVM
println(java.lang.Runtime.getRuntime().maxMemory())
954728448
// Calculating Execution and Storage Memory
0.6 * (954728448 Bytes - 300MB) = 384093388 Bytes = 366MB

Evidently, 366MB is less than 421MB, so I expect to see some of the data stored on disk. Note that we “only” have 366MB available for execution and storage purposes for our entire application!

Experiment 1 Code

The code is pretty simple. Read the data and cache it. Calling count() is required as an action to have Spark actually cache the data in its memory.
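A minimal sketch of these steps could look as follows. The object and method names are hypothetical, the file path is supplied by the caller, and the sketch assumes the Executor memory is capped when the application is submitted (for example with --executor-memory 1g).

```scala
import org.apache.spark.sql.SparkSession

// Sketch of Experiment 1; names are illustrative, not the article's code.
object Experiment1 {
  def cacheAndCount(spark: SparkSession, path: String): Long = {
    val df = spark.read
      .option("header", "true") // the first line of the file is the header
      .csv(path)

    // cache() is lazy: it only marks the DataFrame for caching.
    df.cache()

    // count() is the action that materializes the cache.
    df.count()
  }
}
```

For a DataFrame, cache() uses the storage level MEMORY_AND_DISK by default, which is why partitions that do not fit into storage memory are written to disk instead of being dropped.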

Experiment 1 Results

Let’s check in the Web UI whether our expectation to observe some data spill proves true. Indeed, the numbers shown in the Web UI validate our theory:

Overall, the data size is 357MB + 64MB = 421MB. In addition, we have 357MB + 9MB = 366MB Storage Memory (see column “On Heap Memory Usage”) available which aligns with our own calculation.

The original csv-file was scanned in partitions of size 128MB (the default value of the config spark.sql.files.maxPartitionBytes), and each 128MB partition gets serialized and compressed into a block of 18MB. The disk usage of 64MB is based on 4 out of 24 blocks (3 * 18MB + 1 * 10MB = 64MB). Be aware that the 10MB block is the “remaining” partition left over when the original csv-file gets divided into 128MB-sized blocks.
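The block count can be approximated with simple arithmetic. The sketch below is a simplification under stated assumptions: it takes the file size to be exactly 3.2 * 10^9 bytes (the article only gives ~3.2GB), and it ignores spark.sql.files.openCostInBytes and the parallelism-based adjustment Spark applies when sizing splits. The names are hypothetical.

```scala
// Simplified estimate of scan partitions; not Spark's exact algorithm.
object ScanPartitions {
  val MiB: Long = 1024L * 1024

  // Returns (number of full partitions, size of the remainder in bytes).
  def splits(fileBytes: Long, maxPartitionBytes: Long = 128 * MiB): (Long, Long) =
    (fileBytes / maxPartitionBytes, fileBytes % maxPartitionBytes)

  def numPartitions(fileBytes: Long, maxPartitionBytes: Long = 128 * MiB): Long = {
    val (full, remainder) = splits(fileBytes, maxPartitionBytes)
    if (remainder > 0) full + 1 else full
  }

  def main(args: Array[String]): Unit = {
    val assumedFileBytes = 3200000000L // assumption: "~3.2GB" taken literally
    println(numPartitions(assumedFileBytes)) // 23 full partitions plus one remainder
  }
}
```

With the assumed file size this gives 23 full partitions plus one smaller remainder, in line with the 24 blocks observed in the Web UI.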

I would like to emphasize that a partition is either fully in memory or fully on disk. Because a partition is the smallest data unit within Spark, it does not get split any further.

Experiment 2: Apply explode() of an array before a wide transformation

This experiment demonstrates a more hidden cause of data spilling which might not be obvious when writing the code.

Experiment 2 Theory

We use the same data as above. In this experiment we will add a column with an Array containing just two elements. The next step is to explode that array, which will increase the number of rows by a factor of two (the length of the Array).

Experiment 2 Code

The code for this experiment performs three steps after reading the csv-file.

  • Repartition the data with the RoundRobinPartitioner to show that the available memory is sufficient to avoid any data spill.
  • Add a column with a two-element Array, explode it, and again repartition the data into four partitions.
  • Add a column with a two-element Array, explode it, and apply a wide transformation.

Note the line spark.sparkContext.setJobDescription("Any Description"). When the Spark Web UI contains a lot of entries, this comes in handy for identifying the jobs and stages you are interested in.
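The three steps could be sketched as follows. This is an assumption-laden illustration rather than the original listing: the column names arr and element, the object name Experiment2, and the use of the noop data source (available in Spark 3.x) to trigger each job are all choices made for this example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode, lit}

// Sketch of Experiment 2; names are illustrative.
object Experiment2 {
  // Adds a two-element array column and explodes it, doubling the row count.
  def exploded(df: DataFrame): DataFrame =
    df.withColumn("arr", array(lit(1), lit(2)))
      .withColumn("element", explode(col("arr")))

  // Runs the full plan without collecting results on the driver.
  private def run(df: DataFrame): Unit =
    df.write.format("noop").mode("overwrite").save()

  def runAll(spark: SparkSession, path: String): Unit = {
    spark.conf.set("spark.sql.shuffle.partitions", "4")
    val df = spark.read.option("header", "true").csv(path)

    spark.sparkContext.setJobDescription("Baseline: repartition(4)")
    run(df.repartition(4))

    spark.sparkContext.setJobDescription("2A: explode, then repartition(4)")
    run(exploded(df).repartition(4))

    spark.sparkContext.setJobDescription("2B: explode, then distinct()")
    run(exploded(df).distinct())
  }
}
```

Setting a job description before each step makes the three jobs easy to tell apart in the Web UI.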

Experiment 2 Result

Both experiment variants on the exploded Array cause data spill. I find this very interesting because the Array has only two elements. Apparently this is sufficient to cause the spill. Of course, I have selected the memory size and the data set to show exactly this behavior with a two-element Array. However, to me, the explosion of a small Array does not feel like a memory/storage-intensive transformation at first glance.

The variants have two stages each. The first is writing the shuffle files of the 24 partitions whereas the second is (A) reducing it to four partitions on a round-robin basis or (B) selecting only the distinct rows with parallelism (shuffle partitions) set to 4.

The pictures below show the data spill for variants A and B.

Experiment 2A: explodedDf.repartition(4)
Experiment 2B: explodedDf.distinct()

Note that in both cases the memory spill is identical across all tasks. In Experiment 2A we have 640MB for each of 23 tasks (the additional task with 320MB is again the smaller “remainder” partition left after reading the file), and in Experiment 2B we have 701MB for each of the four tasks. This is due to the nature of transformations such as repartition and distinct: they both result in uniformly sized partitions. In Experiments 3 and 4 we will observe a differently distributed memory spill across the tasks due to the skew in the data.

The value in the column “Spill (Memory)” under the section “Aggregated Metrics by Executor” is the sum of the values of the same column for all Tasks within the section “Tasks”.
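As a small worked example of this relationship, the per-task values quoted above can be summed directly. The sketch assumes the rounded MB values from the text, so the totals are approximations of what the Web UI would report, and the names are hypothetical.

```scala
// Sums the per-task "Spill (Memory)" values quoted in the text (in MB).
object SpillTotals {
  // Experiment 2A: 23 tasks with 640MB each plus the remainder task with 320MB.
  val tasks2A: Seq[Long] = Seq.fill(23)(640L) :+ 320L
  // Experiment 2B: four tasks with 701MB each.
  val tasks2B: Seq[Long] = Seq.fill(4)(701L)

  def total(perTaskMb: Seq[Long]): Long = perTaskMb.sum

  def main(args: Array[String]): Unit = {
    println(s"2A: ${total(tasks2A)} MB, 2B: ${total(tasks2B)} MB")
  }
}
```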

Experiment 3: Perform a join of two tables

Joins can get really messy on distributed data. Depending on the locality of the data and its belonging to a certain partition, if things go bad (and I promise, they will!), you could end up shuffling a lot of data.

Experiment 3 Theory

In this experiment we join our data with another Dataframe which has only 10 rows. The notable part in this join is the join expression which refers to a skewed column “age_group”. The skewness is visible in the pie chart in a previous section.

Experiment 3 Code

As usual, we read the same Covid-19 data set as before. In addition, we extract 10 rows and keep them in another Dataframe. Interestingly, the selected 10 rows all have NA in the column “age_group”.

These two Dataframes are joined together. As the smaller Dataframe only contains 10 rows, it definitely falls below the limit of the configuration spark.sql.autoBroadcastJoinThreshold, which defaults to 10MB. As I would rather like to showcase a Shuffle Sort Merge Join, we can have Spark avoid a broadcast join by explicitly selecting the appropriate join hint. The shuffle partitions are set to 6.
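A possible way to set this up is sketched below. It assumes Spark 3.x, where the join hint "merge" requests a Shuffle Sort Merge Join, and it disables Adaptive Query Execution so that Spark does not rebalance the skewed join on its own. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of Experiment 3; names are illustrative.
object Experiment3 {
  def configure(spark: SparkSession): Unit = {
    spark.conf.set("spark.sql.shuffle.partitions", "6")
    // Assumption: AQE is switched off so skew is not handled automatically.
    spark.conf.set("spark.sql.adaptive.enabled", "false")
  }

  // The "merge" hint steers Spark away from a broadcast join.
  def sortMergeJoin(large: DataFrame, small: DataFrame): DataFrame =
    large.join(small.hint("merge"), Seq("age_group"))
}
```

A call could look like Experiment3.sortMergeJoin(df, df.limit(10).select("age_group")), where selecting only the join column on the small side avoids duplicate column names in the result.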

Experiment 3 Result

The distribution of the memory spill mirrors the distribution of the six possible values in the column “age_group”. In fact, Spark used only 3 tasks to perform the join; the remaining 3 tasks did not process any data at all. Looking at the record numbers in the Task column “Shuffle Read Size / Records”, we can discover how Spark has distributed the data across the different Tasks:

  • 0-17 years: 2,559,165 rows processed in Task 136
  • 18 to 49 years: 11,911,270 rows processed in Task 135
  • 50 to 64 years: 4,525,363 rows processed in Task 137
  • 65+ years: 3,109,385 rows processed in Task 135
  • Missing: 159,186 rows processed in Task 136
  • NA: 242,770 rows processed in Task 136

The following screen shot of the relevant Stage shows the details.

Experiment 3

Interestingly, it does not even matter that the smaller Dataframe with 10 rows contains solely records with the value NA as age group. However, it has to be mentioned that I have disabled Adaptive Query Execution (AQE), which is available in Spark 3.x and is able to deal with skewed joins automatically.

The takeaway from this experiment is that a data spill can occur even when joining with a small Dataframe, as long as that Dataframe is not broadcast. Spark’s Shuffle Sort Merge Join requires a full shuffle of the data, and if the data is skewed it can suffer from data spill.

Experiment 4: Aggregating results by a skewed feature

This experiment is similar to the previous experiment as we utilize the skewness of the data in column “age_group” to force our application into a data spill. However, this time the transformation is even “simpler” than a join. The goal is to observe a data spill when sorting the data.

Experiment 4 Theory

By now, we know that the column “age_group” is skewed. If we apply a wide transformation (like a sort or aggregation) over that column we force the application to have all rows with the same age group being processed in the same task.

Experiment 4 Code

The code for this experiment is straightforward. We read the csv-file and apply the orderBy transformation.
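A minimal sketch of the transformation, with a hypothetical object name, could look like this:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch of Experiment 4; names are illustrative.
object Experiment4 {
  // orderBy is a wide transformation: rows with the same age group
  // end up in the same partition after the shuffle.
  def sortByAgeGroup(df: DataFrame): DataFrame =
    df.orderBy(col("age_group"))
}
```

As orderBy is lazy, an action such as a write is still needed to actually run the sort.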

Experiment 4 Result

The result is almost identical to what we have seen in Experiment 3. The main difference is that the relevant Stage has now used five instead of three tasks to sort the data based on the age group.

Experiment 4

Again, the numbers in the column “Shuffle Read Size / Records” reveal which categories have been processed by which task. Some of the categories are small enough to comfortably fit into a single partition without any spill.

How to Mitigate Data Spill

As mentioned in the introductory article, dealing with performance issues in Spark takes a lot of investigation and time. There is no single silver bullet solving all problems with just enabling a certain configuration or applying a specific piece of code. Of course, adding more memory to the cluster nodes is an intuitive solution. But at what cost? Is it worth scaling your cluster and therefore increasing your costs just to reduce the data spill in some individual tasks? On the other hand, a single task taking a lot of time (due to spilling) will delay the entire stage.

Generally speaking, to mitigate Data Spill your goal should be to have less data on a single partition. This can be achieved by

  • increasing the number of partitions by properly adjusting the configuration spark.sql.shuffle.partitions,
  • modifying the partitions of your data by calling repartition(), or
  • if the data is read from a file, keeping the value of the configuration spark.sql.files.maxPartitionBytes low.
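The three options above can be sketched as follows. The concrete values (200 shuffle partitions, 64MB per scan partition) are placeholders chosen for this example, not recommendations, and the object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of the partition-related settings; values are placeholders.
object SpillMitigation {
  def configure(spark: SparkSession): Unit = {
    // More shuffle partitions mean less data per task after a shuffle.
    spark.conf.set("spark.sql.shuffle.partitions", "200")
    // Smaller scan partitions when reading files (64MB, given in bytes).
    spark.conf.set("spark.sql.files.maxPartitionBytes", (64L * 1024 * 1024).toString)
  }

  // Explicitly redistributes the rows across the given number of partitions.
  def spread(df: DataFrame, numPartitions: Int): DataFrame =
    df.repartition(numPartitions)
}
```

The file-related setting only takes effect for reads that are planned after it has been set.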

All of the above tricks will often not help if your data is skewed. If your data is skewed, you need to tackle that problem first and as a benefit the spill issue will resolve itself.

In the table below we can see that increasing the memory helps in all cases. The biggest performance gain was achieved in Experiment 2B, where the spilled task took 29 seconds with 1GB of memory and only 15 seconds with 12GB. The other experiments also benefit from more memory, as it avoids any spill.

Keeping the memory at 1GB while increasing the shuffle partitions to the default value of 200 has no impact at all on Experiment 2A. This is expected, as we explicitly repartition the data into four partitions. However, the distinct transformation in Experiment 2B can make use of all 200 shuffle partitions to avoid any data spill and reduce the time significantly from 29 seconds to 17 seconds.

In Experiments 3 and 4 we are dealing with skewed data. As mentioned above, increasing the shuffle partitions has no impact at all, because with skewed data all rows of a heavily populated key still end up in a single partition.

Run times of tasks that spilled data with initial set-up of 1g memory and 4/6 shuffle partitions

Because Data Skew is so important for the performance of a Spark application, we will dedicate the subsequent deep dive to that topic.

Stay tuned!
