
Understanding Async I/O in Apache Flink: Usage, Implementation, and Fault Tolerance

This article explains how to use Async I/O in Flink, describes the ordered and unordered output modes, details the internal AsyncWaitOperator implementation with its producer‑consumer model, and discusses fault‑tolerance mechanisms including state snapshot and recovery.

Big Data Technology & Architecture

Async I/O lets a Flink operator send requests to an external system, such as a database lookup, without blocking on each response, so many requests can be in flight at once and their wait times overlap. To use it, you need either a client that supports asynchronous requests or a multithreaded wrapper that turns synchronous calls into futures.
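For the second option, a minimal sketch of such a wrapper is shown here, assuming the blocking lookup is available as a thread-safe function. BlockingClientWrapper is a hypothetical helper, not a Flink class; it simply runs each blocking call on a dedicated thread pool and hands back a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/**
 * Hypothetical wrapper (not part of Flink) that turns a blocking lookup into
 * CompletableFutures by running each call on a dedicated thread pool.
 */
final class BlockingClientWrapper<K, V> implements AutoCloseable {
    private final Function<K, V> blockingLookup; // assumed to be thread-safe
    private final ExecutorService pool;

    BlockingClientWrapper(Function<K, V> blockingLookup, int threads) {
        this.blockingLookup = blockingLookup;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Returns immediately; the blocking call runs on one of the pool threads. */
    CompletableFuture<V> query(K key) {
        return CompletableFuture.supplyAsync(() -> blockingLookup.apply(key), pool);
    }

    /** Stops accepting new queries; already submitted ones still finish. */
    @Override
    public void close() {
        pool.shutdown();
    }
}
```

The pool size caps how many blocking calls actually run concurrently, so in practice it would be sized with the operator's capacity in mind; inside an AsyncFunction, asyncInvoke would call query and complete its ResultFuture from a callback on the returned future.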

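// This example implements the asynchronous request and callback with Java 8's CompletableFuture.
// DatabaseClient and the host, port and credentials settings are placeholders for a real
// asynchronous client and its connection parameters.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
        // issue the asynchronous request, receive a future for the result
        final Future<String> result = client.query(key);
        // wait for the client's future on another thread so asyncInvoke returns immediately
        // (the common ForkJoinPool is used for brevity; a dedicated executor suits blocking waits better)
        CompletableFuture.supplyAsync(() -> {
            try {
                return result.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new CompletionException(e);
            }
        }).whenComplete((dbResult, error) -> {
            if (error != null) {
                // propagate the failure to Flink instead of emitting a null result
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
            }
        });
    }
}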

// create the original stream
DataStream<String> stream = ...;

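// apply the async I/O transformation: each request times out after 1000 ms, at most
// 100 requests are in flight (capacity), and results may be emitted out of input order
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);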

AsyncDataStream provides two transformation methods, orderedWait and unorderedWait, corresponding to the ordered and unordered output modes. The choice matters because the completion time of each request is nondeterministic: a request issued earlier may complete after one issued later.
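To make that nondeterminism concrete, here is a small JDK-only demonstration; CompletionOrderDemo is a hypothetical name, and the external system is simulated by completing pending CompletableFutures by hand in an arbitrary order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical demo: callbacks fire in completion order, not in issue order. */
final class CompletionOrderDemo {
    /**
     * Issues requests 0..n-1 as pending futures, completes them in the given order,
     * and returns the order in which their callbacks observed completion.
     */
    static List<Integer> observedOrder(int[] completionOrder) {
        List<CompletableFuture<Integer>> requests = new ArrayList<>();
        List<Integer> observed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < completionOrder.length; i++) {
            CompletableFuture<Integer> request = new CompletableFuture<>();
            request.thenAccept(observed::add); // runs when this particular request completes
            requests.add(request);
        }
        // Simulate the external system answering in an arbitrary order.
        for (int id : completionOrder) {
            requests.get(id).complete(id);
        }
        return observed;
    }
}
```

Called with {2, 0, 1}, the callbacks observe 2, 0, 1. Ignoring watermarks, that is the order unorderedWait would forward results in, whereas orderedWait would hold result 2 back until results 0 and 1 have been emitted.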

In ordered mode, results are emitted in the same order in which the input records arrived, even if their requests complete in a different order. This is implemented with an OrderedStreamElementQueue: only the head entry can be emitted, and only once it has completed, so an entry that finishes early waits behind any unfinished entries ahead of it.

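public class OrderedStreamElementQueue implements StreamElementQueue {
    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;          // signalled when an entry is removed
    private final Condition headIsCompleted;  // signalled when the head entry completes

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    // Constructor, addEntry() and the completion callback that signals headIsCompleted are omitted.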

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {  // block until head is done
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            LOG.debug("Peeked head element from ordered stream element queue with filling degree ({}/{}).", queue.size(), capacity);
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll();
            LOG.debug("Polled head element from ordered stream element queue. New filling degree ({}/{}).", queue.size() - 1, capacity);
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            if (queue.size() < capacity) {
                addEntry(streamElementQueueEntry);
                LOG.debug("Put element into ordered stream element queue. New filling degree ({}/{}).", queue.size(), capacity);
                return true;
            } else {
                LOG.debug("Failed to put element into ordered stream element queue because it was full ({}/{}).", queue.size(), capacity);
                return false;
            }
        } finally {
            lock.unlock();
        }
    }
}
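The same head-of-line rule can be sketched without Flink's internals. In this simplified, non-blocking version, CompletableFuture stands in for the queue entries and OrderedResultQueue is a hypothetical name; unlike the Flink class, it uses no lock/condition pair and never blocks, so a caller would poll it and back off when tryPut returns false.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, non-blocking analogue of an ordered element queue: results leave
 * strictly in insertion order, and only once the head has completed.
 * Assumes results are non-null, since null signals "nothing to emit yet".
 */
final class OrderedResultQueue<T> {
    private final int capacity;
    private final ArrayDeque<CompletableFuture<T>> queue = new ArrayDeque<>();

    OrderedResultQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Adds a pending result; returns false when the queue is full. */
    synchronized boolean tryPut(CompletableFuture<T> entry) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(entry);
        return true;
    }

    /** Returns the head's result if it is done, otherwise null; completed later entries keep waiting. */
    synchronized T pollCompletedHead() {
        CompletableFuture<T> head = queue.peekFirst();
        if (head == null || !head.isDone()) {
            return null;
        }
        queue.pollFirst();
        return head.join();
    }

    synchronized int size() {
        return queue.size();
    }
}
```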

Unordered mode emits each result as soon as its asynchronous request finishes, regardless of input order. With event time, watermarks still act as ordering boundaries: records are only reordered among themselves between two watermarks, and a watermark is emitted only after all records that arrived before it have been emitted.
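A simplified model of that rule follows, assuming string results and a single-threaded caller; UnorderedWatermarkQueue is a hypothetical name. It groups records into segments separated by watermarks, which is roughly the idea behind Flink's unordered queue, though the real class is structured differently.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical model of unordered emission with watermarks. Only the oldest segment
 * may emit records; its closing watermark goes out once all of its records are out.
 */
final class UnorderedWatermarkQueue {
    private static final class Segment {
        final Set<CompletableFuture<String>> pending = new LinkedHashSet<>();
        Long closingWatermark; // watermark that ends this segment; null while still open
    }

    private final ArrayDeque<Segment> segments = new ArrayDeque<>();

    UnorderedWatermarkQueue() {
        segments.addLast(new Segment()); // the open segment that receives new records
    }

    void addRecord(CompletableFuture<String> result) {
        segments.peekLast().pending.add(result);
    }

    void addWatermark(long watermark) {
        segments.peekLast().closingWatermark = watermark;
        segments.addLast(new Segment());
    }

    /** Appends everything emittable now, as "record:<value>" or "watermark:<ts>". */
    void drainTo(List<String> out) {
        while (true) {
            Segment oldest = segments.peekFirst();
            // Any completed record of the oldest segment may go out, regardless of input order.
            Iterator<CompletableFuture<String>> it = oldest.pending.iterator();
            while (it.hasNext()) {
                CompletableFuture<String> record = it.next();
                if (record.isDone()) {
                    out.add("record:" + record.join());
                    it.remove();
                }
            }
            if (!oldest.pending.isEmpty() || oldest.closingWatermark == null) {
                return; // records still pending, or this is the open segment
            }
            out.add("watermark:" + oldest.closingWatermark); // every earlier record is out
            segments.pollFirst();
        }
    }
}
```

For records r1, r2, watermark 10, r3: if r2 and r3 finish first, only r2 is emitted; r3 has to wait until r1 and the watermark have gone out.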

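The implementation lives in AsyncWaitOperator, which extends AbstractUdfStreamOperator and follows a producer‑consumer pattern. The task thread is the producer: processElement wraps each record in a queue entry, adds it to a bounded StreamElementQueue (ordered or unordered, depending on the mode) and calls asyncInvoke. A separate Emitter thread is the consumer: it takes completed entries from the queue and forwards their results downstream while holding the checkpoint lock. Because the queue is bounded by the capacity parameter, a full queue blocks processElement and back-pressures upstream operators. This describes the Flink 1.x code quoted below; newer releases replaced the Emitter thread with the task's mailbox.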
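The producer-consumer split can be sketched with JDK types only, for ordered mode. EmitterSketch is hypothetical: an ArrayBlockingQueue stands in for the StreamElementQueue, a CompletableFuture per record stands in for the ResultFuture, and a plain list stands in for the downstream output. Failure handling is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
 * Hypothetical producer-consumer sketch (ordered mode only). The calling thread is
 * the producer; a separate emitter thread is the consumer that forwards results.
 */
final class EmitterSketch {
    static List<String> run(List<String> inputs,
                            BiConsumer<String, CompletableFuture<String>> asyncInvoke,
                            int capacity) throws InterruptedException {
        BlockingQueue<CompletableFuture<String>> queue = new ArrayBlockingQueue<>(capacity);
        List<String> downstream = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<String> endOfInput = new CompletableFuture<>(); // sentinel, compared by identity

        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    CompletableFuture<String> head = queue.take(); // next entry in input order
                    if (head == endOfInput) {
                        return;
                    }
                    downstream.add(head.join()); // wait until the head completes, then emit it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter");
        emitter.start();

        for (String input : inputs) {
            CompletableFuture<String> entry = new CompletableFuture<>();
            queue.put(entry);                 // producer: blocks while the queue is full
            asyncInvoke.accept(input, entry); // user code completes `entry` later
        }
        queue.put(endOfInput);
        emitter.join();
        return downstream;
    }
}
```

As in the operator, the entry is enqueued before asyncInvoke runs, so the emission order is fixed by arrival order even when later requests finish first.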

public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

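    /** Capacity, output mode and timeout, as passed to orderedWait/unorderedWait. */
    private final int capacity;
    private final AsyncDataStream.OutputMode outputMode;
    private final long timeout;

    /** Queue to store the currently in‑flight stream elements into. */
    private transient StreamElementQueue queue;
    /** Entry that is waiting for free space in the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
    private transient ExecutorService executor;
    /** Consumer of completed entries and the thread that runs it. */
    private transient Emitter<OUT> emitter;
    private transient Thread emitterThread;
    /** Task checkpoint lock; emission and snapshots happen while holding it. */
    private transient Object checkpointingLock;
    private transient StreamElementSerializer<IN> inStreamElementSerializer;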

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.checkpointingLock = getContainingTask().getCheckpointLock();
        this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
        this.executor = Executors.newSingleThreadExecutor();
        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(capacity, executor, this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(capacity, executor, this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ")");
        emitterThread.setDaemon(true);
        emitterThread.start();
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // The queue entry doubles as the ResultFuture handed to the user function.
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
        if (timeout > 0L) {
            // Register a processing-time timer that calls AsyncFunction#timeout when it fires.
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });
            // Cancel the timer as soon as the entry completes.
            streamRecordBufferEntry.onComplete((value) -> timerFuture.cancel(true), executor);
        }
        // Producer side: blocks while the queue is full, back-pressuring upstream.
        addAsyncBufferEntry(streamRecordBufferEntry);
        // Hand the record to the user function, which completes the entry asynchronously.
        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    // ... additional methods for state snapshot, recovery, etc.
}
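The timeout branch of processElement registers one timer per record and cancels it once the entry completes. A plain-JDK sketch of that pattern is shown below, assuming a ScheduledExecutorService in place of Flink's ProcessingTimeService and a caller-supplied handler in place of AsyncFunction#timeout; TimeoutSketch and withTimeout are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of per-request timeouts with timer cancellation. */
final class TimeoutSketch {
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> result,
                                                long timeoutMillis,
                                                ScheduledExecutorService timers,
                                                Consumer<CompletableFuture<T>> onTimeout) {
        // Fire the timeout handler if the result is still around after timeoutMillis.
        ScheduledFuture<?> timer = timers.schedule(
                () -> onTimeout.accept(result), timeoutMillis, TimeUnit.MILLISECONDS);
        // Cancel the timer once the result completes, whichever way it completes.
        result.whenComplete((value, error) -> timer.cancel(false));
        return result;
    }
}
```

The handler decides what a timeout means; here it could complete the result with a fallback value. In Flink, if AsyncFunction#timeout is not overridden, a timed-out request raises an exception and the job restarts.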

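For fault tolerance, the operator's snapshot stores the input element of every entry still in the queue, whether its request is still pending or has completed but not yet been emitted, plus any entry still waiting for free space in the queue. Results that have already been emitted do not need to be stored, because they were sent downstream ahead of the checkpoint barrier. On recovery, the stored elements are passed to asyncInvoke again. This yields exactly‑once semantics for Flink's own state and output, but the external request itself may be issued more than once, so the calls should be safe to repeat, as read-only lookups are.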
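A simplified model of that snapshot/restore cycle is sketched here, assuming ordered emission and string records, with plain lists standing in for Flink-managed operator state; CheckpointedAsyncBuffer and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical model: snapshot inputs of unemitted requests, re-issue them on restore. */
final class CheckpointedAsyncBuffer {
    private static final class Entry {
        final String input;
        final CompletableFuture<String> result;

        Entry(String input, CompletableFuture<String> result) {
            this.input = input;
            this.result = result;
        }
    }

    private final ArrayDeque<Entry> inFlight = new ArrayDeque<>();
    private final Function<String, CompletableFuture<String>> asyncCall;

    CheckpointedAsyncBuffer(Function<String, CompletableFuture<String>> asyncCall) {
        this.asyncCall = asyncCall;
    }

    /** Issues the request and keeps its input until the result is emitted. */
    void process(String input) {
        inFlight.addLast(new Entry(input, asyncCall.apply(input)));
    }

    /** Emits completed entries from the head, in order; emitted entries are forgotten. */
    List<String> emitCompleted() {
        List<String> emitted = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().result.isDone()) {
            emitted.add(inFlight.pollFirst().result.join());
        }
        return emitted;
    }

    /** Stores inputs, not results, of everything not yet emitted, pending or completed. */
    List<String> snapshotState() {
        List<String> state = new ArrayList<>();
        for (Entry e : inFlight) {
            state.add(e.input);
        }
        return state;
    }

    /** Re-issues every snapshotted input, so the external call may run a second time. */
    static CheckpointedAsyncBuffer restore(List<String> state,
                                           Function<String, CompletableFuture<String>> asyncCall) {
        CheckpointedAsyncBuffer buffer = new CheckpointedAsyncBuffer(asyncCall);
        for (String input : state) {
            buffer.process(input);
        }
        return buffer;
    }
}
```

An input whose result has arrived but is stuck behind a slower head is still snapshotted: only emission, not completion, releases an entry.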

Overall, Flink's Async I/O raises the throughput of jobs that depend on external systems by overlapping the waiting time of many requests instead of blocking on each one, while providing configurable ordering guarantees and checkpoint-based recovery of in-flight requests.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
