Understanding Flink Checkpoint Mechanism and Configuration
This article explains Flink's checkpoint mechanism, its execution flow, common configuration options, and the benefits and considerations of incremental checkpoints using the RocksDB state backend, providing practical code examples and YAML settings for reliable stream processing.
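Flink provides a robust checkpoint mechanism for fault tolerance and exactly-once state consistency, based on a variant of the Chandy-Lamport distributed snapshot algorithm.
The JobManager creates a CheckpointCoordinator that periodically triggers a checkpoint at the source operators, which inject barriers into their output streams. Each operator snapshots its state when the barrier arrives, persists the snapshot, and forwards the barrier downstream until the sink is reached. Once every task has acknowledged its snapshot, the coordinator marks the checkpoint as complete.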
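The barrier flow described above can be sketched with a small, single-threaded simulation in plain Java. This is an illustration only, not Flink code: the classes `Event`, `Operator`, and `Coordinator` are hypothetical stand-ins, the pipeline is a single linear chain, and barrier alignment across multiple inputs, parallelism, and asynchronous persistence are all left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BarrierFlowSketch {

    /** One element of the stream: either a data record or a checkpoint barrier. */
    static final class Event {
        final boolean isBarrier;
        final long checkpointId; // only meaningful when isBarrier is true

        private Event(boolean isBarrier, long checkpointId) {
            this.isBarrier = isBarrier;
            this.checkpointId = checkpointId;
        }

        static Event data() { return new Event(false, -1); }
        static Event barrier(long id) { return new Event(true, id); }
    }

    /** Toy operator whose only state is the number of records it has seen. */
    static final class Operator {
        final String name;
        long recordsSeen = 0;

        Operator(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for the coordinator: gathers one acknowledgement per operator. */
    static final class Coordinator {
        final int expectedAcks;
        final Map<Long, Map<String, Long>> snapshots = new TreeMap<>();
        final List<Long> completed = new ArrayList<>();

        Coordinator(int expectedAcks) { this.expectedAcks = expectedAcks; }

        void acknowledge(long checkpointId, String operatorName, long state) {
            Map<String, Long> acks = snapshots.computeIfAbsent(checkpointId, id -> new TreeMap<>());
            acks.put(operatorName, state);
            if (acks.size() == expectedAcks) {
                completed.add(checkpointId); // every operator has reported its snapshot
            }
        }
    }

    /** Pushes each event through the chain in order, from source to sink. */
    static Coordinator run(List<Operator> chain, List<Event> stream) {
        Coordinator coordinator = new Coordinator(chain.size());
        for (Event event : stream) {
            for (Operator op : chain) {
                if (event.isBarrier) {
                    // Snapshot the state, report it, then let the barrier move downstream.
                    coordinator.acknowledge(event.checkpointId, op.name, op.recordsSeen);
                } else {
                    op.recordsSeen++;
                }
            }
        }
        return coordinator;
    }

    /** Two records, barrier 1, one record, barrier 2, one more record. */
    static Coordinator demo() {
        List<Operator> chain = new ArrayList<>();
        chain.add(new Operator("source"));
        chain.add(new Operator("map"));
        chain.add(new Operator("sink"));

        List<Event> stream = new ArrayList<>();
        stream.add(Event.data());
        stream.add(Event.data());
        stream.add(Event.barrier(1));
        stream.add(Event.data());
        stream.add(Event.barrier(2));
        stream.add(Event.data());
        return run(chain, stream);
    }

    public static void main(String[] args) {
        Coordinator c = demo();
        System.out.println("completed checkpoints: " + c.completed);
        System.out.println("snapshots: " + c.snapshots);
    }
}
```

Because the barrier travels in-band with the records, every operator's snapshot for a given checkpoint reflects exactly the records that preceded that barrier, which is the property the real mechanism relies on for a consistent global snapshot.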
Typical checkpoint settings include enabling checkpointing, configuring mode, timeout, pause, concurrency, externalized checkpoints, and failure handling, as shown in the code example.
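import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);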
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
Configuration in flink-conf.yaml controls the state backend, checkpoint directory, incremental mode, and other parameters.
# ====================================================================
# Fault tolerance and checkpointing
# ====================================================================
# The backend that will be used to store operator state checkpoints if checkpointing is enabled.
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the <class-name-of-factory>.
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled state backends.
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that support incremental checkpoints (like the RocksDB state backend).
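# state.backend.incremental: false
Incremental checkpoints, supported by the RocksDB state backend, store only the state changes made since the previous completed checkpoint instead of a full copy, which can dramatically reduce checkpoint time for large state. This works because RocksDB’s SSTable files are immutable: a new checkpoint can reference files that were already uploaded, and reference counting tracks when a file is no longer needed by any retained checkpoint.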
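When using incremental checkpoints, recovery may take longer because a restore can require files written across several earlier checkpoints, and retained checkpoints can grow because shared files must be kept until no checkpoint references them, so storage and bandwidth considerations are important.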
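The reference-counting idea can be illustrated with a minimal sketch in plain Java. It is not Flink's or RocksDB's implementation: the class `IncrementalCheckpointSketch`, its methods, and the file names such as `001.sst` are hypothetical, and the sketch assumes each checkpoint is described simply by the set of immutable files that are live when it is taken.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class IncrementalCheckpointSketch {
    // How many retained checkpoints reference each uploaded file.
    private final Map<String, Integer> refCounts = new HashMap<>();
    // Files referenced by each retained checkpoint, keyed by checkpoint id.
    private final Map<Long, Set<String>> checkpoints = new TreeMap<>();

    /** Registers a checkpoint and returns only the files that had to be uploaded. */
    Set<String> register(long checkpointId, Set<String> liveFiles) {
        Set<String> uploaded = new TreeSet<>();
        for (String file : liveFiles) {
            // merge returns the new count; 1 means the file was not stored before.
            if (refCounts.merge(file, 1, Integer::sum) == 1) {
                uploaded.add(file);
            }
        }
        checkpoints.put(checkpointId, new TreeSet<>(liveFiles));
        return uploaded;
    }

    /** Discards a checkpoint and returns the files that became unreferenced. */
    Set<String> discard(long checkpointId) {
        Set<String> deleted = new TreeSet<>();
        Set<String> files = checkpoints.remove(checkpointId);
        if (files == null) {
            return deleted; // unknown checkpoint, nothing to release
        }
        for (String file : files) {
            // Returning null from the remapping function removes the entry.
            Integer remaining = refCounts.computeIfPresent(file, (k, n) -> n == 1 ? null : n - 1);
            if (remaining == null) {
                deleted.add(file);
            }
        }
        return deleted;
    }

    /** Files currently held in checkpoint storage. */
    Set<String> storedFiles() {
        return new TreeSet<>(refCounts.keySet());
    }

    static Set<String> files(String... names) {
        return new TreeSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        IncrementalCheckpointSketch registry = new IncrementalCheckpointSketch();
        System.out.println("cp1 uploads " + registry.register(1, files("001.sst", "002.sst")));
        System.out.println("cp2 uploads " + registry.register(2, files("001.sst", "002.sst", "003.sst")));
        // Assume a compaction merged 001 and 002 into 004 before checkpoint 3.
        System.out.println("cp3 uploads " + registry.register(3, files("003.sst", "004.sst")));
        System.out.println("discard cp1 deletes " + registry.discard(1));
        System.out.println("discard cp2 deletes " + registry.discard(2));
        System.out.println("still stored " + registry.storedFiles());
    }
}
```

In this sketch, discarding the oldest checkpoint frees nothing while a newer retained checkpoint still references the same files, which is one way to see why the storage used by retained incremental checkpoints can exceed the size of the latest state.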
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
