How to Merge Small Files in Flink Checkpoints to Reduce HDFS Load
This article describes a small-file merging technique for Apache Flink checkpoints. The technique reuses an FSDataOutputStream so that multiple state files are written into a single HDFS file. The article covers the main design considerations (concurrent checkpoint support, reference-counted deletion, reducing space amplification, fault handling, and compatibility) and the performance gains observed in production.
Background
Flink uses a checkpoint mechanism for fault tolerance: it writes state snapshots to a distributed file system (typically HDFS) and then reports the resulting state handles to the JobManager. Every file created is a request to the NameNode and an entry in its in-memory metadata, so when many jobs run concurrently, the large number of checkpoint files generates heavy RPC traffic and puts pressure on the NameNode's memory.
Both full checkpoints (writing all state to a single file) and incremental checkpoints (writing each SST file separately) suffer from this problem, especially under high job concurrency.
Previous attempts, such as using ByteStreamStateHandle, sent small states directly to the JobManager to avoid creating many small files, but this approach either left many small files (if the threshold was too low) or risked OOM on the JobManager (if the threshold was too high).
Small‑File Merging Approach
The proposed solution reuses a single FSDataOutputStream for multiple SST files until a predefined size threshold is reached. Consequently, many state handles share one physical file on HDFS, dramatically reducing the number of files created.
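The following is a minimal Python sketch of the idea, not Flink's actual implementation. `MergingWriter`, `SegmentHandle`, and `max_file_bytes` are hypothetical names, and `io.BytesIO` stands in for an HDFS FSDataOutputStream. Each write returns a handle that records which physical file holds the data and where.

```python
import io
from dataclasses import dataclass


@dataclass(frozen=True)
class SegmentHandle:
    """Hypothetical state handle: a byte range inside a shared physical file."""
    file_name: str
    offset: int
    length: int


class MergingWriter:
    """Appends many small state files to one stream until a size threshold is hit."""

    def __init__(self, max_file_bytes: int):
        self.max_file_bytes = max_file_bytes
        self.files = {}        # file name -> BytesIO (stand-in for HDFS files)
        self._current = None   # name of the file currently being appended to
        self._seq = 0

    def _roll(self):
        # Start a new physical file.
        self._seq += 1
        self._current = f"merged-{self._seq}"
        self.files[self._current] = io.BytesIO()

    def write(self, data: bytes) -> SegmentHandle:
        # Open a new file only if none exists yet or the current one reached the threshold.
        if self._current is None or self.files[self._current].tell() >= self.max_file_bytes:
            self._roll()
        stream = self.files[self._current]
        offset = stream.tell()
        stream.write(data)
        return SegmentHandle(self._current, offset, len(data))


writer = MergingWriter(max_file_bytes=10)
handles = [writer.write(b"abcd") for _ in range(4)]
```

In this sketch, four 4-byte state files end up in two physical files instead of four. The threshold is checked before each write, so a file can exceed it by at most one segment.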
Design and Implementation
1) Concurrent Checkpoint Support
Flink already supports concurrent checkpoints, so the merging logic must ensure that writes from different checkpoints do not corrupt the shared file. The design writes each checkpoint’s state to a separate HDFS file (no cross‑checkpoint sharing) to avoid interleaving writes.
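A small Python sketch of this isolation rule, under the assumption that each in-flight checkpoint is identified by a numeric ID. `CheckpointFileRegistry` and the `chk-<id>` file naming are hypothetical, and an in-memory buffer stands in for each HDFS file.

```python
import io


class CheckpointFileRegistry:
    """Keeps one append-only buffer per checkpoint ID (stand-in for one HDFS file each)."""

    def __init__(self):
        self._streams = {}  # checkpoint id -> BytesIO

    def append(self, checkpoint_id: int, data: bytes):
        # Writes for different checkpoints never touch the same stream.
        stream = self._streams.setdefault(checkpoint_id, io.BytesIO())
        offset = stream.tell()
        stream.write(data)
        return (f"chk-{checkpoint_id}", offset, len(data))

    def contents(self, checkpoint_id: int) -> bytes:
        return self._streams[checkpoint_id].getvalue()


registry = CheckpointFileRegistry()
registry.append(1, b"aa")           # checkpoint 1 starts writing
registry.append(2, b"XX")           # checkpoint 2 runs concurrently
last = registry.append(1, b"bb")    # checkpoint 1 continues
```

Even though the writes arrive interleaved, each checkpoint's segments stay contiguous in its own file.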
2) Preventing Accidental Deletion
Reference counting tracks how many state handles use a shared file. Deleting a file the moment its reference count drops to zero can be unsafe, because a zero count at that instant does not guarantee the file is no longer needed: a later checkpoint may still reference it. The solution therefore defers deletion until the entire checkpoint finishes, and deletes the file only after verifying that no later checkpoint still references it.
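One way to picture deferred deletion is the Python sketch below. It is an illustration under assumptions, not the article's code: `DeferredDeletionRegistry` and its methods are hypothetical, and a list named `deleted` stands in for real HDFS deletes. Files whose count hits zero become candidates and are removed only at the next checkpoint completion, after the count has been re-checked.

```python
from collections import defaultdict


class DeferredDeletionRegistry:
    def __init__(self):
        self.ref_counts = defaultdict(int)   # file name -> number of state handles using it
        self.files_by_checkpoint = {}        # checkpoint id -> list of file names, one per handle
        self.candidates = set()              # files whose count hit zero, not yet deleted
        self.deleted = []                    # stands in for actual HDFS deletes

    def register(self, checkpoint_id, handle_files):
        # Called once per checkpoint with the file name behind each of its state handles.
        self.files_by_checkpoint[checkpoint_id] = list(handle_files)
        for name in handle_files:
            self.ref_counts[name] += 1

    def release(self, checkpoint_id):
        # Called when an old checkpoint is discarded.
        for name in self.files_by_checkpoint.pop(checkpoint_id, []):
            self.ref_counts[name] -= 1
            if self.ref_counts[name] == 0:
                self.candidates.add(name)    # defer: do not delete here

    def on_checkpoint_complete(self):
        for name in sorted(self.candidates):
            if self.ref_counts[name] == 0:   # re-check: nothing re-referenced the file
                self.deleted.append(name)
                del self.ref_counts[name]
        self.candidates.clear()


reg = DeferredDeletionRegistry()
reg.register(1, ["A"])
reg.release(1)                 # count of A is zero, but A is only a candidate
reg.register(2, ["A"])         # a later checkpoint still uses A
reg.on_checkpoint_complete()   # A survives the sweep
survived = list(reg.deleted)
reg.register(3, ["B"])
reg.release(2)
reg.on_checkpoint_complete()   # now A is really unused and gets deleted
```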
3) Reducing Space Amplification
After merging, each SST file occupies a segment within the shared HDFS file. The whole file can only be deleted when **all** of its segments are unused, so a file with a few live segments keeps its dead segments on disk as well (observed amplification factor 1.3–1.6). An asynchronous compaction thread monitors files whose amplification exceeds a threshold and rewrites them into new, compact files.
The compaction workflow for a file A:
1. Calculate the amplification ratio of the file.
2. If the ratio does not exceed the threshold, skip to step 7.
3. If the ratio exceeds the threshold, rewrite A into a new file A'. If the rewrite fails, clean up the partial A'.
4. Record the mapping between A and A'.
5. When the next checkpoint sends a state handle that falls within A, use the mapping to generate a new handle pointing to A'.
6. After that checkpoint completes, increase the reference count of A', decrease the reference count of A, and delete A when its count reaches zero.
7. Compaction completes.
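The rewrite steps above can be sketched in Python as follows. This is a simplified illustration: the amplification ratio is assumed to be the physical file size divided by the bytes still referenced, which the article does not define explicitly, and `Segment`, `amplification`, and `compact` are hypothetical names. The function returns the rewritten bytes together with the old-to-new mapping used to translate handles.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Segment:
    offset: int
    length: int


def amplification(file_size: int, live_segments) -> float:
    """Assumed definition: physical file size divided by bytes still referenced."""
    live = sum(s.length for s in live_segments)
    return float("inf") if live == 0 else file_size / live


def compact(data: bytes, live_segments, threshold: float):
    """Return (new_data, mapping), or (None, {}) when no rewrite is needed.

    mapping translates each live Segment of the old file A to its Segment in A'.
    """
    if not live_segments:
        return None, {}   # nothing live: ordinary deletion handles this file
    if amplification(len(data), live_segments) <= threshold:
        return None, {}   # below the threshold: leave the file alone
    new_data = bytearray()
    mapping = {}
    for seg in sorted(live_segments, key=lambda s: s.offset):
        mapping[seg] = Segment(len(new_data), seg.length)
        new_data += data[seg.offset:seg.offset + seg.length]
    return bytes(new_data), mapping


old_file = b"aaaaXXXXbbbbYYYY"              # XXXX and YYYY are dead segments
live = [Segment(0, 4), Segment(8, 4)]
new_file, mapping = compact(old_file, live, threshold=1.5)
```

Here the ratio is 16 / 8 = 2.0, which exceeds the threshold of 1.5, so the two live segments are copied into an 8-byte file and the second one moves from offset 8 to offset 4.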
4) Exception Handling
Two failure scenarios are considered:
JobManager (JM) failures: JM only stores state‑handle metadata and reference counts, which are not persisted. On JM failover, the system can recover from the latest completed checkpoint and rebuild reference counts.
TaskManager (TM) failures: If a file was already reported to JM, JM controls its deletion based on reference counts. If a file has never been reported, it becomes an orphan file; external tools are responsible for cleaning such files.
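As a rough Python sketch of the two recovery paths, assuming each restored state handle is a `(file_name, offset, length)` tuple: `rebuild_ref_counts` recomputes the in-memory counts after a JM failover, and `find_orphans` is the kind of check an external cleanup tool might run. Both function names and the handle layout are hypothetical.

```python
from collections import Counter


def rebuild_ref_counts(completed_checkpoint_handles):
    """Count, per physical file, the handles of the restored checkpoint that point at it."""
    return Counter(file_name for file_name, _offset, _length in completed_checkpoint_handles)


def find_orphans(files_on_storage, ref_counts):
    """Files present on storage that no restored handle references."""
    return sorted(set(files_on_storage) - set(ref_counts))


handles = [("f1", 0, 10), ("f1", 10, 5), ("f2", 0, 7)]
counts = rebuild_ref_counts(handles)
orphans = find_orphans(["f1", "f2", "f3"], counts)
```

In this example `f3` was written by a TM but never reported, so nothing references it after recovery.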
5) Canceling TM Snapshots
When a checkpoint times out or fails, the TM‑side snapshot must be cancelled. Flink currently lacks a direct notification mechanism; ongoing work (FLINK‑8871) adds RPC messages from JM to TM to trigger snapshot cancellation.
Compatibility
The small‑file merging feature is backward compatible. The only change is in the second step of checkpoint restore (downloading state data). Existing jobs continue to work without modification.
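If a state handle carries an offset and a length into the shared file (an assumption; the article does not spell out the handle layout), the download step reduces to a ranged read. The Python sketch below uses a hypothetical `read_segment` helper and an in-memory stream in place of an HDFS input stream.

```python
import io


def read_segment(stream, offset: int, length: int) -> bytes:
    """Read exactly one state segment from a shared file opened as a seekable stream."""
    stream.seek(offset)
    data = stream.read(length)
    if len(data) != length:
        # The handle points past the end of the file: fail instead of restoring partial state.
        raise EOFError(f"expected {length} bytes at offset {offset}, got {len(data)}")
    return data


shared = io.BytesIO(b"aaaabbbbcc")
second = read_segment(shared, offset=4, length=4)
```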
Advantages and Limitations
Advantages: Significant reduction of HDFS pressure, including fewer RPC calls and lower NameNode memory usage.
Limitations: The current implementation does not support multi‑threaded state uploads, though this is not a bottleneck for checkpoints today.
Production Results
After deployment, NameNode RPC traffic and response times dropped noticeably, easing the load during peak periods such as the Double‑Eleven shopping festival.
dbaplus Community
Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.
