Spark danger: pivot is an action!
Introduction
When writing a new Spark job or optimising an existing one, many implementation and design choices can have a significant impact on the job's performance. One of the most important is the fundamental difference between actions and transformations. Adding a transformation to a job is cheap: it merely adds another node to the execution plan. The transformation is only computed later, once we trigger the pipeline with an action. Typical actions include writing result files to storage, counting rows, or sending the entire DataFrame to the Spark driver with .collect(). Actions are expensive because they have to process the whole graph of transformations that leads up to them. Hence, our team's primary focus when optimising Apache Spark jobs is finding superfluous actions. We were recently caught off guard when we realised that a seemingly ordinary transformation, .pivot(), may hide an additional action.
Reshaping data with Spark pivot function
Since pivoting isn't an everyday operation, let's take two minutes to explain what it does with a simple example.
We have data from a shipping company (see Code 1., Table 1.). Each row in the DataFrame represents the number of packages sent to a particular country via a particular means of transport.
Now let’s assume that we want to calculate the total number of packages for each country and transport method in our DataFrame. In Spark we can compute it as follows (see Code 2., Table 2.):
Sometimes, however, we would rather pivot one of the columns, that is, turn the unique values of that column into new columns of the output DataFrame. Fortunately, Spark provides pivot for this purpose (see Code 3., Table 3.):
What happens when we perform a Spark pivot function? An insight
After running the Spark pivot function on the DataFrame, we can analyse its execution in the Spark UI. A good place to start is the SQL/DataFrame page, which lists all the queries executed by the Spark cluster. As a rule of thumb, one query corresponds to one action in the source code. Nevertheless, as shown in the image below (see Image 1.), there are four queries in total, even though we called the .show() action only three times.
Surprisingly, there is an additional action inside the pivot function! This is a potential problem, because part of our job may be executed twice: first by the action inside pivot and then by the show action.
Hidden action and how to remove it
The PySpark documentation for the function contains the following passage:
There are two versions of the pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.
First, note that the function takes two parameters: `pivot_col` and an optional `values`. Second, the documentation states, rather tersely, that calling it without `values` is less efficient. A closer look at the implementation of `pivot` shows why: when `values` is omitted, the function runs a "collect" action to compute the distinct values of `pivot_col`, and only then calls the second variant, `pivot(pivot_col, values)`.
To fix our job, we can pass the second parameter to the `pivot` function (see Code 4.). Now the pivot is a plain transformation, with only one action (see Image 2.).
Sometimes, however, we cannot pass the `values` parameter because we do not know the values in advance. We believe this is rare, but in that case we suggest adding a `cache` call immediately before the `pivot` function (see Code 5.). The action inside `pivot` will then populate the cache, and subsequent actions can reuse the already computed results of this part of the job (see Image 3.).
Why is it a problem?
In our experience, Spark jobs are often complex. They read data from multiple sources, then filter and aggregate it in various ways. When we found this tiny difference between the two versions of the pivot function, we were optimising a Spark job that had hundreds of lines and multiple pivot calls. The job was struggling, and we were stumped, as we had already removed all the unnecessary actions we could find. Because of the hidden action inside pivot, parts of our job were executed multiple times, and having pivot calls spread across the job made things worse. In our case we could easily add the values parameter, which gave us a significant performance improvement in the problematic Spark job.
Conclusion
Do not be a lazy data engineer. Add a second parameter to the pivot function.
BTW, have I mentioned that our client's job now runs in 16 min instead of 85 min, on a cluster three times smaller?
Happy Spark optimisation!