Using ElasticsearchSink with Apache Flink: Configuration, Retry Strategies, and Failure Handling

This article introduces the ElasticsearchSink for Apache Flink. It explains how to add the Maven dependency and implement the sink with configuration and retry settings, describes failure handlers, and highlights important considerations such as exception handling and the checkpointing requirement for reliable streaming pipelines.

Big Data Technology & Architecture

1. ElasticsearchSink Overview

When processing data with Flink, the result must be stored or exported via a Sink. Common Flink sink connectors include Kafka, Elasticsearch, Hadoop FileSystem, etc. This article focuses on the widely used ElasticsearchSink, covering its usage and production‑environment considerations.

Apache Kafka
Elasticsearch
Elasticsearch 2.x
Hadoop FileSystem
…

2. Usage

1. Add Maven Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.3.1</version>
</dependency>

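The artifact name encodes the Elasticsearch major version (elasticsearch2) and the Scala version (_2.10). Adjust the artifact and version to match your Flink, Scala, and Elasticsearch versions.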

2. Implement the Code

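import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// Emit after every element; otherwise requests are buffered (handy for testing, slow in production)
config.put("bulk.flush.max.actions", "1");

// Transport addresses (port 9300) of the Elasticsearch nodes to connect to
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    // Wrap each element in a JSON document of the form {"data": element}
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // Hand the request to the sink's bulk processor; it is not sent immediately
        indexer.add(createIndexRequest(element));
    }
}));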

Extended Configuration

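To avoid data loss when the ES cluster is temporarily overloaded, enable the bulk flush backoff settings. With backoff enabled, a bulk request whose actions were rejected with a temporary EsRejectedExecutionException is retried after a delay, up to the configured number of attempts: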

// Enable retries with backoff for temporarily rejected bulk actions
config.put("bulk.flush.backoff.enable", "true");
// Backoff type: CONSTANT (fixed delay) or EXPONENTIAL (growing delay)
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
// Delay in milliseconds: the fixed delay for CONSTANT, the initial base delay for EXPONENTIAL
config.put("bulk.flush.backoff.delay", "2");
// Maximum number of retry attempts
config.put("bulk.flush.backoff.retries", "3");
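
To make the difference between the two backoff types concrete, the sketch below computes a retry schedule in plain Java. It is a simplified model, not connector code: the BackoffSchedule class and its doubling rule are assumptions for illustration. Elasticsearch's BulkProcessor uses its own growth formula for EXPONENTIAL backoff, so real delays will differ; only the shape of the two policies is meant to carry over.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a retry schedule; not part of Flink or Elasticsearch.
public class BackoffSchedule {

    enum BackoffType { CONSTANT, EXPONENTIAL }

    /**
     * Returns the delay (in ms) before each retry attempt.
     * CONSTANT: every retry waits baseDelayMs.
     * EXPONENTIAL (simplified doubling model): baseDelayMs, 2x, 4x, ...
     */
    static List<Long> delays(BackoffType type, long baseDelayMs, int retries) {
        List<Long> result = new ArrayList<>();
        long delay = baseDelayMs;
        for (int attempt = 0; attempt < retries; attempt++) {
            result.add(delay);
            if (type == BackoffType.EXPONENTIAL) {
                delay *= 2; // double the wait after each failed attempt
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors bulk.flush.backoff.delay=2 and bulk.flush.backoff.retries=3 from the config above
        System.out.println("CONSTANT:    " + delays(BackoffType.CONSTANT, 2, 3));
        System.out.println("EXPONENTIAL: " + delays(BackoffType.EXPONENTIAL, 2, 3));
    }
}
```

Exponential backoff gives an overloaded cluster progressively more time to drain its queues, at the cost of longer pauses before a rejected request is finally written or given up on.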
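
Other useful parameters:
bulk.flush.max.actions: maximum number of actions to buffer before flushing a bulk request
bulk.flush.max.size.mb: maximum size of buffered data, in MB, before flushing
bulk.flush.interval.ms: interval in milliseconds at which to flush, regardless of the number or size of buffered actions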
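
The flush triggers combine as "whichever fires first". The sketch below models that rule with a hypothetical BulkBuffer class. It is a single-threaded simplification of what Elasticsearch's BulkProcessor does: the real processor runs the interval flush on its own schedule, whereas here the interval is only checked when an element arrives, the clock is passed in explicitly, and document size is roughly estimated from string length.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the bulk.flush.* triggers; not Flink or ES code.
public class BulkBuffer {
    private final int maxActions;     // like bulk.flush.max.actions
    private final long maxSizeBytes;  // like bulk.flush.max.size.mb, converted to bytes
    private final long intervalMs;    // like bulk.flush.interval.ms
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private long lastFlushMs;
    private int flushCount = 0;

    BulkBuffer(int maxActions, long maxSizeMb, long intervalMs, long nowMs) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeMb * 1024 * 1024;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers one document and flushes if any trigger has fired. */
    void add(String doc, long nowMs) {
        pending.add(doc);
        pendingBytes += doc.length(); // rough size estimate for the sketch
        boolean tooMany = pending.size() >= maxActions;
        boolean tooBig = pendingBytes >= maxSizeBytes;
        boolean tooOld = nowMs - lastFlushMs >= intervalMs;
        if (tooMany || tooBig || tooOld) {
            flush(nowMs);
        }
    }

    void flush(long nowMs) {
        if (!pending.isEmpty()) {
            flushCount++; // a real sink would send one bulk request here
            pending.clear();
            pendingBytes = 0;
        }
        lastFlushMs = nowMs;
    }

    int flushCount() { return flushCount; }
    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        // bulk.flush.max.actions=1, as in the basic example: every element is flushed immediately
        BulkBuffer eager = new BulkBuffer(1, 5, 60_000, 0);
        for (int i = 0; i < 3; i++) eager.add("doc" + i, i);
        System.out.println("max.actions=1    -> flushes: " + eager.flushCount());

        // bulk.flush.max.actions=1000: three small documents stay buffered
        BulkBuffer batched = new BulkBuffer(1000, 5, 60_000, 0);
        for (int i = 0; i < 3; i++) batched.add("doc" + i, i);
        System.out.println("max.actions=1000 -> flushes: " + batched.flushCount()
                + ", pending: " + batched.pendingCount());
    }
}
```

This is why bulk.flush.max.actions=1 in the basic example is convenient for testing but costly in production: every element becomes its own bulk request.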

3. Failure Handler

When an ES node's queue is full or a node fails, write requests can fail. To control how such failures are handled, pass a custom ActionRequestFailureHandler as an additional constructor argument when creating the sink:

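import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                              Throwable failure,
                              int restStatusCode,
                              RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // Node queue full: re-add the request so it is sent again in a later bulk
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // Malformed document: custom handling (e.g. log and drop) without failing the sink
            } else {
                // Any other failure: rethrow, which fails the sink and the job
                throw failure;
            }
        }
    }));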
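
The handler's decisions hinge on searching the failure's cause chain, which is what ExceptionUtils.containsThrowable is used for above. The self-contained sketch below models that logic in plain Java so it can run without a cluster. RejectedStandIn and ParseStandIn are hypothetical stand-ins for EsRejectedExecutionException and ElasticsearchParseException, and the Decision enum is an illustration of the three outcomes, not a Flink type.

```java
// Hypothetical model of the failure-handler logic; the exception types are stand-ins.
public class FailureDecision {

    static class RejectedStandIn extends RuntimeException {} // plays EsRejectedExecutionException
    static class ParseStandIn extends RuntimeException {}    // plays ElasticsearchParseException

    enum Decision { RETRY, DROP, FAIL }

    /** Walks the cause chain looking for an instance of searchType. */
    static boolean containsThrowable(Throwable t, Class<?> searchType) {
        while (t != null) {
            if (searchType.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    /** Mirrors the three branches of the ActionRequestFailureHandler above. */
    static Decision decide(Throwable failure) {
        if (containsThrowable(failure, RejectedStandIn.class)) {
            return Decision.RETRY; // the handler re-adds the request: indexer.add(action)
        } else if (containsThrowable(failure, ParseStandIn.class)) {
            return Decision.DROP;  // one possible custom handling: skip the malformed document
        }
        return Decision.FAIL;      // the handler rethrows: the sink and the job fail
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("bulk item failed", new RejectedStandIn());
        System.out.println(decide(wrapped));                     // RETRY
        System.out.println(decide(new ParseStandIn()));          // DROP
        System.out.println(decide(new IllegalStateException())); // FAIL
    }
}
```

Checking the whole cause chain matters because the Elasticsearch exception is often wrapped in another exception by the time it reaches the handler.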

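If you only need to retry rejected requests, use the built‑in RetryRejectedExecutionFailureHandler (in org.apache.flink.streaming.connectors.elasticsearch.util), which re‑adds requests that failed with EsRejectedExecutionException and rethrows all other failures.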

4. Additional Notes

1. Do Not Rely on try‑catch Inside the Sink

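Wrapping the body of process() in try‑catch does not protect the job. process() only adds requests to the sink's bulk buffer; the bulk request is executed later, and a failure such as EsRejectedExecutionException is passed to the ActionRequestFailureHandler. The default handler (NoOpFailureHandler) rethrows every failure, so the sink fails on a later call, outside your catch block. To tolerate such failures, implement a custom failure handler; otherwise let the exception propagate.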
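
Why the catch block never sees these errors becomes clearer with a toy model. In the hypothetical MiniSink below (not Flink's class), process() only buffers an element, much as RequestIndexer.add() does; the write happens later in flush(), and its failure is recorded and rethrown on a later invoke(), outside the try‑catch that wraps the user code. All names are illustrative assumptions; only the pattern is meant to mirror a buffering sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a buffering sink; the names are illustrative, not Flink's API.
public class MiniSink {
    private final List<String> buffer = new ArrayList<>();
    private Throwable recordedFailure; // set when a flush fails

    /** User code: wrapped in try-catch, but it only buffers, so nothing is thrown here. */
    void process(String element) {
        try {
            buffer.add(element);
        } catch (Exception e) {
            System.out.println("caught in process(): " + e); // never reached
        }
    }

    /** Like the sink's invoke(): rethrows an earlier recorded failure, then calls user code. */
    void invoke(String element) {
        checkErrorAndRethrow();
        process(element);
    }

    /** Like a bulk flush: the write fails, but the failure is only recorded. */
    void flush() {
        if (!buffer.isEmpty()) {
            recordedFailure = new RuntimeException("simulated rejected bulk request");
            buffer.clear();
        }
    }

    private void checkErrorAndRethrow() {
        if (recordedFailure != null) {
            throw new IllegalStateException("sink failed", recordedFailure);
        }
    }

    public static void main(String[] args) {
        MiniSink sink = new MiniSink();
        sink.invoke("a");     // succeeds: the element is only buffered
        sink.flush();         // the write fails later, outside process()
        try {
            sink.invoke("b"); // the recorded failure surfaces here instead
        } catch (IllegalStateException e) {
            // prints "sink failed: simulated rejected bulk request"
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```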

2. Retry Mechanism Depends on Checkpointing

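The sink relies on Flink checkpoints to make its retries reliable. On each checkpoint, snapshotState() (below, from Flink's ElasticsearchSinkBase) rethrows any recorded failure and, when flush-on-checkpoint is enabled (the default), flushes the bulk processor repeatedly until no requests are pending, including requests re‑added by a failure handler. Enable checkpointing with env.enableCheckpointing(intervalMillis); otherwise nothing forces pending or re‑added requests to be written and confirmed, and the sink provides no at-least-once guarantee.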

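@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Fail fast if an earlier bulk request has already failed
    checkErrorAndRethrow();
    if (flushOnCheckpoint) {
        do {
            // Send everything buffered, including requests re-added by a failure handler
            bulkProcessor.flush();
            checkErrorAndRethrow();
            // Repeat until every pending request has been acknowledged
        } while (numPendingRequests.get() != 0);
    }
}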
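
The do/while loop is what makes a checkpoint wait for retries: requests re‑added by a failure handler stay pending, so the loop keeps flushing until all of them are acknowledged. The plain-Java simulation below models that effect. CheckpointFlushModel, the fixed number of rejections per request, and "every pending request is sent once per flush" are assumptions, not Flink code. It also shows the side effect the Flink documentation points out: re‑adding failed requests makes checkpoints take longer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of the flush-until-drained loop in snapshotState(); not Flink code.
public class CheckpointFlushModel {

    /**
     * Flushes until nothing is pending. Each flush sends every pending request once;
     * a request is rejected (and re-added, as a retrying failure handler would do)
     * until its attempt number reaches succeedOnAttempt.
     * Returns the number of flush rounds the checkpoint had to wait for.
     */
    static int flushUntilDrained(int requests, int succeedOnAttempt) {
        Deque<Integer> pending = new ArrayDeque<>(); // each value = attempts made so far
        for (int i = 0; i < requests; i++) {
            pending.add(0);
        }
        int rounds = 0;
        do {
            rounds++;
            int batch = pending.size();
            for (int i = 0; i < batch; i++) {
                int attempts = pending.poll() + 1;
                if (attempts < succeedOnAttempt) {
                    pending.add(attempts); // rejected: re-added for the next flush
                }
                // otherwise acknowledged and no longer pending
            }
        } while (!pending.isEmpty());
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println("no rejections:           " + flushUntilDrained(10, 1) + " round(s)");
        System.out.println("rejected twice, then ok: " + flushUntilDrained(10, 3) + " round(s)");
    }
}
```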

3. Summary

Although the ES sink implements CheckpointedFunction, it uses the checkpoint callback mainly to flush pending requests and surface failures, which is what makes its retry mechanism reliable, rather than to persist and restore state. This is a useful pattern for building resilient streaming jobs.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
