The Problem: A 288M Record Sorting Job That Kept Failing
One of our data export jobs processes nearly 300 million e-commerce records daily, ranking them by 30-day sales (GMS) and 60-day clicks to export the top items. After months of stable operation, the job started failing consistently with a cryptic shuffle-related error.
The job would run for nearly 3 hours before crashing.
Understanding the Job Logic
Here’s the core Scala code that was causing the problem:
```scala
val sortedLimitedResult = transformedData
  .join(sideDf, Seq(IchibaYahooTransformData.Fields.SKU))
```
The sorting keys were:
ITEM_GMS_30D (30-day Gross Merchandise Sales)
ITEM_CLICK_60D (60-day click count)
SKU (as a tie-breaker, compared by character order)
The scale: Starting with 288,689,929 total items, filtering down to 178,003,986 valid items, then globally sorting.
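The ranking step that feeds the join above isn't shown in the excerpt, so here is a minimal sketch of the pattern under stated assumptions: the column names SKU, ITEM_GMS_30D, and ITEM_CLICK_60D come from the job, while the function shape and the exportLimit parameter are illustrative placeholders, not the production code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Hypothetical helper: rank on a narrow projection, keep the top-N SKUs, join back.
def topItems(transformedData: DataFrame, exportLimit: Int): DataFrame = {
  // Global ordering: 30-day GMS desc, then 60-day clicks desc, then SKU as tie-breaker.
  // No partitionBy, so Spark must establish a total order across all rows.
  val rankWindow = Window.orderBy(desc("ITEM_GMS_30D"), desc("ITEM_CLICK_60D"), col("SKU"))

  // Column pruning: rank only SKU plus the two ranking columns.
  val sideDf = transformedData
    .select("SKU", "ITEM_GMS_30D", "ITEM_CLICK_60D")
    .withColumn("rank", row_number().over(rankWindow))
    .filter(col("rank") <= exportLimit)
    .select("SKU")

  // Join the winning SKUs back to the full rows, as in the snippet above.
  transformedData.join(sideDf, Seq("SKU"))
}
```

The important part is the Window.orderBy with no partitionBy clause: it forces a single global ordering over all 178 million surviving rows, which is exactly what the rest of this post digs into.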
The Root Cause: Memory Fragmentation and Sorter Degradation
The logs reveal a fascinating progression showing how memory pressure caused Spark’s sorting mechanisms to degrade catastrophically. Let’s trace through the failure timeline:
Driver log from the failed run:
```
13-09-2025 10:58:50 JST feed_export INFO - 2025-09-13 10:58:50 JST INFO com.hadoop.compression.lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev bb4f4d562ec4888b1c6b0dec1ed7bc4b60229496]
13-09-2025 11:40:51 JST feed_export INFO - 2025-09-13 11:40:51 JST WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3200, hdp-dn6182.haas.jpw1a.dcnw.rakuten, executor 33): java.io.FileNotFoundException: /hadoop/ssd01/yarn/local/usercache/dsyncuser/appcache/application_1753406271096_1650221/blockmgr-2ca77cee-ee18-490a-a4fa-dfcdbc317b6d/3d/temp_shuffle_b93c7006-dcc8-43fd-b9b3-d3893b1e40b8 (Too many open files)
13-09-2025 11:40:51 JST feed_export INFO - at java.io.FileInputStream.open0(Native Method)
13-09-2025 11:40:51 JST feed_export INFO - at java.io.FileInputStream.open(FileInputStream.java:195)
13-09-2025 11:40:51 JST feed_export INFO - at java.io.FileInputStream.<init>(FileInputStream.java:138)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpillsWithTransferTo(UnsafeShuffleWriter.java:445)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:318)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:237)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.scheduler.Task.run(Task.scala:109)
13-09-2025 11:40:51 JST feed_export INFO - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
13-09-2025 11:40:51 JST feed_export INFO - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
13-09-2025 11:40:51 JST feed_export INFO - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
13-09-2025 11:40:51 JST feed_export INFO - at java.lang.Thread.run(Thread.java:748)
```
Executor log from the failed run:
```
25/09/13 03:58:38 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 4.1 GB to disk (2 times so far)
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 7.113155 ms
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 4.209858 ms
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 3.891692 ms
25/09/13 04:01:04 INFO CodeGenerator: Code generated in 5.282273 ms
25/09/13 04:01:04 INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
25/09/13 04:01:38 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 4.1 GB to disk (0 time so far)
25/09/13 04:02:11 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 3.9 GB to disk (1 time so far)
25/09/13 04:02:44 INFO UnsafeExternalSorter: Thread 171 spilling sort data of 3.9 GB to disk (2 times so far)
25/09/13 04:03:11 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (0 time so far)
25/09/13 04:03:12 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (1 time so far)
25/09/13 04:03:12 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (2 times so far)
...
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86906 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86907 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86908 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86909 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86910 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86911 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86912 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86913 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86914 times so far)
25/09/13 04:55:59 INFO ShuffleExternalSorter: Thread 171 spilling sort data of 28.0 MB to disk (86915 times so far)
25/09/13 04:56:02 ERROR Executor: Exception in task 0.3 in stage 2.0 (TID 3203)
java.io.FileNotFoundException: **/application_1753406271096_1650221/blockmgr-f4ac2ecf-3835-412e-96cb-ea91f2be1f0b/16/temp_shuffle_b75d51ec-dd50-4683-86e8-8bcc0a6a2aa6 (Too many open files)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpillsWithTransferTo(UnsafeShuffleWriter.java:445)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:318)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:237)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
25/09/13 04:56:03 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
25/09/13 04:56:03 INFO CoarseGrainedExecutorBackend: Driver from brakuda302z.prod.jp.local:41871 disconnected during shutdown
25/09/13 04:56:03 INFO CoarseGrainedExecutorBackend: Driver from brakuda302z.prod.jp.local:41871 disconnected during shutdown
25/09/13 04:56:03 INFO MemoryStore: MemoryStore cleared
25/09/13 04:56:03 INFO BlockManager: BlockManager stopped
25/09/13 04:56:03 INFO ShutdownHookManager: Shutdown hook called
25/09/13 04:56:03 INFO ShutdownHookManager: Deleting directory /hadoop/ssd01/yarn/local/usercache/dsyncuser/appcache/application_1753406271096_1650221/spark-fae09b02-6f5c-4af2-93fa-645e89456f8c
```
Executor log from the successful run (with off-heap memory enabled):
```
25/09/13 06:39:03 INFO UnsafeExternalSorter: Thread 196 spilling sort data of 8.1 GB to disk (0 time so far)
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 5.560492 ms
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 3.916098 ms
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 3.289539 ms
25/09/13 06:42:49 INFO CodeGenerator: Code generated in 3.887022 ms
25/09/13 06:42:49 INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
25/09/13 06:43:28 INFO UnsafeExternalSorter: Thread 196 spilling sort data of 8.1 GB to disk (0 time so far)
25/09/13 06:44:13 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (0 time so far)
25/09/13 06:44:34 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (1 time so far)
25/09/13 06:44:55 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (2 times so far)
25/09/13 06:45:13 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (3 times so far)
25/09/13 06:45:29 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (4 times so far)
25/09/13 06:45:46 INFO ShuffleExternalSorter: Thread 196 spilling sort data of 2.1 GB to disk (5 times so far)
25/09/13 06:46:01 INFO Executor: Finished task 0.0 in stage 1.1 (TID 3222). 3555 bytes result sent to driver
25/09/13 06:46:02 INFO CoarseGrainedExecutorBackend: Got assigned task 3236
25/09/13 06:46:02 INFO Executor: Running task 13.0 in stage 2.1 (TID 3236)
25/09/13 06:46:02 INFO MapOutputTrackerWorker: Updating epoch to 5 and clearing cache
25/09/13 06:46:02 INFO TorrentBroadcast: Started reading broadcast variable 7
25/09/13 06:46:02 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 9.3 KB, free 12.0 GB)
25/09/13 06:46:02 INFO TorrentBroadcast: Reading broadcast variable 7 took 73 ms
25/09/13 06:46:02 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 23.7 KB, free 12.0 GB)
25/09/13 06:46:02 INFO FileScanRDD: Reading File path: hdfs://***
```
Success Pattern Analysis
The optimized run showed completely different behavior:
06:39:03: Started with even larger 8.1GB spills from UnsafeExternalSorter
06:44:13 - 06:45:46: Only 6 spills total of 2.1GB each from ShuffleExternalSorter
06:46:01: Clean task completion - no file handle exhaustion
Total spill volume: ~10.5GB vs 2.4TB in the failed run
The key insight: Off-heap memory allowed Spark to maintain larger in-memory buffers, preventing the catastrophic degradation to tiny spill files.
Contrast this with the failed run, where the logs reveal a dramatic memory-management breakdown. The job started with healthy 4.1GB spills from UnsafeExternalSorter but quickly degraded into 86,915 tiny 28MB spills from ShuffleExternalSorter. That produced nearly 2.4TB of spill data (86,915 × 28MB) in small fragments, and merging them required more file handles than the OS allowed, resulting in the fatal “Too many open files” error.
Visual Evidence: Spark UI Screenshots
Let’s look at the actual Spark UI evidence that helped us diagnose the problem:
Failed Job - Stages Overview (screenshot): the stages view showing the failed execution with excessive shuffle operations
Stages Detail (screenshot): detailed performance metrics showing shuffle read/write statistics and spill behavior
SQL Execution Plan (screenshot): the execution plan showing the Window function that triggered the global sort
Why Window Functions Are Shuffle-Heavy
The Window.orderBy() operation requires a global sort across all data, which triggers Spark’s most expensive operation: a full shuffle. Here’s what happens:
1. Partition-local processing: Each executor processes its data partition
2. Global shuffle: All data must be redistributed to establish global ordering
3. Sort and merge: Executors sort their received data and merge results
4. Memory pressure: Large sorts require substantial memory buffers
5. Spill to disk: When memory is insufficient, data spills to temporary files
6. Final merge: All spill files must be merged for the final result
With heap memory constraints, step 5 becomes a bottleneck - creating too many small spill files that overwhelm the file system during merge.
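You can see the global sort directly in the physical plan. The snippet below is a self-contained sketch (tiny inline dataset, local master), not the production job: a window spec with orderBy but no partitionBy forces a total ordering, so the plan printed by explain() should contain an Exchange (the shuffle) and a Sort feeding the Window operator.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

val spark = SparkSession.builder().appName("window-plan-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Tiny stand-in dataset with the same shape as the ranking keys.
val demo = Seq(
  ("sku-1", 100.0, 40L),
  ("sku-2", 250.0, 10L),
  ("sku-3", 250.0, 25L)
).toDF("SKU", "ITEM_GMS_30D", "ITEM_CLICK_60D")

// orderBy without partitionBy => a global ordering is required.
val w = Window.orderBy(desc("ITEM_GMS_30D"), desc("ITEM_CLICK_60D"), col("SKU"))

// explain() prints the physical plan; look for the shuffle and sort ahead of the Window.
demo.withColumn("rank", row_number().over(w)).explain()
```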
The fix was to enable off-heap memory and reduce executor memory from 8G to 4G to make room for the off-heap allocation.
Configuration Comparison
Before (Failed Configuration):
```
--executor-memory 8G
--conf spark.executor.memoryOverhead=2048m
# No off-heap memory configured
```
After (Successful Configuration):
```
--executor-memory 4G
--conf spark.executor.memoryOverhead=   # this needs to be more than executor memory + off-heap memory
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=10G
```
Total memory per executor: Actually increased from ~10GB (8G heap + 2G overhead) to ~16GB (4G heap + 2G overhead + 10G off-heap).
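If you configure jobs in code rather than through spark-submit flags, the same settings can be applied when building the session. This is only a sketch of the equivalent configuration; the memoryOverhead value is deliberately omitted because it is left unspecified above, and the app name is illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: the off-heap settings from the successful run, applied at session creation.
// spark.executor.memoryOverhead is omitted on purpose; set it per the note above.
val spark = SparkSession.builder()
  .appName("feed_export")
  .config("spark.executor.memory", "4g")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "10g")
  .getOrCreate()
```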
Results: Dramatic Performance Improvement
The results were striking:
| Metric | Before (Failed) | After (Success) | Improvement |
|---|---|---|---|
| Spill Count | 86,915 spills | 6 spills | 14,486x reduction |
| Spill Size per Operation | 28MB per spill | 2.1GB per spill | 75x larger |
| Total Spill Volume | ~2.4TB | ~10.5GB | 230x reduction |
| Memory Efficiency | Severe fragmentation | Healthy large buffers | Sustained performance |
| Job Status | File handle exhaustion | Clean completion | ✅ |
Why Off-Heap Memory Solved the Problem
Off-heap memory provides several advantages for shuffle-intensive operations:
1. Larger Buffer Space
Off-heap memory isn’t subject to JVM garbage collection pressure
Allows Spark to buffer more data before spilling to disk
Reduces the frequency of spill operations
2. Reduced GC Pressure
Large data structures in heap memory trigger frequent GC pauses
Off-heap storage reduces GC overhead during intensive operations
More consistent performance during long-running sorts
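One practical way to check the GC-pressure claim on your own workload is to enable executor GC logging and compare pause frequency with and without off-heap memory. The flags below are standard Java 8 HotSpot options passed through spark.executor.extraJavaOptions; wiring them up this way is a suggestion for verification, not something taken from the original job.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: turn on GC logging for executors (Java 8 flags) so heap pressure during
// large sorts is visible in the executor stderr logs.
val spark = SparkSession.builder()
  .appName("feed_export_gc_probe")
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  .getOrCreate()
```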
Alternative Solutions (And Why They Didn’t Work)
Option 1: orderBy().limit()
We initially tried replacing the Window function with a simpler approach:
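The snippet for that attempt didn't survive in this write-up; it was essentially a direct global sort plus limit. The reconstruction below uses the same column names as before and the same hypothetical exportLimit cutoff.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, desc}

// Roughly what the orderBy().limit() variant looked like: sort the full rows globally
// and cut off the top N. exportLimit is the same hypothetical cutoff as earlier.
def topItemsSimple(transformedData: DataFrame, exportLimit: Int): DataFrame =
  transformedData
    .orderBy(desc("ITEM_GMS_30D"), desc("ITEM_CLICK_60D"), col("SKU"))
    .limit(exportLimit)
```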
Why it failed: In Spark 2.3.2, this approach actually performs worse because it shuffles and sorts the full rows (all columns) instead of using the more optimized column-pruning and join-back strategy of the Window-based version.
Option 2: Memory Fraction Tuning
We considered adjusting spark.sql.shuffle.partitions or spark.memory.fraction.
Why we didn’t pursue it: The root cause was spill file fragmentation, not total memory availability. Off-heap memory directly addressed the underlying issue.
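For completeness, the tuning we considered (but never shipped) would have looked roughly like this; the values are illustrative assumptions, not tested settings.

```scala
import org.apache.spark.sql.SparkSession

// The tuning we considered but did not pursue: more (smaller) shuffle partitions and a
// larger unified-memory fraction. Values below are illustrative only.
val spark = SparkSession.builder()
  .appName("feed_export_tuning_experiment")
  .config("spark.sql.shuffle.partitions", "2000")   // default is 200
  .config("spark.memory.fraction", "0.8")           // default is 0.6
  .getOrCreate()
```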
Key Takeaways
Off-heap memory shines for shuffle operations: Especially beneficial for global sorts and large window functions
File handle limits are real: Too many small spill files can exhaust the OS open-file limit during the shuffle merge
Sometimes less heap is better: Reducing executor memory to make room for off-heap can improve overall performance
When to Consider Off-Heap Memory
Off-heap memory is particularly effective for:
Large window functions requiring global sorts
High shuffle volume jobs with frequent spilling
Long-running batch jobs susceptible to GC pressure
Memory-intensive aggregations over large datasets
Conclusion
By enabling off-heap memory, we gave Spark the breathing room it needed to perform large sorts efficiently, reducing spill operations from 86,915 tiny chunks to just 6 large ones. Sometimes the best optimizations are the simplest ones.