
Understanding Flink State, Checkpoints, Savepoints, and Fault Tolerance

This article explains Flink's state concepts, the distinction between keyed and operator state, available state backends, TTL configuration, the mechanics of checkpoints and savepoints, and the two‑phase commit protocol for ensuring exactly‑once processing in streaming applications.

Big Data Technology & Architecture

State and Fault Tolerance

Stateful computation is one of Flink's most important features. "State" is the intermediate data a job keeps between events; it can be business data (for example, a running aggregate) or metadata (for example, a source offset). Flink provides several state types and a dedicated mechanism for persisting and managing state.

State backends determine how and where state is stored: on the JVM heap or off-heap. Depending on the backend, Flink can also manage memory on behalf of the application, spilling to disk if necessary, so that applications can hold very large state. The state backend can be changed without touching application logic.

State can be stored on the JVM heap, in off-heap memory, or in an embedded RocksDB instance on the TaskManager's local disk. Typical use cases include complex event pattern detection, time-window aggregation, streaming model training, and historic data lookup.

Flink State Classification and Usage

Flink distinguishes two main categories of state based on whether the data stream is partitioned by a key:

Keyed State: scoped to a specific key after a keyBy operation. Each key has its own isolated state.

Operator State (Non-Keyed State): bound to one parallel instance of an operator rather than to a key. Each parallel instance keeps its own state, which is useful for sources, sinks, or other operators that track information per instance, such as the offsets a source instance has read.

Keyed state is accessed via state descriptors such as ValueStateDescriptor, MapStateDescriptor, ReducingStateDescriptor, etc. The following example shows how to obtain a ValueState inside a RichFlatMapFunction and use it to emit the average of every two consecutive values per key (a count window of size 2, not a sliding average):

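import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Written against the Flink 1.x DataStream API; the wrapper class name is illustrative.
public class CountWindowAverageJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L))
            .keyBy(value -> value.f0)   // key selector; the index-based keyBy(0) is deprecated
            .flatMap(new CountWindowAverage())
            .printToErr();
        env.execute("submit job");
    }

    public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        // Per-key state: f0 = number of elements seen, f1 = running sum of their values.
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
            // value() returns null when the key has no state yet (or the state has expired).
            Tuple2<Long, Long> currentSum = sum.value();
            if (currentSum == null) {
                currentSum = Tuple2.of(0L, 0L);
            }
            currentSum.f0 += 1;          // count
            currentSum.f1 += input.f1;   // sum of values
            sum.update(currentSum);
            // After two elements, emit the (integer) average and start over.
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
            // Expire the state 10 seconds after it was last written; never hand out expired values.
            StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
            descriptor.enableTimeToLive(ttlConfig);
            sum = getRuntimeContext().getState(descriptor);
        }
    }
}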

Operator state is typically used in source or sink operators, for example to store read offsets or to buffer pending output, so that these operators can resume consistently after a failure.

State TTL (Time‑to‑Live)

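TTL can be configured on the descriptor of any keyed state type. With the settings below, the timer is reset whenever the value is created or written, and an expired value is never returned to the caller. Expired values are removed when they are read and, where the state backend supports it, by background cleanup.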

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

descriptor.enableTimeToLive(ttlConfig);
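
To make the two settings above concrete, here is a minimal stdlib-only sketch of what OnCreateAndWrite and NeverReturnExpired mean for a single value. It is a toy model, not Flink's implementation: the class names TtlValue and TtlDemo and the injected clock are assumptions made for illustration.

```java
import java.util.function.LongSupplier;

/** Toy model of one TTL-guarded value (hypothetical class, not a Flink API). */
final class TtlValue<T> {
    private final long ttlMillis;
    private final LongSupplier clock;   // injected so the example is deterministic
    private T value;
    private long lastWriteMillis;

    TtlValue(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Creating or updating the value resets the timer (OnCreateAndWrite). */
    void update(T newValue) {
        value = newValue;
        lastWriteMillis = clock.getAsLong();
    }

    /** Reads do not reset the timer and never expose expired data (NeverReturnExpired). */
    T value() {
        if (value != null && clock.getAsLong() - lastWriteMillis >= ttlMillis) {
            value = null;   // drop the expired value lazily, on read
        }
        return value;
    }
}

final class TtlDemo {
    static String run() {
        long[] now = {0L};   // fake clock, in milliseconds
        TtlValue<String> state = new TtlValue<>(10_000L, () -> now[0]);
        StringBuilder log = new StringBuilder();

        state.update("a");                        // written at t = 0 s
        now[0] = 9_000L;
        log.append(state.value()).append(',');    // 9 s after the write: still visible
        now[0] = 10_000L;
        log.append(state.value()).append(',');    // 10 s after the write: expired
        state.update("b");                        // a new write restarts the timer
        now[0] = 19_000L;
        log.append(state.value());                // 9 s after the second write: visible
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints a,null,b
    }
}
```

The read at 9 seconds does not extend the lifetime of the value, which is why it expires at 10 seconds even though it was accessed shortly before.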

State Backend Types and Configuration

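Flink versions before 1.13 offer three main state backends (from 1.13 on, these classes are deprecated in favor of HashMapStateBackend and EmbeddedRocksDBStateBackend, combined with a separately configured checkpoint storage):

MemoryStateBackend – keeps working state on the TaskManager's JVM heap and stores checkpoints in the JobManager's memory; suitable for local debugging and small state only.

FsStateBackend – keeps working state on the TaskManager's JVM heap and writes checkpoint snapshots to an external filesystem (e.g., HDFS), while the JobManager holds only small metadata.

RocksDBStateBackend – persists working state in an embedded RocksDB instance on local disk and writes checkpoints to an external filesystem; supports very large state and incremental checkpoints.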

MemoryStateBackend

Example configuration (debugging use only):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Second argument: whether snapshots are taken asynchronously.
env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));

FsStateBackend

Specify a filesystem path (e.g., HDFS) where checkpoints are stored:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false));

RocksDBStateBackend

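Uses RocksDB on the TaskManager's local disk, so state size is bounded by available disk space rather than by the JVM heap, as it is with FsStateBackend. The trade-off is lower throughput, because every state access goes through serialization and possibly disk I/O. Among these three backends, it is the only one that supports incremental checkpoints.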

Checkpoints (Fault Tolerance)

Flink’s asynchronous lightweight distributed snapshot mechanism creates consistent global snapshots of all operator and keyed states. When a failure occurs, the job can be restored to the latest checkpoint and resume processing from that point.

Checkpoint generation follows a barrier‑aligned algorithm:

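The JobManager (checkpoint coordinator) triggers a checkpoint on all sources.

Each source snapshots its own state (for example, its current offsets), injects a checkpoint barrier into its output stream, and continues emitting records behind the barrier.

An operator that receives a barrier on one input buffers further records from that input until it has received the barrier from all of its inputs (barrier alignment).

Once all barriers have arrived, the operator snapshots its state to the configured checkpoint storage, forwards the barrier downstream, and then processes the buffered records.

Every task, down to the sinks, acknowledges its snapshot to the JobManager; when all acknowledgments are collected, the checkpoint is considered complete.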
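
The alignment step can be sketched with a small stdlib-only simulation. This is a simplified model under stated assumptions, not Flink code: AligningSumOperator and AlignmentDemo are hypothetical names, the operator keeps a single running sum as its state, and forwarding the barrier and writing to storage are reduced to recording the snapshot value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Toy operator with several input channels and one running sum as its state. */
final class AligningSumOperator {
    private final boolean[] barrierSeen;                       // per input channel
    private final Queue<Long> buffered = new ArrayDeque<>();   // records held back during alignment
    private final List<Long> snapshots = new ArrayList<>();    // state captured per checkpoint
    private long sum;

    AligningSumOperator(int channels) {
        barrierSeen = new boolean[channels];
    }

    void onRecord(int channel, long value) {
        if (barrierSeen[channel]) {
            buffered.add(value);   // arrived after the barrier: belongs to the next checkpoint
        } else {
            sum += value;          // arrived before the barrier: part of this checkpoint
        }
    }

    void onBarrier(int channel) {
        barrierSeen[channel] = true;
        for (boolean seen : barrierSeen) {
            if (!seen) {
                return;            // still waiting for barriers on other channels
            }
        }
        snapshots.add(sum);        // all barriers received: snapshot the state
        Arrays.fill(barrierSeen, false);
        while (!buffered.isEmpty()) {
            sum += buffered.poll();   // now apply the records that were held back
        }
    }

    long sum() { return sum; }
    List<Long> snapshots() { return snapshots; }
}

final class AlignmentDemo {
    static AligningSumOperator run() {
        AligningSumOperator op = new AligningSumOperator(2);
        op.onRecord(0, 1);
        op.onRecord(1, 2);
        op.onBarrier(0);       // channel 0 is now blocked
        op.onRecord(0, 10);    // buffered, not yet applied
        op.onRecord(1, 3);     // channel 1 has not seen the barrier: applied
        op.onBarrier(1);       // alignment complete: snapshot 1 + 2 + 3 = 6
        return op;
    }

    public static void main(String[] args) {
        AligningSumOperator op = run();
        System.out.println(op.snapshots() + " " + op.sum());   // prints [6] 16
    }
}
```

The snapshot contains 6, not 16: the record 10 arrived on channel 0 after that channel's barrier, so it is excluded from the checkpoint and applied only afterwards.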

Recovery Process

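Upon failure, Flink restarts the job, restores every operator's state from the latest completed checkpoint, and rewinds the sources to the positions recorded in that checkpoint. Records that were processed after the checkpoint are replayed, so each record affects Flink-managed state exactly once even though it may be processed more than once. Exactly-once results in external systems additionally require a transactional or idempotent sink.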
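
A minimal stdlib-only sketch of why restore plus replay gives the same result as a failure-free run. It assumes a replayable source (modeled as an array index) and a single running sum as state; ReplayDemo, Checkpoint, and the checkpointAt/failAt parameters are hypothetical names for illustration.

```java
/** Toy model: a replayable source, a running sum, and one injected failure. */
final class ReplayDemo {

    /** A checkpoint pairs the source position with the operator state at that position. */
    static final class Checkpoint {
        final int offset;
        final long sum;

        Checkpoint(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums the input, taking a checkpoint before the record at index checkpointAt
     * and failing once before the record at index failAt (pass -1 for no failure).
     */
    static long run(long[] input, int checkpointAt, int failAt) {
        int offset = 0;
        long sum = 0L;
        Checkpoint latest = new Checkpoint(0, 0L);
        boolean alreadyFailed = false;

        while (offset < input.length) {
            if (offset == checkpointAt) {
                latest = new Checkpoint(offset, sum);   // offset and state captured together
            }
            if (offset == failAt && !alreadyFailed) {
                alreadyFailed = true;
                offset = latest.offset;   // rewind the source
                sum = latest.sum;         // restore the state
                continue;                 // replay everything after the checkpoint
            }
            sum += input[offset];
            offset++;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3, 4, 5};
        System.out.println(run(input, 2, 4));    // prints 15 (with a failure and replay)
        System.out.println(run(input, 2, -1));   // prints 15 (no failure)
    }
}
```

Records 3 and 4 are processed twice in the failing run, but their first effect is discarded when the state is rolled back, so they are counted once in the final sum.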

Savepoints (Manual Snapshots)

Savepoints are user‑triggered snapshots based on the same checkpoint mechanism but intended for maintenance tasks such as version upgrades, job migration, or manual backup. Unlike checkpoints, they are not created automatically and must be cleaned up manually.

Two‑Phase Commit

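When a job writes to external systems (e.g., through a Kafka sink), Flink can tie a two-phase commit protocol to its checkpoints:

Pre-commit: when the checkpoint barrier arrives, each operator snapshots its state, and a transactional sink flushes the data written since the last checkpoint into an open, not yet visible transaction. If this succeeds, the barrier is forwarded.

Commit: after all operators have successfully pre-committed, the JobManager notifies them that the checkpoint is complete, and the sinks commit their transactions. If any pre-commit fails, the checkpoint is aborted, the open transactions are discarded, and the job rolls back to the last successful checkpoint.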

This ensures that either all side‑effects are visible or none are, preserving atomicity across the distributed pipeline.
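
The two phases can be illustrated with a stdlib-only toy sink. This is a sketch under stated assumptions, not Flink's implementation: TwoPhaseCommitSink and TwoPhaseCommitDemo are hypothetical classes, the external system is modeled as an in-memory list, and the method names only loosely mirror the preCommit, commit, and abort hooks of Flink's TwoPhaseCommitSinkFunction.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy transactional sink; the "external system" is just an in-memory list. */
final class TwoPhaseCommitSink {
    private final List<String> committed = new ArrayList<>();   // visible to external readers
    private List<String> open = new ArrayList<>();              // current transaction
    private List<String> preCommitted;                          // flushed, awaiting the commit signal

    /** Records go into the open transaction and are not visible yet. */
    void write(String record) {
        open.add(record);
    }

    /** Phase 1, on the checkpoint barrier: seal the open transaction and start a new one. */
    void preCommit() {
        preCommitted = open;
        open = new ArrayList<>();
    }

    /** Phase 2, once the checkpoint is reported complete: make the sealed data visible. */
    void commit() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    /** On a failed checkpoint: discard everything that was not committed. */
    void abort() {
        preCommitted = null;
        open = new ArrayList<>();
    }

    List<String> visible() {
        return committed;
    }
}

final class TwoPhaseCommitDemo {
    static List<String> run() {
        TwoPhaseCommitSink sink = new TwoPhaseCommitSink();
        sink.write("a");
        sink.write("b");
        sink.preCommit();
        sink.commit();      // checkpoint 1 completed: a and b become visible
        sink.write("c");
        sink.preCommit();
        sink.abort();       // checkpoint 2 failed: c is never exposed
        return sink.visible();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [a, b]
    }
}
```

After the rollback, the job would replay the input behind "c" from the last successful checkpoint and write it again in a fresh transaction, so it still reaches the external system exactly once.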

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
