Error Handling Strategies for Kafka Connectors: Immediate Stop, Silent Ignoring, and Dead‑Letter Queue
This article explains how to configure Kafka Connect error handling options—including stopping on failure, silently ignoring malformed messages, and routing failed records to a dead‑letter queue—while providing practical examples, monitoring techniques, and code snippets for robust data pipelines.
Kafka Connect is a framework for building streaming pipelines between Kafka and other systems. Its connectors ingest data from sources such as databases, message queues, and files, and write data to targets such as document stores, NoSQL databases, and object storage.
Stop Processing Immediately
In some cases you may want the connector to halt as soon as an error occurs, especially when bad data originates upstream and must be fixed at the source.
errors.tolerance = none
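With this setting, the failure surfaces in the status resource of the Kafka Connect REST API (GET /connectors/<name>/status), whose response lists each task with its state. The minimal Python sketch below shows an automated version of the status check used in this section. It assumes a worker at http://localhost:8083, as in that check. fetch_status, task_states and has_failed_task are hypothetical helper names.

```python
import json
from urllib.request import urlopen


def fetch_status(worker_url: str, connector: str) -> str:
    """GET /connectors/<name>/status from a Kafka Connect worker (URL is an assumption)."""
    with urlopen(f"{worker_url}/connectors/{connector}/status") as resp:
        return resp.read().decode("utf-8")


def task_states(status_json: str) -> list:
    """Python equivalent of the jq filter '[.name,.tasks[].state]'."""
    status = json.loads(status_json)
    return [status["name"]] + [task["state"] for task in status.get("tasks", [])]


def has_failed_task(status_json: str) -> bool:
    """True if any task reports FAILED; the connector-level state may still read RUNNING."""
    return "FAILED" in task_states(status_json)[1:]
```

The check looks at task states rather than the connector-level state because, typically, a conversion failure stops the task while the connector itself keeps reporting RUNNING. Usage would be `has_failed_task(fetch_status("http://localhost:8083", "file_sink_01"))`.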
The example below configures a FileStreamSinkConnector to read JSON from a topic and write it to a plain-text file. When an invalid JSON record is encountered, the connector's task enters the FAILED state.
curl -X POST http:
"name": "file_sink_01",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "test_topic_json",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_01.txt"
}
}
After the failure, the connector status can be checked:
$ curl -s "http://localhost:8083/connectors/file_sink_01/status" | \
jq -c -M '[.name,.tasks[].state]'
["file_sink_01","FAILED"]
Silently Ignore Invalid Messages
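Conceptually, the two tolerance settings differ only in what happens when a record fails conversion. The Python sketch below models that difference. It is a hypothetical simulation, not Kafka Connect code: sink_records is an illustrative name, and json.loads stands in for the JsonConverter.

```python
import json


def sink_records(raw_values, tolerance="none"):
    """Hypothetical model of a sink task's conversion step (not Kafka Connect code).

    Returns (written, skipped, task_state).
    """
    if tolerance not in ("none", "all"):
        raise ValueError("errors.tolerance must be 'none' or 'all'")
    written, skipped = [], []
    for raw in raw_values:
        try:
            # json.loads stands in for JsonConverter deserializing the value.
            written.append(json.loads(raw))
        except json.JSONDecodeError:
            if tolerance == "none":
                # Stop at the first bad record; in this model, earlier records stay written.
                return written, skipped, "FAILED"
            # tolerance == "all": drop the record silently and carry on.
            skipped.append(raw)
    return written, skipped, "RUNNING"
```

With `tolerance="none"` the model stops at the first bad value, like the FAILED task above. With `"all"` it skips the value and stays RUNNING, which is the behavior configured in this section.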
To keep the pipeline running while discarding bad records, set the tolerance to all:
errors.tolerance = all
Example configuration:
curl -X POST http:
"name": "file_sink_05",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "test_topic_json",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_05.txt",
"errors.tolerance": "all"
}
}
After starting, the connector remains RUNNING even with malformed messages, and only valid records are written to the output file.
$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
…
Detecting Data Loss
When errors.tolerance=all is used, discarded messages are not logged by default. To detect loss, compare the source topic message count with the number of records written to the sink:
$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l
150
$ wc -l data/file_sink_05.txt
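100 data/file_sink_05.txt
Here 50 of the 150 source messages never reached the file, assuming the sink has caught up with the topic. A more reliable method is to monitor the task's error metrics via JMX, for example total-record-errors and total-records-skipped in the kafka.connect:type=task-error-metrics MBean.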
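The same comparison can be scripted. The minimal Python sketch below assumes the source count comes from the kafkacat command above and that the sink file is readable locally. count_lines and dropped_records are hypothetical helpers.

```python
def count_lines(path):
    """Count lines in the sink file, like `wc -l`, except that a final line
    without a trailing newline is also counted."""
    with open(path, "rb") as f:
        return sum(1 for _ in f)


def dropped_records(source_count, sink_path):
    """Messages in the source topic that never reached the sink file.

    Only meaningful once the sink has caught up with the topic.
    """
    return source_count - count_lines(sink_path)
```

With the counts shown above, `dropped_records(150, "data/file_sink_05.txt")` would return 50.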
Routing to a Dead‑Letter Queue (DLQ)
Configure the sink connector to send unprocessable records to a separate Kafka topic, the dead-letter queue (DLQ). Dead-letter queues are supported only for sink connectors.
errors.tolerance = all
errors.deadletterqueue.topic.name = dlq_file_sink_02
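errors.deadletterqueue.topic.replication.factor = 1
A replication factor of 1 is only suitable for a single-broker development cluster; the default is 3.
Example: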
curl -X POST http:
"name": "file_sink_02",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "test_topic_json",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_02.txt",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_file_sink_02",
"errors.deadletterqueue.topic.replication.factor": 1
}
}
The connector stays RUNNING and valid records are written to the target file, while failed records appear in the DLQ topic.
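The connector configurations in this article differ only in their errors.* settings, so they can be generated rather than written by hand. POST /connectors is the Kafka Connect REST endpoint that creates a connector from a `{"name": ..., "config": ...}` body. The minimal sketch below uses only the Python standard library. file_sink_config and create_connector are hypothetical helpers, and the worker URL (http://localhost:8083, as in the status check earlier) is an assumption.

```python
import json
from urllib.request import Request, urlopen

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def file_sink_config(name, topic, file_path, tolerance="none", dlq_topic=None,
                     dlq_replication_factor=1, dlq_context_headers=False):
    """Build the REST payload for one of the FileStreamSinkConnector examples."""
    config = {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": topic,
        "value.converter": JSON_CONVERTER,
        "value.converter.schemas.enable": False,
        "key.converter": JSON_CONVERTER,
        "key.converter.schemas.enable": False,
        "file": file_path,
        # "none" is the default; setting it explicitly just makes the intent visible.
        "errors.tolerance": tolerance,
    }
    if dlq_topic is not None:
        config["errors.deadletterqueue.topic.name"] = dlq_topic
        config["errors.deadletterqueue.topic.replication.factor"] = dlq_replication_factor
        if dlq_context_headers:
            config["errors.deadletterqueue.context.headers.enable"] = True
    return {"name": name, "config": config}


def create_connector(worker_url, payload):
    """POST the payload to /connectors and return the worker's JSON response."""
    request = Request(
        f"{worker_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request) as response:
        return json.loads(response.read())
```

For the DLQ example in this section, `create_connector("http://localhost:8083", file_sink_config("file_sink_02", "test_topic_json", "/data/file_sink_02.txt", tolerance="all", dlq_topic="dlq_file_sink_02"))` would submit the same settings as the JSON shown above.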
Capturing Failure Reasons in Message Headers
Enable header enrichment so that the DLQ messages contain metadata about the failure:
errors.deadletterqueue.context.headers.enable = true
Configuration example:
curl -X POST http:
"name": "file_sink_03",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "test_topic_json",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_03.txt",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_file_sink_03",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true
}
}
Using kafkacat, you can inspect the headers (the %h token in a -f format string prints them) to see details such as the source topic, partition, offset, connector name, and the exception class and message.
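To process these headers programmatically, a consumer can strip the `__connect.errors.` prefix that Kafka Connect puts on each context header, for example `__connect.errors.topic` and `__connect.errors.exception.message`. The minimal sketch below assumes headers arrive as a list of (str, bytes) pairs, or None when a message has no headers, which is how confluent-kafka's `Message.headers()` exposes them. decode_error_headers and summarize are hypothetical helpers.

```python
CONNECT_ERROR_PREFIX = "__connect.errors."


def decode_error_headers(headers):
    """Turn Kafka Connect DLQ context headers into a dict keyed without the prefix.

    `headers` is assumed to be a list of (str, bytes) pairs, or None.
    """
    context = {}
    for key, value in headers or []:
        if key.startswith(CONNECT_ERROR_PREFIX) and value is not None:
            context[key[len(CONNECT_ERROR_PREFIX):]] = value.decode("utf-8", errors="replace")
    return context


def summarize(context):
    """One-line description: source topic[partition]@offset: exception class: message."""
    return (
        f"{context.get('topic')}[{context.get('partition')}]@{context.get('offset')}: "
        f"{context.get('exception.class.name')}: {context.get('exception.message')}"
    )
```

Headers without the prefix are ignored, so the function can be applied to any DLQ message regardless of what else the original producer attached.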
Logging Failure Reasons
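Alternatively, or in addition to a DLQ, enable logging of failed records:
errors.log.enable = true
errors.log.include.messages = true
When enabled, each failed record produces a detailed entry in the Kafka Connect worker log containing the same metadata shown in the header approach; errors.log.include.messages = true also includes the failing record in the log entry.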
Processing DLQ Messages
Since a DLQ is just a Kafka topic, standard tools (e.g., kafkacat, KSQL) can be used to inspect, replay, or reprocess the failed records.
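As one example of reprocessing, failed values can be triaged before replay. Values that parse as JSON, optionally after a repair step, are candidates to produce back to the source topic; the rest stay in the DLQ. The Python sketch below is hypothetical: the producer step is omitted, and `repair` stands for whatever fix applies to your data.

```python
import json


def triage_dlq_values(values, repair=None):
    """Split raw DLQ values into (replayable, still_broken).

    `repair` is an optional, hypothetical fix-up applied to each value before the
    JSON check; replayable values are returned in their repaired form.
    """
    replayable, still_broken = [], []
    for raw in values:
        candidate = repair(raw) if repair is not None else raw
        try:
            json.loads(candidate)
        except (json.JSONDecodeError, UnicodeDecodeError):
            still_broken.append(raw)
        else:
            replayable.append(candidate)
    return replayable, still_broken
```

Keeping the original bytes for still-broken values means nothing is lost if the triage is run again later with a better repair function.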
Monitoring DLQ with KSQL
Create KSQL streams over the DLQ topics and aggregate counts per minute to detect spikes:
CREATE STREAM dlq_file_sink_06__01 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__01', VALUE_FORMAT='DELIMITED');
CREATE STREAM dlq_file_sink_06__02 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__02', VALUE_FORMAT='DELIMITED');
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS
SELECT 'dlq_file_sink_06__01' AS SINK_NAME,
'Records: ' AS GROUP_COL,
MSG
FROM dlq_file_sink_06__01;
INSERT INTO DLQ_MONITOR
SELECT 'dlq_file_sink_06__02' AS SINK_NAME,
'Records: ' AS GROUP_COL,
MSG
FROM dlq_file_sink_06__02;
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS,
SINK_NAME,
GROUP_COL,
COUNT(*) AS DLQ_MESSAGE_COUNT
FROM DLQ_MONITOR
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY SINK_NAME, GROUP_COL;
CREATE TABLE DLQ_BREACH AS
SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT
FROM DLQ_MESSAGE_COUNT_PER_MIN
WHERE DLQ_MESSAGE_COUNT > 5;
Because DLQ_BREACH is backed by a Kafka topic, an alerting service can subscribe to that topic and send a notification whenever the number of DLQ messages in a one-minute window exceeds the threshold (five in this example).
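For environments without KSQL, the windowing logic of DLQ_MESSAGE_COUNT_PER_MIN and DLQ_BREACH can be mirrored in plain Python. The sketch below assumes each DLQ message has been reduced to a (sink_name, timestamp in epoch milliseconds) pair, such as the Kafka record timestamp that KSQL windows on by default. It formats window starts in UTC and omits the constant GROUP_COL. The helper names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_MS = 60_000  # WINDOW TUMBLING (SIZE 1 MINUTE)


def dlq_counts_per_minute(events):
    """events: iterable of (sink_name, timestamp_ms), one per DLQ message.

    Returns a Counter keyed by (window_start, sink_name), like DLQ_MESSAGE_COUNT_PER_MIN.
    """
    counts = Counter()
    for sink_name, ts_ms in events:
        window_start_ms = ts_ms - ts_ms % WINDOW_MS
        start = datetime.fromtimestamp(window_start_ms / 1000, tz=timezone.utc)
        counts[(start.strftime("%Y-%m-%d %H:%M:%S"), sink_name)] += 1
    return counts


def breaches(counts, threshold=5):
    """Rows DLQ_BREACH would contain: windows with more than `threshold` messages."""
    return sorted(
        (start, sink_name, n)
        for (start, sink_name), n in counts.items()
        if n > threshold
    )
```

This batch version reports the final count per window. The KSQL table, by contrast, updates continuously as messages arrive, so an alerting consumer may see several rows for the same window.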
Summary
Effective error handling is essential for reliable data pipelines. If any bad record is unacceptable, configure the connector to stop immediately (the default). Otherwise, use errors.tolerance=all with a dead‑letter queue, monitor error metrics via JMX or KSQL, and optionally capture failure reasons in headers or logs to enable troubleshooting and reprocessing.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
