Using ElasticsearchSink with Apache Flink: Configuration, Retry Strategies, and Failure Handling
This article introduces the ElasticsearchSink for Apache Flink, explains how to add Maven dependencies, implement the sink with configuration and retry settings, details failure handlers, and highlights important considerations such as exception handling and checkpoint requirements for reliable streaming pipelines.
1. ElasticsearchSink Overview
When processing data with Flink, the result must be stored or exported via a Sink. Common Flink sink connectors include Kafka, Elasticsearch, Hadoop FileSystem, etc. This article focuses on the widely used ElasticsearchSink, covering its usage and production‑environment considerations.
Apache Kafka
Elasticsearch
Elasticsearch 2x
Hadoop FileSystem
…
2. Usage
1. Add Maven Dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.3.1</version>
</dependency>
Adjust the version numbers according to your Flink and Elasticsearch versions.
2. Implement the Code
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
config.put("bulk.flush.max.actions", "1");
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
Extended Configuration
To avoid losing data when the ES cluster is temporarily unstable or overloaded, add retry‑related settings:
// Enable retry mechanism
config.put("bulk.flush.backoff.enable", "true");
// Retry strategy: EXPONENTIAL or CONSTANT
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
// Base delay for exponential backoff
config.put("bulk.flush.backoff.delay", "2");
// Number of retry attempts
config.put("bulk.flush.backoff.retries", "3");
Other useful parameters:
bulk.flush.max.actions: max records per bulk request
bulk.flush.max.size.mb: max size per bulk request
bulk.flush.interval.ms: fixed interval for flushing
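To make the difference between the two backoff types concrete, here is a minimal, dependency-free sketch of how a retry schedule could be derived from bulk.flush.backoff.type, bulk.flush.backoff.delay, and bulk.flush.backoff.retries. The class BackoffSketch and its delays method are hypothetical, and modeling EXPONENTIAL as simple doubling is an assumption made for illustration; the curve used by the real Elasticsearch client may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not the connector's or Elasticsearch's actual implementation.
final class BackoffSketch {

    // Returns the delay to wait before each retry attempt, in the same unit as baseDelay.
    // ASSUMPTION: "EXPONENTIAL" is modeled as doubling the delay after every attempt.
    static List<Long> delays(String type, long baseDelay, int retries) {
        List<Long> result = new ArrayList<>();
        long delay = baseDelay;
        for (int attempt = 0; attempt < retries; attempt++) {
            result.add(delay);
            if ("EXPONENTIAL".equals(type)) {
                delay *= 2; // wait longer after every failed attempt
            }
            // For "CONSTANT" the delay stays at baseDelay.
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays("EXPONENTIAL", 2, 3)); // prints [2, 4, 8]
        System.out.println(delays("CONSTANT", 2, 3));    // prints [2, 2, 2]
    }
}
```

With the values used in the configuration above (delay 2, retries 3), a growing schedule gives the cluster more time to recover on each attempt, while a constant schedule retries at a fixed pace.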
3. Failure Handler
When ES queues are full or nodes fail, writes can fail. Provide a custom ActionRequestFailureHandler when creating the sink to handle such cases:
input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                              Throwable failure,
                              int restStatusCode,
                              RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // the node's queue was full: re-add the request so it is retried later
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document: custom handling
            } else {
                // any other failure: re-throw, which fails the sink
                throw failure;
            }
        }
    }));
If only retry is needed, the built‑in RetryRejectedExecutionFailureHandler can be used, which retries on EsRejectedExecutionException.
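The routing decision inside such a handler can be tried out in isolation. The following is a minimal sketch that runs without Flink or Elasticsearch on the classpath: RejectedStandIn and ParseStandIn are hypothetical stand-ins for EsRejectedExecutionException and ElasticsearchParseException, and containsCause is a simplified approximation of what a cause-chain check does, not the library's code.

```java
// Illustrative only: stand-in types replace the real Elasticsearch exceptions.
final class FailureRoutingSketch {

    // Hypothetical stand-in for EsRejectedExecutionException.
    static class RejectedStandIn extends RuntimeException {
        RejectedStandIn(String message) { super(message); }
    }

    // Hypothetical stand-in for ElasticsearchParseException.
    static class ParseStandIn extends RuntimeException {
        ParseStandIn(String message) { super(message); }
    }

    enum Decision { RETRY, CUSTOM, FAIL }

    // Walks the cause chain and reports whether any link is an instance of the given type.
    static boolean containsCause(Throwable failure, Class<? extends Throwable> type) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    // Same three branches as the handler: retry, custom handling, or fail.
    static Decision decide(Throwable failure) {
        if (containsCause(failure, RejectedStandIn.class)) {
            return Decision.RETRY;
        }
        if (containsCause(failure, ParseStandIn.class)) {
            return Decision.CUSTOM;
        }
        return Decision.FAIL;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("bulk failed", new RejectedStandIn("queue full"));
        System.out.println(decide(wrapped));                       // prints RETRY
        System.out.println(decide(new ParseStandIn("bad json")));  // prints CUSTOM
        System.out.println(decide(new IllegalStateException()));   // prints FAIL
    }
}
```

Checking the whole cause chain matters because the exception of interest is often wrapped in another exception by the time the handler sees it.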
4. Additional Notes
1. Do Not Use try‑catch‑Exception Inside the Sink
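Wrapping the body of process() in try‑catch does not help. indexer.add() only buffers the request, so failures such as EsRejectedExecutionException surface later, when the bulk request is executed, and are handed to the ActionRequestFailureHandler rather than to your catch block. The default handler re‑throws them, which fails the sink. Implement a custom failure handler, or let the exception propagate.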
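Why a catch block around the add call sees nothing can be shown with a toy model. The sketch below is not Flink or Elasticsearch code: BufferingIndexer is a hypothetical stand-in that queues requests and only "sends" them on flush(), reporting failures to a handler callback, which is the assumed shape of the behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model: requests are buffered by add() and only sent by flush().
final class DeferredFailureSketch {

    // Hypothetical stand-in for a buffering bulk indexer.
    static final class BufferingIndexer {
        private final List<String> buffer = new ArrayList<>();
        private final Consumer<RuntimeException> failureHandler;

        BufferingIndexer(Consumer<RuntimeException> failureHandler) {
            this.failureHandler = failureHandler;
        }

        // Never throws: the request is merely queued.
        void add(String request) {
            buffer.add(request);
        }

        // The simulated write happens here; failures go to the handler, not to the caller of add().
        void flush() {
            for (String request : buffer) {
                if (request.startsWith("bad")) {
                    failureHandler.accept(new IllegalStateException("rejected: " + request));
                }
            }
            buffer.clear();
        }
    }

    // Returns {failures caught around add(), failures seen by the handler}.
    static int[] run() {
        List<RuntimeException> seenByHandler = new ArrayList<>();
        BufferingIndexer indexer = new BufferingIndexer(seenByHandler::add);
        int caughtAroundAdd = 0;
        try {
            indexer.add("bad-request"); // plays the role of process()
        } catch (RuntimeException e) {
            caughtAroundAdd++; // never reached in this model
        }
        indexer.flush();
        return new int[] {caughtAroundAdd, seenByHandler.size()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        // prints: caught around add(): 0, seen by handler: 1
        System.out.println("caught around add(): " + counts[0] + ", seen by handler: " + counts[1]);
    }
}
```

The failure is only observable where the buffered requests are actually sent, which is why the handling logic belongs in the failure handler.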
2. Retry Mechanism Depends on Checkpointing
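The retry logic is triggered during Flink checkpoints. Enable checkpointing with env.enableCheckpointing(interval), where interval is the checkpoint interval in milliseconds; otherwise the retry code will never execute. The relevant part of the sink's snapshotState() method is: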
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkErrorAndRethrow();
    if (flushOnCheckpoint) {
        do {
            bulkProcessor.flush();
            checkErrorAndRethrow();
        } while (numPendingRequests.get() != 0);
    }
}
3. Summary
Although the ES sink implements CheckpointedFunction, it uses the checkpoint callback mainly to drive its own retry mechanism rather than to restore state, which is a useful pattern for building resilient streaming jobs.
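The "flush until nothing is pending" pattern can be simulated without Flink. The sketch below is a toy model under stated assumptions: CheckpointFlushSketch is a hypothetical class, the fake cluster rejects a fixed number of sends, and rejected requests are re-queued the way a failure handler would re-add them with indexer.add(action). It mirrors only the shape of the snapshotState() loop, not the connector's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation of a checkpoint that flushes until no request is pending.
final class CheckpointFlushSketch {

    private final Deque<String> pending = new ArrayDeque<>();
    private int remainingRejections; // how many sends the fake cluster will still reject
    int delivered = 0;
    int flushCalls = 0;

    CheckpointFlushSketch(int rejections) {
        this.remainingRejections = rejections;
    }

    void add(String request) {
        pending.add(request);
    }

    // Sends everything buffered at the time of the call; rejected requests are re-queued.
    void flush() {
        flushCalls++;
        int batchSize = pending.size();
        for (int i = 0; i < batchSize; i++) {
            String request = pending.poll();
            if (remainingRejections > 0) {
                remainingRejections--;
                pending.add(request); // retried on the next flush
            } else {
                delivered++;
            }
        }
    }

    // Same shape as the snapshotState() loop: keep flushing while requests are pending.
    void snapshot() {
        do {
            flush();
        } while (!pending.isEmpty());
    }

    public static void main(String[] args) {
        CheckpointFlushSketch sink = new CheckpointFlushSketch(2);
        sink.add("a");
        sink.add("b");
        sink.add("c");
        sink.snapshot();
        // prints: delivered=3 after 2 flushes
        System.out.println("delivered=" + sink.delivered + " after " + sink.flushCalls + " flushes");
    }
}
```

Because the loop only exits once the pending count reaches zero, a checkpoint completes only after every buffered request, including the re-queued ones, has been delivered.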
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
