Root Cause Analysis of Flink TaskManager Failover Causing Data Reprocessing and Business Impact
An incident report details how a scheduled machine reboot on Alibaba Cloud triggered a Flink TaskManager failover, leading to excessive data replay, increased ES pressure, and significant business latency, and explains the root cause involving disabled checkpoints and timestamp‑based offset consumption.
Background
Alibaba Cloud warned that a machine would automatically reboot during the next morning peak; following standard operations, the machine was removed from the cluster during off‑peak hours after notifying stakeholders. Removing the machine caused the TaskManager pod on that node to terminate, prompting Flink to request new TaskManagers on other nodes, during which task failover occurred.
Within ten minutes, colleagues in the on‑call group reported abnormal metrics; investigation revealed that the affected tasks were consuming data that was ten minutes older than expected.
Any consumption delay over 10 seconds was unacceptable for this business scenario, so a ten-minute lag caused metric degradation and financial loss, and the incident was classified as P3.
Resolution: the business team stopped several tasks syncing data to an Elasticsearch index, rebuilt the primary index task with increased concurrency, and switched downstream consumers to the new index.
Fault Root‑Cause Analysis Process
Discussion with the business side suggested that high ES load caused back‑pressure in Flink, leading to delayed Kafka consumption.
Initial hypothesis: after the faulty machine was removed, failover caused tasks to restart from the last checkpoint (taken at the default 2-minute interval), replaying a small window of data and adding write pressure on ES, which in turn caused back-pressure. However, further monitoring of a secondary (backup) task revealed that after failover it began consuming data from 22 hours earlier, far more than one checkpoint interval could explain.
Only a subset of TaskManagers processed recent data, while the majority started from the old offset, causing the backup task’s TPS to jump from ~2 k/s to 120 k/s after failover.
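A rough back-of-envelope check on those monitoring figures (illustrative only: it assumes the ~2 k/s and 120 k/s rates above are constant and uniform) shows how long replaying 22 hours of backlog would take:

```java
public class ReplayBackOfEnvelope {
    /** Seconds needed to drain a backlog while new data keeps arriving. */
    static double catchUpSeconds(double ingestRate, double replayRate, double backlogSeconds) {
        double backlogRecords = backlogSeconds * ingestRate;
        // While replaying, fresh records still arrive at ingestRate,
        // so the effective drain rate is replayRate - ingestRate.
        return backlogRecords / (replayRate - ingestRate);
    }

    public static void main(String[] args) {
        double ingestRate = 2_000;         // normal records/s (TPS before failover)
        double replayRate = 120_000;       // observed consume rate after failover
        double backlogSeconds = 22 * 3600; // backup task restarted 22 h back
        double s = catchUpSeconds(ingestRate, replayRate, backlogSeconds);
        // ~158.4 M records of backlog, roughly 22 minutes to catch up.
        System.out.printf("backlog=%.0f records, catch-up=%.1f min%n",
                backlogSeconds * ingestRate, s / 60);
    }
}
```

Even at 60x the normal rate, the replay keeps ES under heavy extra write load for over twenty minutes.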
Further investigation showed that the primary task had checkpoints enabled, while the backup task had them disabled and used a manually specified timestamp/offset to start consumption. Reviewing the KafkaSource code confirmed that when checkpoints are disabled, the consumer starts from the user‑provided offset.
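That startup behavior can be modeled with a simplified sketch (these are not Flink's actual classes; the names and offsets are hypothetical): restored checkpoint state, when present, always wins, and only in its absence does the configured timestamp/offset apply.

```java
/** Simplified model of how a Kafka source picks its starting offset on (re)start. */
public class StartOffsetModel {
    enum StartupMode { GROUP_OFFSETS, TIMESTAMP }

    /**
     * checkpointedOffset is the offset restored from checkpoint state,
     * or null when checkpoints are disabled (the backup task's case).
     */
    static long resolveStartOffset(Long checkpointedOffset,
                                   StartupMode mode,
                                   long offsetForTimestamp,
                                   long committedGroupOffset) {
        if (checkpointedOffset != null) {
            // Checkpoints enabled: restored state takes priority after failover.
            return checkpointedOffset;
        }
        // No checkpoint state: fall back to the user-configured startup mode.
        switch (mode) {
            case TIMESTAMP: return offsetForTimestamp;
            default:        return committedGroupOffset;
        }
    }

    public static void main(String[] args) {
        // Primary task: resumes near where it left off.
        System.out.println(resolveStartOffset(782_900_000L, StartupMode.TIMESTAMP, 782_507_410L, 782_850_000L));
        // Backup task: every failover restarts from the offset resolved
        // for the fixed timestamp, replaying everything consumed since.
        System.out.println(resolveStartOffset(null, StartupMode.TIMESTAMP, 782_507_410L, 782_850_000L));
    }
}
```

This is why only the backup task, not the primary, jumped back 22 hours after the same failover event.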
Root‑Cause Verification
Test code (with checkpoints disabled and a start‑from‑timestamp configuration) was executed to reproduce the behavior.
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CheckpointTest {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);
        env.setParallelism(parameters.getInt("envParallelism", 1));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, Time.of(10, TimeUnit.SECONDS)));
        // Manually disable checkpointing to reproduce the incident configuration.
        env.getCheckpointConfig().disableCheckpointing();

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                parameters.get("sourceTopic", "yarn_flink_log"),
                new SimpleStringSchema(),
                buildKafkaProps(parameters));
        if (parameters.get("timestamp") != null) {
            log.info("Start from timestamp " + parameters.getLong("timestamp"));
            // Manually set the start timestamp, which the consumer internally
            // resolves to the corresponding offset per partition.
            flinkKafkaConsumer.setStartFromTimestamp(parameters.getLong("timestamp"));
        }

        env.addSource(flinkKafkaConsumer)
                .setParallelism(parameters.getInt("sourceParallelism", 1))
                .print();
        env.execute("test checkpoint not enable");
    }

    public static Properties buildKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get("sourceBootstrapServers", "localhost:9092"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put("group.id", parameterTool.get("sourceGroupId", String.valueOf(UUID.randomUUID())));
        props.put("flink.partition-discovery.interval-millis", "10000");
        props.put("auto.offset.reset", parameterTool.get("sourceAutoOffsetReset", "latest"));
        return props;
    }
}

The test used timestamp 1666819431000 (2022-10-27 05:23:51). After manually killing a TaskManager pod at 2022-10-28 15:31, the newly launched pod started consuming from that exact timestamp, confirming the issue.
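The timestamp-to-date conversion used in the test is easy to double-check with standard Java (assuming the report's local time zone is Asia/Shanghai, i.e. UTC+8):

```java
import java.time.Instant;
import java.time.ZoneId;

public class TimestampCheck {
    /** Render an epoch-millisecond timestamp in China Standard Time. */
    static String toShanghaiLocal(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of("Asia/Shanghai"))
                .toLocalDateTime()
                .toString();
    }

    public static void main(String[] args) {
        long startTs = 1_666_819_431_000L; // value passed via --timestamp
        System.out.println(toShanghaiLocal(startTs)); // 2022-10-27T05:23:51
    }
}
```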
Log excerpt from the new TaskManager shows it reading partitions from the specified timestamp:
2022-10-28 15:31:05,254 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 will start reading the following 6 partitions from timestamp 1666819431000: [KafkaTopicPartition{topic='yxxx_log', partition=34}, ...]
2022-10-28 15:31:05,267 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase [] - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='yxxx_log', partition=34}=(782507410,-1), ...}.
Conclusion
Disabling checkpoints combined with a manually specified timestamp or offset causes the job, after every failover, to restart consumption from that fixed position, re-reading all data consumed since then and producing duplicates.
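For reference, a minimal sketch of the corresponding fix, assuming the job can tolerate checkpointing: with checkpoints enabled, the source resumes from checkpointed offsets after failover, and the configured timestamp only applies to the very first run. The interval and variable names below are illustrative, not the production values.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing, e.g. at the default 2-minute interval mentioned above.
env.enableCheckpointing(120_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// setStartFromTimestamp(...) now only determines the position of the first run;
// after a failover the source restores offsets from checkpoint state instead.
flinkKafkaConsumer.setStartFromTimestamp(startTimestamp);
```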
Thoughts
1. Core streaming jobs must have consumption‑delay alerts configured to detect issues early and reduce loss.
2. Do not overlook any metrics; missing indicators can hide root causes.
3. Changing default cluster configurations (such as disabling checkpoints) should be done cautiously and thoroughly tested.
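For thought 1, a consumption-delay alert can be as simple as translating per-partition consumer lag into an approximate time delay and comparing it against the business threshold. The sketch below is illustrative (offsets, rates, and the 10-second threshold are hypothetical inputs, not a real monitoring integration):

```java
import java.util.Map;

public class LagAlertSketch {
    /** Alert when any partition's record lag implies more than maxDelaySeconds of delay. */
    static boolean shouldAlert(Map<Integer, Long> logEndOffsets,
                               Map<Integer, Long> committedOffsets,
                               double recordsPerSecondPerPartition,
                               double maxDelaySeconds) {
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long lag = e.getValue() - committedOffsets.getOrDefault(e.getKey(), 0L);
            // Convert record lag to an approximate time delay.
            if (lag / recordsPerSecondPerPartition > maxDelaySeconds) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 1_000_000L);
        Map<Integer, Long> committed = Map.of(0, 999_000L);
        // 1000 records behind at 2000 rec/s is a 0.5 s delay: below the 10 s threshold.
        System.out.println(shouldAlert(end, committed, 2_000, 10)); // false
    }
}
```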
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.