
How Flink’s Sort‑Shuffle Boosts Large‑Scale Batch Processing Performance

This article explains how Flink’s Sort‑Shuffle mechanism improves the stability and performance of massive batch jobs by reducing file counts, making I/O more sequential, and lowering memory usage. It covers the implementation, test results, tuning tips, and planned enhancements.

Alibaba Cloud Developer

1. Data Shuffle Overview

Data shuffle is a crucial stage in batch jobs: upstream operators write their output to external storage, and downstream operators read it for further processing. The persisted shuffle data also serves as a recovery point.

2. Why Introduce Sort‑Shuffle

The previous hash‑based shuffle generated many files, which put heavy pressure on the file system, caused random I/O, and consumed memory buffers in a way that limited stability and performance. Sort‑Shuffle writes fewer files, keeps fewer files open at once, maximizes sequential reads and writes, lowers I/O amplification, and decouples memory‑buffer usage from parallelism.
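As a rough illustration of the file-count pressure, the sketch below compares the two approaches. It assumes the hash-based shuffle writes one file per (upstream task, downstream partition) pair, and that Sort-Shuffle writes one data file plus one index file per upstream task, as sections 3.2.2 and 3.2.3 describe. The class and method names are hypothetical.

```java
final class ShuffleFileCount {
    /** Assumption: hash-based shuffle writes one file per (upstream task, downstream partition) pair. */
    static long hashShuffleFiles(long upstreamTasks, long downstreamPartitions) {
        return Math.multiplyExact(upstreamTasks, downstreamPartitions);
    }

    /** Assumption: sort-shuffle writes one data file and one index file per upstream task. */
    static long sortShuffleFiles(long upstreamTasks) {
        return Math.multiplyExact(upstreamTasks, 2L);
    }

    public static void main(String[] args) {
        System.out.println(hashShuffleFiles(1000, 1000)); // 1000000
        System.out.println(sortShuffleFiles(1000));       // 2000
    }
}
```

Under these assumptions the hash-based file count grows with the product of the two parallelisms, while the sort-based count grows only with the number of upstream tasks.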

3. Flink Sort‑Shuffle Implementation

3.1 Design Goals

Reduce file count

Open fewer files

Maximize sequential I/O

Reduce I/O amplification

Lower memory‑buffer consumption

3.2 Implementation Details

3.2.1 In‑memory Data Sorting

During the sort‑spill phase, each record is serialized and written to a sort buffer. When the buffer fills, all binary data are sorted by partition order and then flushed to a file. The buffer interface is:

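public interface SortBuffer {
    /** Appends data of the specified channel to this SortBuffer. */
    boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;

    /** Copies buffered data into the target MemorySegment, in channel order. */
    BufferWithChannel copyIntoSegment(MemorySegment target);

    /** Number of records / bytes written to this SortBuffer. */
    long numRecords();
    long numBytes();

    /** Whether there is still data left to be consumed. */
    boolean hasRemaining();

    /** Finishes this SortBuffer; no more data can be appended afterwards. */
    void finish();
    boolean isFinished();

    /** Releases all resources held by this SortBuffer. */
    void release();
    boolean isReleased();
}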
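To make the append-then-drain cycle concrete, here is a minimal sketch of a buffer that accepts serialized records tagged with a channel and hands them back grouped by channel. It is a simplified, hypothetical stand-in and not Flink's implementation: it uses plain byte arrays instead of MemorySegment, and the class SimpleSortBuffer and its methods are invented for illustration. It groups records into per-channel lists, which yields partition order without a comparison sort.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a sort buffer; not Flink's implementation. */
final class SimpleSortBuffer {
    /** One serialized record tagged with its target channel. */
    static final class Entry {
        final int channel;
        final byte[] data;
        Entry(int channel, byte[] data) {
            this.channel = channel;
            this.data = data;
        }
    }

    private final List<List<byte[]>> perChannel = new ArrayList<>();
    private final int capacityBytes;
    private int usedBytes;

    SimpleSortBuffer(int numChannels, int capacityBytes) {
        this.capacityBytes = capacityBytes;
        for (int i = 0; i < numChannels; i++) {
            perChannel.add(new ArrayList<>());
        }
    }

    /** Returns false without copying anything when the record does not fit. */
    boolean append(byte[] record, int targetChannel) {
        if (usedBytes + record.length > capacityBytes) {
            return false;
        }
        perChannel.get(targetChannel).add(record.clone());
        usedBytes += record.length;
        return true;
    }

    /** Drains all records in channel order, keeping append order within a channel. */
    List<Entry> drainInChannelOrder() {
        List<Entry> out = new ArrayList<>();
        for (int channel = 0; channel < perChannel.size(); channel++) {
            for (byte[] data : perChannel.get(channel)) {
                out.add(new Entry(channel, data));
            }
            perChannel.get(channel).clear();
        }
        usedBytes = 0;
        return out;
    }
}
```

A caller would append until append returns false, drain the buffer into one data region of the shuffle file, and then retry the record that did not fit.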

3.2.2 File Storage Structure

Each parallel task writes its shuffle data to a single physical file composed of multiple data regions. Within each region, records are ordered by partition number, enabling downstream tasks to read data sequentially per partition.

Shuffle file structure diagram

3.2.3 Index Files

For each data file there is an index file containing the offset and length of every partition in each data region. Index data is cached in memory, up to 4 MiB, for fast lookup.

Index file structure diagram
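The relationship between data regions and index entries described in sections 3.2.2 and 3.2.3 can be sketched as follows. This is an in-memory illustration only: a ByteArrayOutputStream stands in for the data file, a list of arrays stands in for the index file, and the names RegionIndexSketch and IndexEntry are hypothetical. The actual on-disk layout of Flink's files is not reproduced here.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RegionIndexSketch {
    /** Offset and length of one partition's bytes inside the data file. */
    static final class IndexEntry {
        final long offset;
        final long length;
        IndexEntry(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Stands in for the single data file of one task.
    private final ByteArrayOutputStream dataFile = new ByteArrayOutputStream();
    // Stands in for the index file: one IndexEntry[] per data region.
    private final List<IndexEntry[]> index = new ArrayList<>();

    /** Writes one region in partition order; partitionData[p] holds the bytes of partition p. */
    void writeRegion(byte[][] partitionData) {
        IndexEntry[] entries = new IndexEntry[partitionData.length];
        for (int p = 0; p < partitionData.length; p++) {
            long offset = dataFile.size();
            dataFile.write(partitionData[p], 0, partitionData[p].length);
            entries[p] = new IndexEntry(offset, partitionData[p].length);
        }
        index.add(entries);
    }

    IndexEntry lookup(int region, int partition) {
        return index.get(region)[partition];
    }

    /** Reads one partition's bytes from one region using only the index entry. */
    byte[] read(int region, int partition) {
        IndexEntry e = lookup(region, partition);
        return Arrays.copyOfRange(dataFile.toByteArray(), (int) e.offset, (int) (e.offset + e.length));
    }
}
```

Because each region is written in partition order, a reader for partition p needs exactly one contiguous range per region, and the offsets for that partition increase from region to region.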

3.2.4 I/O Scheduling

Flink schedules reads using an elevator‑like algorithm that always serves the smallest file offset first, reducing random seeks.

for (data_region in data_regions) {
   data_reader = poll_reader_of_the_smallest_file_offset(data_readers);
   if (data_reader == null) break;
   reading_buffers = request_reading_buffers();
   if (reading_buffers.isEmpty()) break;
   read_data(data_region, data_reader, reading_buffers);
}
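A minimal Java sketch of the same idea, serving whichever reader has the smallest pending file offset, could use a priority queue. The Reader class, the schedule method, and the modeling of buffers as a simple counter are all assumptions made for illustration; this is not Flink's scheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

final class OffsetOrderedScheduler {
    /** Hypothetical reader: the file offsets one downstream partition still has to read. */
    static final class Reader {
        final int partition;
        final Deque<Long> pendingOffsets = new ArrayDeque<>();

        Reader(int partition, long... offsets) {
            this.partition = partition;
            for (long offset : offsets) {
                pendingOffsets.add(offset);
            }
        }

        /** Only called while pendingOffsets is non-empty. */
        long nextOffset() {
            return pendingOffsets.peekFirst();
        }
    }

    /** Serves readers in ascending file-offset order until readers or buffers run out. */
    static List<Long> schedule(List<Reader> readers, int availableBuffers) {
        Comparator<Reader> byOffset = Comparator.comparingLong(Reader::nextOffset);
        PriorityQueue<Reader> queue = new PriorityQueue<>(byOffset);
        for (Reader r : readers) {
            if (!r.pendingOffsets.isEmpty()) {
                queue.add(r);
            }
        }
        List<Long> served = new ArrayList<>();
        while (!queue.isEmpty() && availableBuffers > 0) {
            Reader r = queue.poll();                  // reader with the smallest offset
            served.add(r.pendingOffsets.pollFirst()); // "read" one block at that offset
            availableBuffers--;
            if (!r.pendingOffsets.isEmpty()) {
                queue.add(r);                         // re-queue under its new smallest offset
            }
        }
        return served;
    }
}
```

Each reader is removed from the queue before its offset changes and re-inserted afterwards, so the queue ordering stays valid and the served offsets never move backwards in the file.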

3.2.5 Broadcast Optimization

Broadcast data is stored only once in both the sort buffer and the shuffle file; all partition indexes point to the same block, greatly reducing broadcast overhead.

Broadcast optimization diagram
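The effect on the index can be sketched in a few lines: every partition's entry for a broadcast region carries the same offset and length. The class and method names below are hypothetical, and the index entry is modeled as a plain {offset, length} pair rather than Flink's actual format.

```java
final class BroadcastIndexSketch {
    /** Returns one {offset, length} pair per partition, all pointing at the single stored block. */
    static long[][] indexForBroadcastRegion(int numPartitions, long blockOffset, long blockLength) {
        long[][] entries = new long[numPartitions][];
        for (int p = 0; p < numPartitions; p++) {
            entries[p] = new long[] {blockOffset, blockLength};
        }
        return entries;
    }

    /** Bytes written for one broadcast block, with or without sharing across partitions. */
    static long bytesStored(long blockLength, int numPartitions, boolean shared) {
        return shared ? blockLength : Math.multiplyExact(blockLength, (long) numPartitions);
    }
}
```

With sharing, the stored size no longer depends on the number of downstream partitions.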

3.2.6 Data Compression

Compression is applied per network buffer, per partition, so each buffer can be decompressed on its own. It improves TPC‑DS performance by over 30%.

Compression flow diagram
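The following sketch shows what compressing each buffer independently looks like. The article does not name the codec Flink uses, so this example substitutes the JDK's Deflater and Inflater purely for illustration. The class name and the convention of passing the original length to decompress are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class PerBufferCompression {
    /** Compresses one buffer on its own, so it can later be decompressed without its neighbors. */
    static byte[] compress(byte[] buffer) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(buffer);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    /** Restores one buffer; originalLength is assumed to be stored alongside the compressed bytes. */
    static byte[] decompress(byte[] compressed, int originalLength) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            byte[] out = new byte[originalLength];
            int read = 0;
            while (read < originalLength && !inflater.finished()) {
                int n = inflater.inflate(out, read, originalLength - read);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // truncated or unsupported input
                }
                read += n;
            }
            if (read != originalLength) {
                throw new IllegalStateException("unexpected decompressed length: " + read);
            }
            return out;
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed buffer", e);
        } finally {
            inflater.end();
        }
    }
}
```

Because every buffer is a self-contained compressed unit, a downstream reader can start at any buffer boundary found through the index without decompressing earlier data.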

4. Test Results

4.1 Stability

Sort‑Shuffle eliminates file‑handle and inode exhaustion issues and resolves known Flink bugs (e.g., FLINK‑21201, FLINK‑19925), greatly improving batch job stability.

4.2 Performance

On a 1000‑parallelism cluster running TPC‑DS 10 TB, Sort‑Shuffle achieved 2‑6× overall speedup and up to 10× improvement for the shuffle phase alone.

Performance improvement chart

5. Tuning Parameters

To enable Sort‑Shuffle, set taskmanager.network.sort-shuffle.min-parallelism to a value lower than the expected number of partitions (e.g., 1 on HDD). Enable compression with taskmanager.network.blocking-shuffle.compression.enabled. Adjust the number of network buffers available to each sort‑shuffle result partition via taskmanager.network.sort-shuffle.min-buffers to balance memory usage and throughput.
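As a small aid, the sketch below collects the three keys and renders them as "key: value" lines, the layout used by flink-conf.yaml. The helper class is hypothetical. The article gives no value for min-buffers, so it is left as a caller-supplied parameter, and the 512 in the usage note is an arbitrary placeholder rather than a recommendation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SortShuffleSettings {
    /** Builds the three settings named above; minBuffers is caller-supplied. */
    static Map<String, String> build(int minParallelism, boolean compression, int minBuffers) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("taskmanager.network.sort-shuffle.min-parallelism", Integer.toString(minParallelism));
        conf.put("taskmanager.network.blocking-shuffle.compression.enabled", Boolean.toString(compression));
        conf.put("taskmanager.network.sort-shuffle.min-buffers", Integer.toString(minBuffers));
        return conf;
    }

    /** Renders the settings as "key: value" lines, one per setting. */
    static String render(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            sb.append(e.getKey()).append(": ").append(e.getValue()).append('\n');
        }
        return sb.toString();
    }
}
```

Calling SortShuffleSettings.render(SortShuffleSettings.build(1, true, 512)) produces three lines that can be pasted into a configuration file and then adjusted for the cluster at hand.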

6. Future Work

Network connection reuse (FLINK‑22643, FLINK‑15455)

Multi‑disk load balancing (FLINK‑21790, FLINK‑21789)

Remote shuffle service

User‑selectable disk types (HDD vs SSD)
