Understanding Async I/O in Apache Flink: Usage, Implementation, and Fault Tolerance
This article explains how to use Async I/O in Flink, describes the ordered and unordered output modes, details the internal AsyncWaitOperator implementation with its producer‑consumer model, and discusses fault‑tolerance mechanisms including state snapshot and recovery.
Async I/O allows Flink jobs to issue asynchronous requests, turning synchronous operations into non‑blocking calls. To use it, you need a client that supports asynchronous requests or a multithreaded wrapper that converts synchronous calls into futures.
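When the client library only offers blocking calls, the "multithreaded wrapper" mentioned above can be sketched with JDK classes alone. This is a minimal illustration, not part of Flink: `BlockingLookupClient`, `AsyncLookupWrapper`, and `queryAsync` are hypothetical names, and the blocking call is simulated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical blocking client; stands in for any synchronous driver. */
final class BlockingLookupClient {
    String query(String key) {
        // A real client would block on network I/O here.
        return "value-for-" + key;
    }
}

/** Wraps the blocking client so that each call returns a future immediately. */
final class AsyncLookupWrapper implements AutoCloseable {
    private final BlockingLookupClient client = new BlockingLookupClient();
    private final ExecutorService pool;

    AsyncLookupWrapper(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    CompletableFuture<String> queryAsync(String key) {
        // The blocking call runs on a pool thread, not on the caller's thread.
        return CompletableFuture.supplyAsync(() -> client.query(key), pool);
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}
```

The pool size bounds how many blocking calls run concurrently, so a wrapper like this usually achieves less parallelism than a client that is asynchronous by design.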
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future).
// DatabaseClient, host, port and credentials are placeholders for a real client and its settings.

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly, e.g. with resultFuture.completeExceptionally(e).
                    return null;
                }
            }
        }).thenAccept((String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

In the call above, 1000 with TimeUnit.MILLISECONDS is the per-request timeout and 100 is the capacity, the maximum number of requests that may be in flight at once. AsyncDataStream provides two transformation methods, orderedWait and unorderedWait, corresponding to the ordered and unordered output modes. The choice matters because completion times of asynchronous requests are nondeterministic: a request issued earlier may finish after one issued later.
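The difference between the two modes can be illustrated without Flink. The sketch below uses manually completed `CompletableFuture`s so that the completion order is under our control; `EmissionOrderDemo` and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class EmissionOrderDemo {

    /** Creates n in-flight requests, in submission order 0..n-1. */
    static List<CompletableFuture<String>> submit(int n) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(new CompletableFuture<>());
        }
        return futures;
    }

    /** Completes request i with the result "r" + i, in the given order. */
    static void completeInOrder(List<CompletableFuture<String>> futures, int... order) {
        for (int i : order) {
            futures.get(i).complete("r" + i);
        }
    }

    /** Unordered mode: each result is emitted the moment its request completes. */
    static List<String> unorderedEmission(int n, int... completionOrder) {
        List<CompletableFuture<String>> futures = submit(n);
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            f.thenAccept(emitted::add); // runs on the thread that completes f
        }
        completeInOrder(futures, completionOrder);
        return emitted;
    }

    /** Ordered mode: results are emitted strictly in submission order. */
    static List<String> orderedEmission(int n, int... completionOrder) {
        List<CompletableFuture<String>> futures = submit(n);
        completeInOrder(futures, completionOrder);
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            emitted.add(f.join()); // the head must finish before later entries are looked at
        }
        return emitted;
    }
}
```

With three requests completing in the order 2, 0, 1, `unorderedEmission(3, 2, 0, 1)` yields `[r2, r0, r1]`, while `orderedEmission(3, 2, 0, 1)` yields `[r0, r1, r2]`.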
In ordered mode, results are emitted in the same order in which the input records arrived, regardless of the order in which the requests complete. This is achieved with an OrderedStreamElementQueue, in which only the head element can be emitted, and only once it has completed.
public class OrderedStreamElementQueue implements StreamElementQueue {

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    // The lock, the notFull and headIsCompleted conditions, the constructor and
    // addEntry(...) are used below but omitted from this excerpt.

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        // Block until the head element is done.
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            LOG.debug("Peeked head element from ordered stream element queue with filling degree ({}/{}).", queue.size(), capacity);
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            // Removing the head frees a slot, so wake up producers waiting for space.
            notFull.signalAll();
            LOG.debug("Polled head element from ordered stream element queue. New filling degree ({}/{}).", queue.size() - 1, capacity);
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (queue.size() < capacity) {
                addEntry(streamElementQueueEntry);
                LOG.debug("Put element into ordered stream element queue. New filling degree ({}/{}).", queue.size(), capacity);
                return true;
            } else {
                LOG.debug("Failed to put element into ordered stream element queue because it was full ({}/{}).", queue.size(), capacity);
                return false;
            }
        } finally {
            lock.unlock();
        }
    }
}

Unordered mode emits each result as soon as the corresponding asynchronous request finishes, regardless of input order. When using event time, the operator still respects watermark semantics: a watermark can only be emitted after all records that arrived before it have been processed and their results emitted.
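The watermark rule for unordered mode can be modelled as a queue of segments separated by watermarks: results may leave in any order within a segment, but neither records nor watermarks overtake each other across a segment boundary. The sketch below is a simplified, single-threaded model under that assumption, not Flink's UnorderedStreamElementQueue; the class name and the string-based record ids are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class WatermarkAwareUnorderedBuffer {

    /** The records that arrived between two watermarks. */
    private static final class Segment {
        final Set<String> pending = new HashSet<>();        // requests still running
        final List<String> completed = new ArrayList<>();   // finished, not yet emitted
        Long closingWatermark;                              // null while the segment is open
    }

    private final Deque<Segment> segments = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();

    WatermarkAwareUnorderedBuffer() {
        segments.add(new Segment());
    }

    void addRecord(String id) {
        segments.peekLast().pending.add(id);
    }

    void addWatermark(long timestamp) {
        segments.peekLast().closingWatermark = timestamp;
        segments.addLast(new Segment());
        drain();
    }

    /** Marks the request for the given record as finished. */
    void complete(String id) {
        for (Segment s : segments) {
            if (s.pending.remove(id)) {
                s.completed.add(id);
                break;
            }
        }
        drain();
    }

    /** Emits whatever the head segment allows; later segments wait their turn. */
    private void drain() {
        while (true) {
            Segment head = segments.peekFirst();
            output.addAll(head.completed);
            head.completed.clear();
            if (head.closingWatermark != null && head.pending.isEmpty()) {
                output.add("WM(" + head.closingWatermark + ")");
                segments.removeFirst();
            } else {
                return; // head still has running requests, or is the open tail segment
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

For the input `a, b, WM(10), c` with completions in the order `c, b, a`, the buffer emits `b, a, WM(10), c`: `c` finishes first but is held back until the watermark in front of it has been emitted.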
The implementation relies on AsyncWaitOperator, which extends AbstractUdfStreamOperator and uses a producer‑consumer pattern. Inside the operator, a StreamElementQueue (either ordered or unordered) decouples asynchronous computation from result emission, while an Emitter thread consumes completed promises and forwards them downstream.
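Before reading the operator source, it may help to see the producer-consumer arrangement in miniature. The sketch below uses only JDK classes and hypothetical names (`EmitterSketch`, `submit`, `stopAndGetOutput`): the caller plays the role of the task thread that enqueues in-flight requests, and a separate emitter thread forwards results in queue order.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class EmitterSketch {
    // Sentinel that tells the emitter thread to stop.
    private static final CompletableFuture<String> STOP = CompletableFuture.completedFuture("");

    private final BlockingQueue<CompletableFuture<String>> queue;
    private final List<String> downstream = new CopyOnWriteArrayList<>();
    private final Thread emitterThread;

    EmitterSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.emitterThread = new Thread(this::emitLoop, "emitter-sketch");
        this.emitterThread.setDaemon(true);
        this.emitterThread.start();
    }

    /** Producer side: blocks while the queue is full, which yields backpressure. */
    void submit(CompletableFuture<String> inFlight) {
        try {
            queue.put(inFlight);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for queue space", e);
        }
    }

    /** Consumer side: takes the head, waits for it to complete, forwards the result. */
    private void emitLoop() {
        try {
            while (true) {
                CompletableFuture<String> head = queue.take();
                if (head == STOP) {
                    return;
                }
                downstream.add(head.join());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the emitter after everything submitted so far has been forwarded. */
    List<String> stopAndGetOutput() {
        submit(STOP);
        try {
            emitterThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the emitter", e);
        }
        return downstream;
    }
}
```

Unlike this sketch, which removes the head from the queue before it has completed, the Flink queue shown earlier keeps an entry in place until it is done. The excerpt from AsyncWaitOperator that follows shows the real counterparts: the StreamElementQueue, the Emitter, and its thread.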
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;

    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    private transient ExecutorService executor;

    private transient Emitter<OUT> emitter;

    private transient Thread emitterThread;

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.checkpointingLock = getContainingTask().getCheckpointLock();
        this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
        this.executor = Executors.newSingleThreadExecutor();

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(capacity, executor, this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(capacity, executor, this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();

        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ")");
        emitterThread.setDaemon(true);
        emitterThread.start();
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

        if (timeout > 0L) {
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            streamRecordBufferEntry.onComplete((value) -> timerFuture.cancel(true), executor);
        }

        addAsyncBufferEntry(streamRecordBufferEntry);

        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    // ... additional methods for state snapshot, recovery, etc.
}

For fault tolerance, the operator snapshots the input elements of all in-flight async requests, together with any pending element that has not yet been admitted to the queue. Upon recovery, these elements are re-processed, that is, their requests are issued again, which preserves exactly-once semantics; completed requests whose results have already been emitted do not need to be stored.
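The snapshot-and-replay idea can be sketched independently of Flink's state APIs. In the minimal model below, all names (`InFlightSnapshotSketch`, `snapshot`, `restore`) are hypothetical, the "state" is a plain list of inputs, and the asynchronous client is passed in as a function.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class InFlightSnapshotSketch {

    /** An input element together with the future of its running request. */
    private static final class Pending {
        final String input;
        final CompletableFuture<String> result;

        Pending(String input, CompletableFuture<String> result) {
            this.input = input;
            this.result = result;
        }
    }

    private final Function<String, CompletableFuture<String>> asyncCall;
    private final List<Pending> inFlight = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    InFlightSnapshotSketch(Function<String, CompletableFuture<String>> asyncCall) {
        this.asyncCall = asyncCall;
    }

    /** Issues the request and remembers the input until its result is emitted. */
    void process(String input) {
        inFlight.add(new Pending(input, asyncCall.apply(input)));
    }

    /** Emits every finished request and drops it from the in-flight list. */
    void emitCompleted() {
        for (Iterator<Pending> it = inFlight.iterator(); it.hasNext();) {
            Pending p = it.next();
            if (p.result.isDone()) {
                emitted.add(p.result.join());
                it.remove();
            }
        }
    }

    /** Only inputs whose results were not yet emitted go into the snapshot. */
    List<String> snapshot() {
        List<String> inputs = new ArrayList<>();
        for (Pending p : inFlight) {
            inputs.add(p.input);
        }
        return inputs;
    }

    /** Recovery: re-issue the request for every input found in the snapshot. */
    static InFlightSnapshotSketch restore(
            List<String> snapshot, Function<String, CompletableFuture<String>> asyncCall) {
        InFlightSnapshotSketch restored = new InFlightSnapshotSketch(asyncCall);
        for (String input : snapshot) {
            restored.process(input);
        }
        return restored;
    }

    List<String> emitted() {
        return emitted;
    }
}
```

One consequence of this design is that the external system may receive the same request more than once after a failure, so requests that are re-issued on recovery should be safe to repeat.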
Overall, Flink's Async I/O feature enables high‑throughput, low‑latency stream processing by offloading blocking I/O to asynchronous calls while providing configurable ordering guarantees and robust state handling.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
