Understanding common Performance Issues in Apache Spark - Deep Dive: Data Skew

Michael Heil
10 min read · May 26, 2021


In the introductory article “Understanding common Performance Issues in Apache Spark” we defined Data Skew as follows:

“Skewed data indicates an uneven data distribution across the partitions.”

Meaning of Data Skew


Skewness is a statistical measure that quantifies the asymmetry of a value distribution about its mean. The skewness value can be positive, zero, negative, or undefined.

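  • Positive: The mass of the distribution is concentrated on the left of the mean value, with a longer tail to the right.
  • Negative: The mass of the distribution is concentrated on the right of the mean value, with a longer tail to the left.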
  • Zero: The mass of the distribution is equally distributed around the mean value, e.g. a normal distribution.
  • Undefined: The standard deviation is zero, e.g. a constant.

Spark provides the SQL built-in function skewness that calculates the skewness of a numeric Dataframe column. In the Spark code base you will find the applied formula. As this article is not meant to tackle mathematics in depth, we will look into skewness by example.

Calculating the skewness requires numeric values because it is a measure of the weighted distance between the individual numbers and the mean value of the entire sample. It would not make sense to define a distance between, say, Strings such as the countries Germany and France.
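To make the definition a bit more tangible, here is a minimal sketch in plain Scala (no Spark involved) of the population skewness, that is, the third central moment divided by the standard deviation cubed. The names SkewnessSketch and skewness are made up for illustration, and returning None for the undefined case is a choice of this sketch; Spark's built-in function may represent that case differently depending on version and configuration.

```scala
object SkewnessSketch {
  // Population skewness: third central moment divided by the
  // standard deviation cubed. Returns None when the value is
  // undefined (empty sample or zero variance, e.g. a constant).
  def skewness(values: Seq[Double]): Option[Double] = {
    if (values.isEmpty) None
    else {
      val n = values.size.toDouble
      val mean = values.sum / n
      // second and third central moments
      val m2 = values.map(v => math.pow(v - mean, 2)).sum / n
      val m3 = values.map(v => math.pow(v - mean, 3)).sum / n
      if (m2 == 0.0) None else Some(m3 / math.pow(m2, 1.5))
    }
  }
}
```

A sample that consists mostly of zeros with a few large values, similar in spirit to a skewed key column, yields a positive value, whereas a symmetric sample such as 1, 2, 3 yields zero.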

In the following code snippet we prepare a Dataframe which is then used to calculate the skewness and later for our showcase experiments.

The resulting Dataframe skewDf has three columns:

  • constDist: This column contains the constant value 1 for each record.
  • evenDist: This column contains a random integer between 0 and 99.
  • skewDist: This column contains the values 0 to 4 where the value 0 is present for around 80% (skewed!) of the overall records.
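A Dataframe with these three columns could be built roughly as sketched below. This is a reconstruction under assumptions, not necessarily the code used for the experiments: the record count of 1 million is taken from the experiment descriptions, while the helper column isHot, the application name and the way the values 1 to 4 are spread over the remaining 20% are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, rand, skewness, when}

// In a Zeppelin notebook or spark-shell a session already exists;
// getOrCreate() simply returns it in that case.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("data-skew-sketch")
  .getOrCreate()

val numRecords = 1000000L // assumption: 1 million records

val skewDf: DataFrame = spark.range(numRecords)
  // constant value 1 for every record
  .withColumn("constDist", lit(1))
  // uniformly distributed integers between 0 and 99
  .withColumn("evenDist", (rand() * 100).cast("int"))
  // hypothetical helper column: true for roughly 80% of the records
  .withColumn("isHot", rand() < 0.8)
  // value 0 for the "hot" records, 1 to 4 spread over the rest
  .withColumn("skewDist",
    when(col("isHot"), lit(0)).otherwise((rand() * 4).cast("int") + 1))
  .drop("id", "isHot")

// Skewness of all three columns in a single pass
skewDf.select(
  skewness("constDist"),
  skewness("evenDist"),
  skewness("skewDist")
).show()
```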

Applying the SQL function skewness on the three different columns leads to:

  • an undefined skewness for the constant column,
  • an (almost) zero skewness for the evenly distributed column, and
  • a positive skewness for the skewed column “skewDist” which indicates that the mass of the distribution is concentrated on the left of the mean value.

Dataframe with constant, even and skewed distributed columns

Even without calculating and checking the actual skewness values of the columns we can see from their distribution graphs if the data is skewed or not.

Apparently, the column “skewDist” is highly skewed. As a consequence, if we partition the Dataframe by that column we will get unevenly distributed data across the partitions. Remember that partitioning of the data happens implicitly when performing certain transformations such as sorts or joins.
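One way to see this effect directly is to hash-partition a Dataframe by a column and count the rows per resulting partition. The helper below is a small sketch; the function name rowsPerPartition and the output column partitionId are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Hypothetical helper: hash-partition `df` by `column` and count
// the rows that end up in each non-empty partition.
def rowsPerPartition(df: DataFrame, column: String, numPartitions: Int): DataFrame =
  df.repartition(numPartitions, col(column))
    // tag every row with the id of the partition it currently lives in
    .withColumn("partitionId", spark_partition_id())
    .groupBy("partitionId")
    .count()
    .orderBy("partitionId")
```

Calling rowsPerPartition(skewDf, "skewDist", 8).show() and rowsPerPartition(skewDf, "evenDist", 8).show() side by side should make the difference between the two columns visible without opening the Web UI.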

Impact of skewed data on Performance

In order to understand the impact of unevenly distributed data across partitions we need to recall how a Spark application runs on a cluster:

  • In response to a Spark action (such as count() or save()) a job gets created.
  • Each job breaks down into one or more stages. The number of stages depends on the number of shuffles that are required to perform the action, and two consecutive stages are separated by a shuffle.
  • A stage represents a set of tasks that are executed together to compute the same operation on multiple executors.

When your data is skewed it means it is unevenly distributed across the partitions. Because a partition is the smallest data unit available in Spark and each task processes exactly one partition, the task durations for processing that skewed data are unbalanced as well. Overall,

  • a stage is only as fast as its slowest task, and
  • a subsequent stage cannot start until the previous stage has finished.

This means you could end up with many tasks finishing their workload quickly while a single task takes a lot of time. This slow task will delay the entire stage and consequently the entire application.
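The arithmetic behind this can be sketched with a deliberately simplified model in plain Scala. It assumes that every task of a stage runs in parallel on its own core and ignores scheduling overhead; the object name StageTiming and the task durations are made up for illustration.

```scala
object StageTiming {
  // Simplified model: all tasks run in parallel, so the stage
  // finishes when its slowest task finishes.
  def stageDuration(taskSeconds: Seq[Double]): Double =
    if (taskSeconds.isEmpty) 0.0 else taskSeconds.max

  // Duration if the same total work were spread evenly over the tasks.
  def balancedDuration(taskSeconds: Seq[Double]): Double =
    if (taskSeconds.isEmpty) 0.0 else taskSeconds.sum / taskSeconds.size
}

val skewedStage = Seq(2.0, 2.0, 20.0) // made-up task durations in seconds
println(StageTiming.stageDuration(skewedStage))    // prints 20.0
println(StageTiming.balancedDuration(skewedStage)) // prints 8.0
```

In this toy example the stage takes 20 seconds although the same amount of work, evenly spread, would be done after 8 seconds.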

The illustration below depicts how a single task (Task 3) delays the entire stage (Stage 1) although the other two tasks in the same stage are finished way earlier. Even though an Executor might be free after finishing Tasks 1 or 2 to perform Task 4, it has to wait because the two stages are dependent on each other. The second stage can only start when all tasks within the first stage are concluded.

The entire application suffers from the slow Task 3 in the first Stage

Enough of theory. Let’s open a Zeppelin notebook and investigate some real examples to showcase the anticipated behavior.

Experiments showing impact of skewed data

Using the Dataframe skewDf as described above we analyze the performance impact due to skewed data in three experiments.

  • Experiment 1: Order
  • Experiment 2: Aggregation
  • Experiment 3: Join

We will see that the problem behind all three transformations is identical: the data is unevenly distributed across the tasks, which puts a disproportionate workload on a single executor. This behavior can be observed in the screenshots of the corresponding Spark Web UI extracts.

Result of Experiment 1: Order

The code is pretty simple: we just call orderBy and run an action to get the job started. We do this on the skewed and the evenly distributed columns for comparison purposes.

spark.conf.set("spark.sql.shuffle.partitions", 8)
skewDf.orderBy(col("skewDist")).foreach(_ => ())
skewDf.orderBy(col("evenDist")).foreach(_ => ())

The Event Timeline in the Spark Web UI reveals a very unhealthy distribution of computing times across the individual tasks when ordering the skewed data. Because the column “skewDist” contains only 5 distinct values, only 5 tasks are spawned to process the data. The evenly distributed data in column “evenDist” can be processed by all 8 shuffle partitions, as we specified in the Spark configuration spark.sql.shuffle.partitions.

Task computing times for ordering column “skewDist”
Task computing times for ordering column “evenDist”

A close look at the task statistics shows that ordering the data by the skewed column “skewDist” forced all records with the value 0 (around 80% of 1 million) to be handled within the same task. This makes sense, as ordering requires identical values to be in the same partition. Imagine how easily your job can fail with an OutOfMemory exception if you are dealing with huge data sets.

In comparison to the skewed data, the ordering of the column “evenDist” looks a lot healthier. The statistics on Shuffle Read Records confirm this: each task deals with roughly the same number of records, which leaves all tasks at a similar computation time.
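A global sort shuffles the data with a range partitioner, so rows with the same sort key always land in the same partition. The sketch below approximates this with repartitionByRange and counts the rows per partition. It mimics the shuffle step of orderBy but is not identical to it, and the function name rowsPerRangePartition and the column name partitionId are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Hypothetical helper: range-partition `df` by `column` (similar to
// the shuffle behind a global sort) and count the rows per partition.
def rowsPerRangePartition(df: DataFrame, column: String, numPartitions: Int): DataFrame =
  df.repartitionByRange(numPartitions, col(column))
    // tag every row with the id of the partition it currently lives in
    .withColumn("partitionId", spark_partition_id())
    .groupBy("partitionId")
    .count()
    .orderBy("partitionId")
```

With only five distinct values in “skewDist”, no more than five partitions can receive rows regardless of the requested number, while “evenDist” has enough distinct values to fill all of them.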

When ordering skewed data the majority of the data is processed in a single task

Result of Experiment 2: Aggregation

This experiment is quite similar to the first one. The code snippet below shows what kind of aggregation we use for testing.

spark.conf.set("spark.sql.shuffle.partitions", 8)

skewDf.repartition(5, col("skewDist"))
  .groupBy(col("skewDist"))
  .agg(max(col("evenDist")).as("maxEven"))
  .foreach(_ => ())

skewDf.repartition(5, col("evenDist"))
  .groupBy(col("evenDist"))
  .agg(max(col("skewDist")).as("maxSkew"))
  .foreach(_ => ())

The original Dataframe skewDf is re-partitioned into five partitions to simulate the scenario of reading partitioned data from a source such as an HDFS location or a Kafka topic. In those cases Spark will preserve the partitions from the source when reading the data.

As we have already seen in the first experiment, processing skewed data leads to unbalanced computation times among the tasks.

Task computing times for grouping by column “skewDist”
Task computing times for grouping by column “evenDist”

In this experiment, a single task has to process as much as 85% of the data for the “group by” aggregation. The grouping of evenly distributed data takes place on five tasks with each task processing a similar portion of the total data.

When grouping by skewed data the majority of the data is processed in a single task

Looking at those numbers, it is not difficult to imagine that dealing with skewed data can easily lead to OutOfMemory exceptions on an executor.

Result of Experiment 3: Join

Joining on a skewed column such as “skewDist” in our Dataframe skewDf will also have Spark move all rows with the same value to a single partition, and thus to a single task on a single executor. A single executor can process multiple values, but a single value of the join column is always handled by a single executor. (Spark 3 comes with Adaptive Query Execution, which is able to dynamically optimize skew joins.)

In our experiment we observe the typical behavior when dealing with skewed data. Joining on a skewed column shows that the majority of the stage’s computation time is spent in a single task. In comparison, joining on an evenly distributed column has the data, and therefore the computation times, more balanced across the tasks.

Task computing times for joining on skewed distributed column with AQE disabled
Task computing times for joining on evenly distributed column with AQE disabled

The statistics on the processed records per task emphasize the statement above. Around 85% of the data is processed in a single task when joining on skewed data.

When joining on skewed data the majority of the data is processed in a single task

Before we look at ways to deal with skewed data, we take a short detour to another important topic when talking about performance: Data Locality.

Side-Note: Meaning of Data Locality with regards to Performance

You may have noticed the column “Data Locality” in the Web UI. In Spark’s official tuning guide the Data Locality is defined as

“How close data is to the code processing it.”

The value in the column gives some insight into the locality of the data and the code processing that data. Usually, it is faster to serialize the code and ship it to the Executor that holds the data than to move the data to the code. There are five types of data locality in Spark and the two closest and therefore fastest are

  • PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible.
  • NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes.

You should aim to have those two showing up in your application. As I am running all experiments in local mode, it will only be one of the two.

Mitigation Strategies when dealing with skewed Data

First things first: I am talking about mitigation “strategies”, not about a concrete solution. My goal is to get across the potential issues that can happen when processing skewed data in a distributed computing system such as Spark. Also, be aware that my observations in the various experiments are specific to my data and to my environment. I am convinced that you will make other observations, especially when you are working with Spark in cluster mode, not as I do in local mode.

As Spark evolves and its optimization rules become more advanced, your best bet is usually to rely on Spark itself to handle skewed data automatically. With Spark 3 there is the Adaptive Query Execution (AQE) framework that already deals with skewed data in joins in an efficient way. The concept (salting), however, can also be applied in previous Spark versions. And I always find it helpful to understand what is actually happening behind the scenes.
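For reference, relying on AQE for skewed joins in Spark 3 comes down to a handful of SQL configuration keys. The sketch below sets them explicitly; the helper name enableSkewJoinOptimization is made up, and the factor and threshold values are only illustrative, not tuning recommendations.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper that switches on AQE skew-join handling (Spark 3.x).
def enableSkewJoinOptimization(spark: SparkSession): Unit = {
  // AQE itself must be enabled for the skew-join rule to apply
  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
  // A partition counts as skewed if it is larger than this factor times
  // the median partition size AND larger than the byte threshold below.
  spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
}
```

Setting spark.sql.adaptive.enabled to "false" instead gives the non-adaptive behavior, which is useful when you want to compare a manual salting approach against the built-in one.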

Salting

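Salting appends a random number (the salt) to the values of a skewed column so that records sharing one heavy value are spread over several distinct keys. The following code depicts how to apply the salting concept to a column: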

// The higher the number, the less data per partition.
// However, the higher the number, the more overhead
// we have when replicating the data in a join.
val levelOfSalt = 5
// Apply salt to existing key
val saltedSkewedDf = skewDf
  .withColumn("salt", (lit(levelOfSalt) * rand()).cast("int"))
  .withColumn("salted_skewed", concat(col("skewDist"), lit("-"), col("salt")))
  .drop("salt")

Instead of having the values 0 to 4, we now have the values 0-0, 0-1, 0-2, 0-3, 0-4, 1-0, 1-1, …, 4-4.

Data is still skewed but with five times (= levelOfSalt) smaller chunks

This means the skewed part (value 0) of the original Dataframe is split into five (= levelOfSalt) roughly equally sized parts. The resulting Dataframe saltedSkewedDf is used for presenting the mitigation strategies for aggregations and join operations.

Mitigating skewed Aggregations

Ordering cannot be optimized using the salt concept. But let’s have a look at how we can reduce the imbalance of computation times by salting the skewed data before an aggregation (refer to Experiment 2).

This concept can be achieved with the following code:

saltedSkewedDf
  // Our assumption was that the skewDf is
  // pre-partitioned by column skewDist.
  // Therefore, we need to have it
  // repartitioned by the salted column.
  .repartition(5, col("salted_skewed"))
  .groupBy(col("salted_skewed"), col("skewDist"))
  .agg(max(col("evenDist")).as("maxEven"))
  .groupBy(col("skewDist"))
  .agg(max(col("maxEven")).as("maxEven"))
  .foreach(_ => ())

A look at the Spark Web UI tells us that there is no major chunk of the data being processed in a single executor and the computation times are quite balanced across all tasks. Yes, I know, it is still not perfectly balanced, and that is exactly what I mean by “trade-offs”.

The skewness in the aggregation operations has been reduced by salting

Here, the trade-off is that we had to repartition the entire data on the salted column, which results in a full shuffle of the data. The benefit is that this can be faster than processing the majority of the data in a single executor, especially when the aggregation logic is more complex. More importantly, we are far less likely to run into OutOfMemory issues.
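The two-step aggregation gives the same result as a direct aggregation because a maximum of partial maxima is again the overall maximum. The plain Scala sketch below (no Spark) illustrates this on key-value pairs; the object name TwoPhaseMax and its methods are made up for illustration.

```scala
object TwoPhaseMax {
  // Direct aggregation: one group per key.
  def directMax(records: Seq[(Int, Int)]): Map[Int, Int] =
    records.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).max }

  // Salted aggregation: first per (key, salt), then per key.
  def saltedMax(records: Seq[(Int, Int)], levelOfSalt: Int, seed: Long): Map[Int, Int] = {
    val random = new scala.util.Random(seed)
    // attach a random salt between 0 and levelOfSalt - 1 to every key
    val salted = records.map { case (key, value) =>
      ((key, random.nextInt(levelOfSalt)), value)
    }
    // phase 1: partial maximum per salted key
    val partial = salted.groupBy(_._1).toSeq.map {
      case ((key, _), group) => (key, group.map(_._2).max)
    }
    // phase 2: final maximum per original key
    partial.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).max }
  }
}
```

The same pattern works for other aggregations that can be combined from partial results, such as min, sum or count. An average cannot be averaged a second time in this way; it would need partial sums and counts that are divided only in the final step.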

Mitigating skewed Joins

For joins, the mitigation strategy is very similar:

  1. add a salt to the skewed column, and
  2. replicate the smaller Dataframe smallDf.

We have already seen how to add a salt to a column. The code below gives the details on how to replicate the smaller Dataframe and perform the join.
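A minimal sketch of these two steps might look as follows. It is an illustration under assumptions rather than the exact code behind the experiment: the function name saltedJoin, its parameters and the helper column salted_key are made up, and an inner join on a single key column is assumed.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, concat, explode, lit, rand}

// Hypothetical helper: inner join of a skewed Dataframe with a smaller
// one on `key`, spreading each key over `levelOfSalt` salted keys.
def saltedJoin(skewedDf: DataFrame, smallDf: DataFrame,
               key: String, levelOfSalt: Int): DataFrame = {
  // 1. add a random salt between 0 and levelOfSalt - 1 to the skewed side
  val salted = skewedDf
    .withColumn("salt", (rand() * levelOfSalt).cast("int"))
    .withColumn("salted_key", concat(col(key).cast("string"), lit("-"), col("salt")))
    .drop("salt")

  // 2. replicate every row of the smaller side once per possible salt
  val replicated = smallDf
    .withColumn("salt", explode(array((0 until levelOfSalt).map(lit(_)): _*)))
    .withColumn("salted_key", concat(col(key).cast("string"), lit("-"), col("salt")))
    .drop("salt", key)

  // join on the salted key, then remove the helper column again
  salted.join(replicated, Seq("salted_key"), "inner").drop("salted_key")
}
```

The replication multiplies the size of the smaller Dataframe by levelOfSalt, which is the overhead mentioned in the comment next to levelOfSalt in the salting snippet.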

What happens here is depicted in the following illustration which was copied from the Databricks blog post “Adaptive Query Execution: Speeding Up Spark SQL at Runtime”.

The “AQE Skew Join” is much more intelligent in that it only adds a salt to the skewed partitions.

Overall, the trade-off was again to repartition the Dataframe, which inevitably results in a full shuffle. However, the join itself can be done in a more balanced way across partitions, and OutOfMemory exceptions are less likely to happen.

The skewness in the join operations has been reduced by salting

This concludes our analysis of dealing with skewed data in Spark. I hope you gained an understanding of what data skew means and how it can impact your Dataframe operations.
