Solving a Spark Shuffle Issue: How 10 GB of Off-Heap Memory Saved Our Sorting Job

The Problem: A 288M Record Sorting Job That Kept Failing

One of our data export jobs processes nearly 300 million e-commerce records daily, ranking them by 30-day sales (GMS) and 60-day clicks to export the top items. After months of stable operation, the job started failing consistently with a cryptic shuffle-related error.

The job would run for nearly 3 hours before crashing.

Understanding the Job Logic

Here’s the core Scala code that was causing the problem:

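import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank only the sorting keys; with no partitionBy, the window spans the whole dataset.
val sideDf = transformedData
  .select(sortingKeys.head, sortingKeys.tail: _*)
  .withColumn(IchibaYahooExportData.Intermediate.SORTING_ROW_NUMBER,
    row_number().over(Window.orderBy(sortingKeys.map(col(_).desc): _*)))
  .filter(col(IchibaYahooExportData.Intermediate.SORTING_ROW_NUMBER) <= settings.feed.maxOutputCnt)

// Join the top-ranked SKUs back to the full rows.
val sortedLimitedResult = transformedData
  .join(sideDf, Seq(IchibaYahooTransformData.Fields.SKU))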

The sorting keys were:

  • ITEM_GMS_30D (30-day Gross Merchandise Sales)
  • ITEM_CLICK_60D (60-day click count)
  • SKU (as a tie-breaker, compared in character order)

The scale: the job starts with 288,689,929 items, filters them down to 178,003,986 valid items, and then sorts those globally.
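The rank-then-join-back pattern can be illustrated without Spark. The following is a minimal plain-Scala sketch on in-memory collections, not the job's actual code: `Item`, `SortKey`, `topSkus` and `joinBack` are hypothetical names, and the fields simply mirror the three sorting keys listed above, all sorted in descending order as in the job.

```scala
// Plain-Scala analogue of the rank-then-join-back pattern (no Spark required).
// All names here are hypothetical and exist only for illustration.
object TopNJoinBack {
  final case class Item(sku: String, gms30d: Long, click60d: Long, title: String)
  final case class SortKey(sku: String, gms30d: Long, click60d: Long)

  // Rank only the narrow sort keys (descending on all three) and keep the top n SKUs.
  def topSkus(items: Seq[Item], n: Int): Seq[(String, Int)] = {
    val keys = items.map(i => SortKey(i.sku, i.gms30d, i.click60d))
    val descending = Ordering
      .by[SortKey, (Long, Long, String)](k => (k.gms30d, k.click60d, k.sku))
      .reverse
    keys.sorted(descending).take(n).zipWithIndex.map { case (k, idx) => (k.sku, idx + 1) }
  }

  // Join the surviving SKUs back to the wide rows, like the join on SKU in the job.
  def joinBack(items: Seq[Item], ranked: Seq[(String, Int)]): Seq[(Int, Item)] = {
    val bySku = items.map(i => i.sku -> i).toMap
    ranked.flatMap { case (sku, rowNumber) => bySku.get(sku).map(item => (rowNumber, item)) }
  }
}
```

The point of the split is that only the three narrow key fields go through the sort; the wide rows are touched once, in the join.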

The Root Cause: Memory Fragmentation and Sorter Degradation

The logs reveal a fascinating progression showing how memory pressure caused Spark’s sorting mechanisms to degrade catastrophically. Let’s trace through the failure timeline:

Driver log:

13-09-2025 10:58:50 JST feed_export INFO - 2025-09-13 10:58:50 JST INFO com.hadoop.compression.lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev bb4f4d562ec4888b1c6b0dec1ed7bc4b60229496]
13-09-2025 11:40:51 JST feed_export INFO - 2025-09-13 11:40:51 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3200, hdp-dn6182.haas.jpw1a.dcnw.rakuten, executor 33): java.io.FileNotFoundException: /hadoop/ssd01/yarn/local/usercache/dsyncuser/appcache/application_1753406271096_1650221/blockmgr-2ca77cee-ee18-490a-a4fa-dfcdbc317b6d/3d/temp_shuffle_b93c7006-dcc8-43fd-b9b3-d3893b1e40b8 (Too many open files)
13-09-2025 11:40:51 JST feed_export INFO - at java.io.FileInputStream.open0(Native Method)
13-09-2025 11:40:51 JST feed_export INFO - at java.io.FileInputStream.open(FileInputStream.java:195)
13-09-2025 11:40:51 JST feed_export INFO - at java.io.FileInputStream.<init>(FileInputStream.java:138)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpillsWithTransferTo(UnsafeShuffleWriter.java:445)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:318)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:237)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.scheduler.Task.run(Task.scala:109)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
13-09-2025 11:40:51 JST feed_export INFO - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
13-09-2025 11:40:51 JST feed_export INFO - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
13-09-2025 11:40:51 JST feed_export INFO - at java.lang.Thread.run(Thread.java:748)

Executor log:

25/09/13 03:58:38 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 4.1 GB to disk (2  times so far)
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 7.113155 ms
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 4.209858 ms
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 3.891692 ms
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 5.282273 ms
25/09/13 04:01:04 INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
25/09/13 04:01:38 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 4.1 GB to disk (0 time so far)
25/09/13 04:02:11 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 3.9 GB to disk (1 time so far)
25/09/13 04:02:44 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 3.9 GB to disk (2 times so far)
25/09/13 04:03:11 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (0 time so far)
25/09/13 04:03:12 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (1 time so far)
25/09/13 04:03:12 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (2 times so far)
...
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86906 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86907 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86908 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86909 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86910 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86911 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86912 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86913 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86914 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86915 times so far)
25/09/13 04:56:02 ERROR Executor: Exception in task 0.3 in stage 2.0 (TID 3203)
java.io.FileNotFoundException: **/application_1753406271096_1650221/blockmgr-f4ac2ecf-3835-412e-96cb-ea91f2be1f0b/16/temp_shuffle_b75d51ec-dd50-4683-86e8-8bcc0a6a2aa6 (Too many open files)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpillsWithTransferTo(UnsafeShuffleWriter.java:445)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:318)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:237)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
25/09/13 04:56:03 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
25/09/13 04:56:03 INFO CoarseGrainedExecutorBackend: Driver from brakuda302z.prod.jp.local:41871 disconnected during shutdown
25/09/13 04:56:03 INFO CoarseGrainedExecutorBackend: Driver from brakuda302z.prod.jp.local:41871 disconnected during shutdown
25/09/13 04:56:03 INFO MemoryStore: MemoryStore cleared
25/09/13 04:56:03 INFO BlockManager: BlockManager stopped
25/09/13 04:56:03 INFO ShutdownHookManager: Shutdown hook called
25/09/13 04:56:03 INFO ShutdownHookManager: Deleting directory /hadoop/ssd01/yarn/local/usercache/dsyncuser/appcache/application_1753406271096_1650221/spark-fae09b02-6f5c-4af2-93fa-645e89456f8c

Analyzing the Failure Pattern

Notice the critical transition in the failed run:

  1. 03:58:38 - 04:02:44: Healthy 4.1GB spills from UnsafeExternalSorter (normal large-scale sorting)
  2. 04:03:11: Sudden switch to ShuffleExternalSorter with tiny 28MB spills
  3. 04:03:11 - 04:55:59: Nearly 53 minutes of continuous tiny spills, creating 86,915+ small files
  4. 04:56:02: Fatal “Too many open files” error during merge phase
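A pattern like this is easier to spot when the spill lines are summarized per sorter instead of read one by one. The sketch below is one possible way to do that, assuming the executor log format shown above; `SpillLogSummary` and its members are hypothetical names, and the GB-to-MB conversion assumes the log prints binary units.

```scala
object SpillLogSummary {
  // Matches lines such as:
  // "... INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (2 times so far)"
  private val SpillLine =
    """.*INFO (\w+): Thread \d+ spilling sort data of ([\d.]+) (MB|GB) to disk.*""".r

  final case class Summary(spills: Int, totalMb: Double)

  // Counts spills and sums their sizes (in MB) per sorter class name.
  def summarize(lines: Iterable[String]): Map[String, Summary] =
    lines.foldLeft(Map.empty[String, Summary]) { (acc, line) =>
      line match {
        case SpillLine(sorter, size, unit) =>
          // Assumption: 1 GB = 1024 MB in the log output.
          val mb = if (unit == "GB") size.toDouble * 1024 else size.toDouble
          val prev = acc.getOrElse(sorter, Summary(0, 0.0))
          acc.updated(sorter, Summary(prev.spills + 1, prev.totalMb + mb))
        case _ => acc // ignore non-spill lines
      }
    }
}
```

Feeding it the lines of an executor log (for example via `scala.io.Source.fromFile(path).getLines().toSeq`) gives one entry per sorter, so a sorter with tens of thousands of spills and a small total stands out immediately.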

Compare this to the successful run after optimization:

25/09/13 06:39:03 INFO UnsafeExternalSorter: Thread 196 spilling sort data of 8.1 GB to disk (0  time so far)
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 5.560492 ms
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 3.916098 ms
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 3.289539 ms
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 3.887022 ms
25/09/13 06:42:49 INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
25/09/13 06:43:28 INFO UnsafeExternalSorter: Thread 196 spilling sort data of 8.1 GB to disk (0 time so far)
25/09/13 06:44:13 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (0 time so far)
25/09/13 06:44:34 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (1 time so far)
25/09/13 06:44:55 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (2 times so far)
25/09/13 06:45:13 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (3 times so far)
25/09/13 06:45:29 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (4 times so far)
25/09/13 06:45:46 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (5 times so far)
25/09/13 06:46:01 INFO Executor: Finished task 0.0 in stage 1.1 (TID 3222). 3555 bytes result sent to driver
25/09/13 06:46:02 INFO CoarseGrainedExecutorBackend: Got assigned task 3236
25/09/13 06:46:02 INFO Executor: Running task 13.0 in stage 2.1 (TID 3236)
25/09/13 06:46:02 INFO MapOutputTrackerWorker: Updating epoch to 5 and clearing cache
25/09/13 06:46:02 INFO TorrentBroadcast: Started reading broadcast variable 7
25/09/13 06:46:02 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 9.3 KB, free 12.0 GB)
25/09/13 06:46:02 INFO TorrentBroadcast: Reading broadcast variable 7 took 73 ms
25/09/13 06:46:02 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 23.7 KB, free 12.0 GB)
25/09/13 06:46:02 INFO FileScanRDD: Reading File path: hdfs://***

Success Pattern Analysis

The optimized run shows a completely different behavior:

  1. 06:39:03: Started with even larger 8.1GB spills from UnsafeExternalSorter
  2. 06:44:13 - 06:45:46: Only 6 spills total of 2.1GB each from ShuffleExternalSorter
  3. 06:46:01: Clean task completion - no file handle exhaustion
  4. Total ShuffleExternalSorter spill volume: ~12.6GB (6 × 2.1GB) vs ~2.4TB in the failed run

The key insight: Off-heap memory allowed Spark to maintain larger in-memory buffers, preventing the catastrophic degradation to tiny spill files.

The problem was clear: memory management broke down. The job started with healthy 4.1GB spills from UnsafeExternalSorter, but then degraded into more than 86,915 tiny 28MB spills from ShuffleExternalSorter. That is nearly 2.4 TB of spill data (86,915 × 28MB) scattered across small files. As the stack trace shows, the merge step (UnsafeShuffleWriter.mergeSpillsWithTransferTo) opens the spill files as input streams, and with this many files the executor ran into its open-file limit, resulting in the fatal “Too many open files” error.
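The volumes follow directly from spill count times spill size. Here is a small sketch of that arithmetic, using the counts and sizes from the logs above and decimal units for simplicity; `SpillVolume` is a hypothetical helper, not part of the job.

```scala
object SpillVolume {
  // Total spilled volume in MB for `count` spills of `sizeMb` each.
  def totalMb(count: Long, sizeMb: Double): Double = count * sizeMb

  def main(args: Array[String]): Unit = {
    val failed = totalMb(86915L, 28.0)   // failed run: tiny ShuffleExternalSorter spills
    val fixed  = totalMb(6L, 2.1 * 1000) // successful run: six 2.1 GB spills (decimal units)
    println(f"failed run: ${failed / 1e6}%.2f TB, successful run: ${fixed / 1e3}%.1f GB")
    println(f"volume ratio: ${failed / fixed}%.0fx, spill-count ratio: ${86915.0 / 6}%.0fx")
  }
}
```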

Visual Evidence: Spark UI Screenshots

Let’s look at the actual Spark UI evidence that helped us diagnose the problem:

Screenshot: Failed Job - Stages Overview. The stages view showing the failed execution with excessive shuffle operations.

Screenshot: Stages Detail. Detailed performance metrics showing shuffle read/write statistics and spill behavior.

Screenshot: SQL Execution Plan. The SQL execution plan showing the Window function that triggered the global sort.

Why Window Functions Are Shuffle-Heavy

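The Window.orderBy() operation requires a global sort across all data, which triggers Spark’s most expensive operation: a full shuffle. Because the window specification has no partitionBy, Spark moves every row into a single partition, so one task ends up sorting the entire dataset. Here’s what happens: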

  1. Partition-local processing: Each executor processes its data partition
  2. Global shuffle: All data must be redistributed to establish global ordering
  3. Sort and merge: Executors sort their received data and merge results
  4. Memory pressure: Large sorts require substantial memory buffers
  5. Spill to disk: When memory is insufficient, data spills to temporary files
  6. Final merge: All spill files must be merged for the final result

With constrained heap memory, step 5 becomes the bottleneck: the sorter creates too many small spill files, and the merge in step 6 cannot open them all.
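The relationship between buffer size, spill count, and merge cost can be sketched with a toy external sort. This is a simplified illustration in plain Scala, not Spark's sorter: the "spill files" are in-memory vectors, and `ExternalSortSketch`, `spillRuns` and `mergeRuns` are hypothetical names.

```scala
import scala.collection.mutable

object ExternalSortSketch {
  type Cursor = scala.collection.BufferedIterator[Int]

  // Sorts the input in chunks of at most `bufferSize` elements.
  // Each sorted chunk stands in for one spill file.
  def spillRuns(input: Iterator[Int], bufferSize: Int): Vector[Vector[Int]] =
    input.grouped(bufferSize).map(_.toVector.sorted).toVector

  // K-way merge: one cursor per run stays open for the whole merge,
  // which mirrors one open file handle per spill file.
  def mergeRuns(runs: Vector[Vector[Int]]): Vector[Int] = {
    val cursors: Vector[Cursor] = runs.map(_.iterator.buffered).filter(_.hasNext)
    // PriorityQueue is a max-heap, so reverse the ordering to pop the smallest head first.
    val heap = mutable.PriorityQueue.empty[Cursor](Ordering.by[Cursor, Int](_.head).reverse)
    heap ++= cursors
    val out = Vector.newBuilder[Int]
    while (heap.nonEmpty) {
      val cursor = heap.dequeue()
      out += cursor.next()
      if (cursor.hasNext) heap.enqueue(cursor) // re-insert with its new head element
    }
    out.result()
  }
}
```

Halving the buffer roughly doubles the number of runs, and every run needs its own open cursor during the merge. With real files, that cursor is a file descriptor.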

The Solution: Off-Heap Memory Configuration

We added two simple configuration parameters:

--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=10G

We also reduced executor memory from 8G to 4G to make room for the off-heap allocation.

Configuration Comparison

Before (Failed Configuration):

--executor-memory 8G
--conf spark.executor.memoryOverhead=2048m
# No off-heap memory configured

After (Successful Configuration):

--executor-memory 4G
--conf spark.executor.memoryOverhead= # This needs to be more than executor memory + off-heap memory
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=10G

Total memory per executor actually increased, from ~10GB (8G heap + 2G overhead) to ~16GB (4G heap + 2G overhead + 10G off-heap).
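The per-executor totals can be written down as a small budget calculation. This is only a sketch of the arithmetic: `ExecutorMemoryBudget` and `ExecutorMemory` are hypothetical names, and the 2G overhead for the successful run is taken from the totals quoted above, since the configuration line itself leaves the value blank.

```scala
object ExecutorMemoryBudget {
  // Sizes in MB. Sums heap, overhead and off-heap the same way the totals above do.
  final case class ExecutorMemory(heapMb: Int, overheadMb: Int, offHeapMb: Int = 0) {
    def totalMb: Int = heapMb + overheadMb + offHeapMb
  }

  // Before: --executor-memory 8G, memoryOverhead=2048m, no off-heap.
  val before: ExecutorMemory = ExecutorMemory(heapMb = 8 * 1024, overheadMb = 2048)

  // After: --executor-memory 4G, offHeap.size=10G; overhead assumed to be 2G (see lead-in).
  val after: ExecutorMemory =
    ExecutorMemory(heapMb = 4 * 1024, overheadMb = 2048, offHeapMb = 10 * 1024)
}
```

Whether the cluster manager counts the off-heap size toward the container request depends on the Spark version and deployment, so treat the sum as a budget to plan against, not as what YARN necessarily allocates.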

Results: Dramatic Performance Improvement

The results were striking:

| Metric | Before (Failed) | After (Success) | Improvement |
|---|---|---|---|
| Spill Count | 86,915 spills | 6 spills | 14,486x reduction |
| Spill Size per Operation | 28MB per spill | 2.1GB per spill | 75x larger |
| Total Spill Volume | ~2.4TB | ~12.6GB | ~190x reduction |
| Memory Efficiency | Severe fragmentation | Healthy large buffers | Sustained performance |
| Job Status | File handle exhaustion | Clean completion | |

Why Off-Heap Memory Solved the Problem

Off-heap memory provides several advantages for shuffle-intensive operations:

1. Larger Buffer Space

  • Off-heap memory isn’t subject to JVM garbage collection pressure
  • Allows Spark to buffer more data before spilling to disk
  • Reduces the frequency of spill operations

2. Reduced GC Pressure

  • Large data structures in heap memory trigger frequent GC pauses
  • Off-heap storage reduces GC overhead during intensive operations
  • More consistent performance during long-running sorts

Alternative Solutions (And Why They Didn’t Work)

Option 1: orderBy().limit()

We initially tried replacing the Window function with a simpler approach:

df.orderBy(sortingKeys.map(col(_).desc): _*).limit(maxOutputCnt)

Why it failed: In Spark 2.3.2, this approach actually performs worse because the sort carries every column of each row, whereas the window approach sorts only the pruned key columns and then joins the result back to the full rows.

Option 2: Partition and Memory Fraction Tuning

We considered adjusting spark.sql.shuffle.partitions or spark.memory.fraction.

Why we didn’t pursue it: The root cause was spill file fragmentation, not total memory availability. Off-heap memory directly addressed the underlying issue.

Key Takeaways

  1. Monitor spill patterns: Small, frequent spills (20-50MB) often indicate memory fragmentation issues
  2. Off-heap memory shines for shuffle operations: Especially beneficial for global sorts and large window functions
  3. File handle limits are real: Too many small spill files can exhaust the executor process’s open-file limit when they are merged
  4. Sometimes less heap is better: Reducing executor memory to make room for off-heap can improve overall performance
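For the file-handle takeaway, it can help to compare an observed spill count against the descriptor limit of the JVM process. The following is a minimal sketch using the JDK's `UnixOperatingSystemMXBean`, which is only available on Unix-like JVMs; `FileHandleCheck`, `mergeFits` and the `reserved` headroom of 256 are hypothetical choices for illustration.

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

object FileHandleCheck {
  // Maximum number of file descriptors for this JVM process, if the platform exposes it.
  def maxFileDescriptors: Option[Long] =
    ManagementFactory.getOperatingSystemMXBean match {
      case unix: UnixOperatingSystemMXBean => Some(unix.getMaxFileDescriptorCount)
      case _                               => None
    }

  // A merge that opens every spill file at once needs one descriptor per spill,
  // plus `reserved` for everything else the process keeps open (hypothetical headroom).
  def mergeFits(spillFiles: Long, limit: Long, reserved: Long = 256L): Boolean =
    spillFiles + reserved <= limit
}
```

For example, `FileHandleCheck.maxFileDescriptors.map(FileHandleCheck.mergeFits(86915L, _))` reports whether a merge over that many spill files would fit under the limit of the process running the check.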

When to Consider Off-Heap Memory

Off-heap memory is particularly effective for:

  • Large window functions requiring global sorts
  • High shuffle volume jobs with frequent spilling
  • Long-running batch jobs susceptible to GC pressure
  • Memory-intensive aggregations over large datasets

Conclusion

By enabling off-heap memory, we gave Spark the breathing room it needed to perform large sorts efficiently, reducing spill operations from 86,915 tiny chunks to just 6 large ones. Sometimes the best optimizations are the simplest ones.