Error Handling Strategies for Kafka Connectors: Immediate Stop, Silent Ignoring, and Dead‑Letter Queue

This article explains how to configure Kafka Connect error handling options—including stopping on failure, silently ignoring malformed messages, and routing failed records to a dead‑letter queue—while providing practical examples, monitoring techniques, and code snippets for robust data pipelines.

Big Data Technology & Architecture

Kafka Connect is a framework for building streaming pipelines between Kafka and other systems. Source connectors ingest data from systems such as databases, message queues, and files, and sink connectors write data to targets like document stores, NoSQL databases, and object storage.

Stop Processing Immediately

In some cases you may want the connector to halt as soon as an error occurs, especially when bad data originates upstream and must be fixed at the source.

errors.tolerance = none

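The example below configures a FileStreamSinkConnector to read JSON from a topic and write it to a plain‑text file. Because none is the default value of errors.tolerance, the setting does not need to appear in the configuration. When the connector encounters an invalid JSON record, its task enters the FAILED state.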

curl -X POST http:
    "name": "file_sink_01",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": "test_topic_json",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "file": "/data/file_sink_01.txt"
    }
}
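The same request can also be scripted. The following is a minimal Python sketch, not the article's own method: it assumes the Connect REST API listens on http://localhost:8083 (the host used by the status call in this article), and the helper names build_file_sink_request and submit are hypothetical. Connect treats configuration values as strings, so the booleans are written as "false".

```python
import json
import urllib.request

# Assumption: Connect REST endpoint, taken from the status call in this article.
CONNECT_URL = "http://localhost:8083"


def build_file_sink_request(name, topic, path, base_url=CONNECT_URL):
    """Build (but do not send) the POST request that creates a file sink connector."""
    payload = {
        "name": name,
        "config": {
            "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
            "topics": topic,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "file": path,
        },
    }
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit(request):
    """Send the request and return the worker's decoded JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


request = build_file_sink_request(
    "file_sink_01", "test_topic_json", "/data/file_sink_01.txt"
)
# submit(request) would create the connector on a running Connect worker.
```

Building the request separately from sending it keeps the payload easy to inspect or log before anything reaches the worker.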

After the failure, the connector status can be checked:

$ curl -s "http://localhost:8083/connectors/file_sink_01/status" | \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_01","FAILED"]
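The status response also carries a stack trace for each failed task, which the jq filter above discards. As a rough sketch, a script could pull out the failed tasks and the first line of each trace. The function name failed_tasks and the sample payload are illustrative assumptions; the payload only mimics the shape of the status response, and its worker address and trace text are made up.

```python
def failed_tasks(status):
    """Return (task_id, first line of trace) for every task in the FAILED state."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task.get("id"), first_line))
    return failures


# Illustrative payload in the shape of GET /connectors/<name>/status.
sample_status = {
    "name": "file_sink_01",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "connect:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: "
                     "Tolerance exceeded in error handler\n\tat ...",
        }
    ],
}

for task_id, reason in failed_tasks(sample_status):
    print(f"task {task_id} failed: {reason}")
```

Note that the connector-level state can still read RUNNING while one of its tasks is FAILED, which is why the check looks at the tasks array rather than the connector field.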

Silently Ignore Invalid Messages

To keep the pipeline running while discarding bad records, set the tolerance to all:

errors.tolerance = all

Example configuration:

curl -X POST http:
    "name": "file_sink_05",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": "test_topic_json",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "file": "/data/file_sink_05.txt",
        "errors.tolerance": "all"
    }
}

After starting, the connector remains RUNNING even with malformed messages, and only valid records are written to the output file.

$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
…

Detecting Data Loss

When errors.tolerance=all is used, discarded messages are not logged by default. To detect loss, compare the source topic message count with the number of records written to the sink:

$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l
150
$ wc -l data/file_sink_05.txt
100 data/file_sink_05.txt
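The arithmetic behind this comparison is simple enough to script. The sketch below uses a hypothetical helper, loss_report, and assumes that the topic is not receiving new records during the check and that each valid record becomes exactly one line in the sink file.

```python
def loss_report(source_count, sink_count):
    """Return (dropped, ratio) given record counts for the source topic and the sink."""
    dropped = source_count - sink_count
    if dropped < 0:
        raise ValueError("sink holds more records than the source topic")
    ratio = dropped / source_count if source_count else 0.0
    return dropped, ratio


# Counts taken from the kafkacat and wc output above.
dropped, ratio = loss_report(150, 100)
print(f"{dropped} records dropped ({ratio:.1%} of the source topic)")
```

With the counts above, 50 of the 150 source records never reached the file.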

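A more reliable method is to monitor the error metrics that Kafka Connect exposes via JMX. The task-error-metrics MBean (kafka.connect:type=task-error-metrics, one instance per connector task) reports counters such as total-record-errors, total-records-skipped, and deadletterqueue-produce-requests.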

Routing to a Dead‑Letter Queue (DLQ)

Configure the connector to send records it cannot process to a separate Kafka topic, the dead‑letter queue (DLQ). Kafka Connect supports this option for sink connectors only:

errors.tolerance = all
errors.deadletterqueue.topic.name = dlq_file_sink_02
errors.deadletterqueue.topic.replication.factor = 1

Example:

curl -X POST http:
    "name": "file_sink_02",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": "test_topic_json",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "file": "/data/file_sink_02.txt",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq_file_sink_02",
        "errors.deadletterqueue.topic.replication.factor": 1
    }
}

The connector stays RUNNING and valid records are written to the target file, while failed records appear in the DLQ topic.

Capturing Failure Reasons in Message Headers

Enable header enrichment so that the DLQ messages contain metadata about the failure:

errors.deadletterqueue.context.headers.enable = true

Configuration example:

curl -X POST http:
    "name": "file_sink_03",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": "test_topic_json",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "file": "/data/file_sink_03.txt",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq_file_sink_03",
        "errors.deadletterqueue.topic.replication.factor": 1,
        "errors.deadletterqueue.context.headers.enable": true
    }
}

Using kafkacat you can inspect the headers, whose keys are prefixed with __connect.errors., to see details such as the source topic, partition, offset, connector name, and the exception class and message.
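A consumer written in Python could collect the same headers into a dictionary. The sketch below assumes headers arrive as a list of (key, bytes) pairs, the shape returned by clients such as confluent-kafka. The function name failure_context and the sample values are illustrative and not taken from a real run.

```python
PREFIX = "__connect.errors."


def failure_context(headers):
    """Collect the Connect error headers of a DLQ record into a plain dict."""
    context = {}
    for key, value in headers or []:
        if key.startswith(PREFIX):
            # Connect writes these header values as UTF-8 strings.
            text = value.decode("utf-8", errors="replace") if value is not None else None
            context[key[len(PREFIX):]] = text
    return context


# Illustrative headers for one DLQ record.
sample_headers = [
    ("__connect.errors.topic", b"test_topic_json"),
    ("__connect.errors.partition", b"0"),
    ("__connect.errors.offset", b"94"),
    ("__connect.errors.connector.name", b"file_sink_03"),
    ("__connect.errors.exception.class.name",
     b"org.apache.kafka.connect.errors.DataException"),
    ("some.other.header", b"ignored"),
]

context = failure_context(sample_headers)
print(context["topic"], context["partition"], context["offset"])
```

Stripping the prefix keeps the keys short, and headers that do not belong to Connect's error context are skipped.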

Logging Failure Reasons

Alternatively, enable logging of failed records:

errors.log.enable = true
errors.log.include.messages = true

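With errors.log.enable set, each failed record produces an entry in the Connect worker log, and errors.log.include.messages adds details of the failed record to that entry, giving much the same metadata as the header approach.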

Processing DLQ Messages

Since a DLQ is just a Kafka topic, standard tools (e.g., kafkacat, KSQL) can be used to inspect, replay, or reprocess the failed records.
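As one possible first step when reprocessing, a script can sort DLQ payloads into those that parse as JSON (for example, records that failed at a later stage and can be replayed once that stage is fixed) and those that are still malformed. This is only a sketch: the function name triage is hypothetical, and it works on raw record values so that it does not depend on any particular Kafka client.

```python
import json


def triage(values):
    """Split raw DLQ record values into parsed JSON documents and broken payloads."""
    parsed, broken = [], []
    for raw in values:
        try:
            parsed.append(json.loads(raw.decode("utf-8")))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Neither valid UTF-8 nor valid JSON: keep the bytes for manual review.
            broken.append(raw)
    return parsed, broken


# Illustrative payloads: one valid document, one with an unquoted key, one non-UTF-8.
parsed, broken = triage([b'{"foo": "bar 1"}', b'{foo:"bar 5"}', b"\xff\xfe"])
print(len(parsed), "replayable,", len(broken), "need manual repair")
```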

Monitoring DLQ with KSQL

Create KSQL streams over the DLQ topics and aggregate counts per minute to detect spikes:

CREATE STREAM dlq_file_sink_06__01 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__01', VALUE_FORMAT='DELIMITED');
CREATE STREAM dlq_file_sink_06__02 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__02', VALUE_FORMAT='DELIMITED');
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS
  SELECT 'dlq_file_sink_06__01' AS SINK_NAME,
         'Records: ' AS GROUP_COL,
         MSG
    FROM dlq_file_sink_06__01;
INSERT INTO DLQ_MONITOR
  SELECT 'dlq_file_sink_06__02' AS SINK_NAME,
         'Records: ' AS GROUP_COL,
         MSG
    FROM dlq_file_sink_06__02;
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS
  SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS,
         SINK_NAME,
         GROUP_COL,
         COUNT(*) AS DLQ_MESSAGE_COUNT
    FROM DLQ_MONITOR
          WINDOW TUMBLING (SIZE 1 MINUTE)
 GROUP BY SINK_NAME, GROUP_COL;
CREATE TABLE DLQ_BREACH AS
    SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT
      FROM DLQ_MESSAGE_COUNT_PER_MIN
     WHERE DLQ_MESSAGE_COUNT > 5;

The DLQ_BREACH table can be subscribed to by an alerting service to trigger notifications when error volume exceeds a threshold.
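For an alerting service that reads DLQ records directly instead of going through KSQL, the same per-minute counting could be sketched in Python. The function name breaches and the (timestamp_ms, sink_name) event shape are assumptions made for illustration; the threshold mirrors the DLQ_MESSAGE_COUNT > 5 condition above.

```python
from collections import Counter

WINDOW_MS = 60_000  # one-minute tumbling windows, as in the KSQL query


def breaches(events, threshold=5):
    """Return sorted (window_start_ms, sink_name, count) where count exceeds threshold.

    `events` is an iterable of (timestamp_ms, sink_name) pairs, one per DLQ record.
    """
    counts = Counter()
    for timestamp_ms, sink_name in events:
        window_start = timestamp_ms - (timestamp_ms % WINDOW_MS)
        counts[(window_start, sink_name)] += 1
    return sorted(
        (window, sink, count)
        for (window, sink), count in counts.items()
        if count > threshold
    )


# Six failures in the first minute, five in the second: only the first minute breaches.
events = [(i * 1_000, "dlq_file_sink_06__01") for i in range(6)]
events += [(60_000 + i * 1_000, "dlq_file_sink_06__01") for i in range(5)]
print(breaches(events))
```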

Summary

Effective error handling is essential for reliable data pipelines. If any bad record is unacceptable, configure the connector to stop immediately (the default). Otherwise, use errors.tolerance=all with a dead‑letter queue, monitor error metrics via JMX or KSQL, and optionally capture failure reasons in headers or logs to enable troubleshooting and reprocessing.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
