Async I/O for Dimension Table Joins in Apache Flink

This article explains how to handle dimension table joins in Apache Flink streaming by leveraging Async I/O to perform non‑blocking external lookups, provides detailed code examples for both synchronous and asynchronous functions, discusses configuration parameters, and outlines best practices and pitfalls.

Big Data Technology & Architecture

In Flink streaming jobs, dimension tables are often used to enrich a fact stream with extra attributes such as product category, manufacturer details, or logistics information. A synchronous MapFunction that queries an external store blocks on every request, so each parallel instance handles only one lookup at a time. The result is high latency and low throughput.
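A rough way to quantify the cost of blocking, assuming the lookup round-trip latency $L$ dominates per-record processing time: a blocking instance completes at most one lookup per round trip, while an instance that keeps up to $C$ requests in flight can, by Little's law, complete up to $C$ per round trip. The values $L = 10$ ms and $C = 100$ are illustrative assumptions, not measurements from the article.

$$T_{\text{sync}} \le \frac{1}{L}, \qquad T_{\text{async}} \le \frac{C}{L}$$

$$L = 10\,\text{ms},\ C = 100 \;\Rightarrow\; T_{\text{sync}} \le 100\ \text{records/s}, \qquad T_{\text{async}} \le 10\,000\ \text{records/s}$$

Both bounds are per parallel instance. In practice the external store's own capacity limits the async figure as well.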

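Flink introduced Async I/O in version 1.2, allowing I/O operations to be performed asynchronously. A single operator instance can keep many requests in flight at once. In unordered mode each result is emitted as soon as its response arrives, while ordered mode preserves the input order. Overlapping the waiting time in this way greatly improves throughput and reduces latency.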
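The difference between the two emission modes can be sketched with plain JDK futures. This is an analogue, not Flink's AsyncWaitOperator: `slowLookup` is a hypothetical dimension lookup that simply sleeps, and the latencies are assumed values chosen so that responses finish out of order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogue of unordered vs ordered result emission; this is not Flink's AsyncWaitOperator.
class AsyncEmitOrder {
    // Hypothetical dimension lookup that takes latencyMs to answer.
    static String slowLookup(String key, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "dim-" + key;
    }

    // All lookups run concurrently; each result is emitted as soon as it arrives (unordered).
    static List<String> unordered(String[] keys, long[] latenciesMs, ExecutorService pool) {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            long latency = latenciesMs[i];
            inFlight.add(CompletableFuture.supplyAsync(() -> slowLookup(key, latency), pool)
                                          .thenAccept(emitted::add));
        }
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        return emitted;
    }

    // Lookups still run concurrently, but results are released in input order (ordered).
    static List<String> ordered(String[] keys, long[] latenciesMs, ExecutorService pool) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            long latency = latenciesMs[i];
            inFlight.add(CompletableFuture.supplyAsync(() -> slowLookup(key, latency), pool));
        }
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            emitted.add(f.join()); // a slow head result holds back faster ones behind it
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] keys = {"u1", "u2", "u3"};
        long[] latenciesMs = {300, 10, 150}; // assumed latencies, chosen so responses finish out of order
        ExecutorService pool = Executors.newFixedThreadPool(keys.length);
        try {
            System.out.println("unordered: " + unordered(keys, latenciesMs, pool)); // completion order
            System.out.println("ordered:   " + ordered(keys, latenciesMs, pool));   // [dim-u1, dim-u2, dim-u3]
        } finally {
            pool.shutdown();
        }
    }
}
```

Both variants overlap the waiting time. The ordered one pays for its guarantee by buffering results until the head of the queue completes.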

The original article provides a complete example that reads an order stream from Kafka, enriches each order with user information through an asynchronous MySQL lookup, and writes the result back to Kafka. The key parts follow; the excerpt prints the enriched stream instead of showing the Kafka sink:

public class AsyncIOFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // ... Kafka source setup ... (yields the DataStream<Order> named order)
        SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator =
            AsyncDataStream.unorderedWait(order,
                new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {
                    // One JDBC connection per parallel instance; transient because it is created in open()
                    private transient Connection connection;

                    @Override public void open(Configuration parameters) throws Exception {
                        Class.forName("com.mysql.jdbc.Driver");
                        connection = DriverManager.getConnection("url", "user", "pwd");
                        connection.setAutoCommit(false);
                    }

                    @Override public void asyncInvoke(Order input,
                            ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {
                        // async DB query and complete resultFuture
                        // JDBC calls block: run the query on a separate executor (ideally backed by a
                        // connection pool) and call resultFuture.complete(...) from its callback
                    }

                    @Override public void close() throws Exception {
                        if (connection != null) connection.close();
                    }
                }, 5000, TimeUnit.MILLISECONDS, 100); // timeout: 5000 ms; capacity: 100 in-flight requests
        operator.print();
        env.execute("AsyncIOFunctionTest");
    }
}

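The asynchronous function implements three methods: open, which runs once per parallel instance and initializes resources such as the database connection; asyncInvoke, which runs once per record, issues the non‑blocking request, and completes the ResultFuture; and close, which releases the resources.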
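The same lifecycle can be mimicked outside Flink to show where each piece of work belongs. In this JDK-only sketch the class and method names are hypothetical, an in-memory map stands in for a blocking client such as JDBC, and a `Consumer` plays the role of `ResultFuture.complete`. It is not Flink's RichAsyncFunction API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// JDK-only analogue of the open / asyncInvoke / close lifecycle.
// Class and method names are hypothetical; this is not Flink's RichAsyncFunction API.
class UserLookupLifecycle {
    private Map<String, String> userTable; // stands in for a blocking client such as a JDBC connection
    private ExecutorService ioPool;        // threads that absorb the blocking calls

    // open(): runs once per parallel instance, before any record arrives.
    void open() {
        userTable = new ConcurrentHashMap<>();
        userTable.put("42", "alice");
        userTable.put("7", "bob");
        ioPool = Executors.newFixedThreadPool(4);
    }

    // asyncInvoke(): runs once per record; returns immediately and delivers the result later.
    void asyncInvoke(String userId, Consumer<Collection<String>> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> userTable.getOrDefault(userId, "unknown"), ioPool) // blocking lookup off the caller thread
            .thenAccept(name -> resultFuture.accept(Collections.singletonList(userId + "," + name)));
    }

    // close(): runs once at shutdown; releases what open() created.
    void close() {
        ioPool.shutdown();
    }

    public static void main(String[] args) {
        UserLookupLifecycle fn = new UserLookupLifecycle();
        fn.open();
        CompletableFuture<Collection<String>> result = new CompletableFuture<>();
        fn.asyncInvoke("42", result::complete); // result::complete plays the role of ResultFuture.complete
        System.out.println(result.join()); // prints [42,alice]
        fn.close();
    }
}
```

Creating the pool in open() rather than per record keeps thread creation off the hot path, which is the same reason the article opens the MySQL connection there.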

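The article also shows a template for an Async I/O function that wraps the external request in a Java 8 CompletableFuture and hands the result to Flink via ResultFuture.complete. The template highlights two parameters of AsyncDataStream.unorderedWait and orderedWait: Timeout, the maximum time a single request may take before it is treated as failed, and Capacity, the maximum number of requests that may be in flight at once. When Capacity is reached the operator stops accepting new records, which produces back‑pressure; Timeout governs failure handling for slow or lost responses.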
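To make the two parameters concrete, here is a JDK-only sketch (Java 9+, for `orTimeout` and `delayedExecutor`) of what they mean. It is not how Flink's AsyncWaitOperator is implemented: a `Semaphore` models Capacity, `orTimeout` models Timeout, and the delayed client in `main` is a hypothetical remote store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// JDK-only sketch of the Capacity and Timeout semantics; this is not Flink's AsyncWaitOperator.
// Requires Java 9+ for CompletableFuture.orTimeout and CompletableFuture.delayedExecutor.
class BoundedAsyncLookup {
    private final Semaphore permits;  // Capacity: maximum number of in-flight requests
    private final long timeoutMs;     // Timeout: maximum wait for a single request
    private final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxObservedInFlight = new AtomicInteger();

    BoundedAsyncLookup(int capacity, long timeoutMs) {
        this.permits = new Semaphore(capacity);
        this.timeoutMs = timeoutMs;
    }

    // Blocks the caller while all slots are taken: this is where back-pressure comes from.
    CompletableFuture<String> submit(String key, Function<String, CompletableFuture<String>> client) {
        permits.acquireUninterruptibly();
        maxObservedInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<String> request;
        try {
            request = client.apply(key);
        } catch (RuntimeException e) {
            release(); // do not leak the slot if the client throws synchronously
            throw e;
        }
        return request
            .orTimeout(timeoutMs, TimeUnit.MILLISECONDS) // too slow: completes exceptionally with TimeoutException
            .whenComplete((value, error) -> release());  // frees the slot on success, failure, or timeout
    }

    private void release() {
        inFlight.decrementAndGet();
        permits.release();
    }

    public static void main(String[] args) {
        BoundedAsyncLookup lookup = new BoundedAsyncLookup(2, 500);
        // Hypothetical remote client that answers after about 20 ms
        Function<String, CompletableFuture<String>> client = key -> CompletableFuture.supplyAsync(
            () -> "dim-" + key, CompletableFuture.delayedExecutor(20, TimeUnit.MILLISECONDS));
        List<CompletableFuture<String>> results = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            results.add(lookup.submit("k" + i, client)); // the third submit waits until a slot frees up
        }
        results.forEach(f -> System.out.println(f.join()));
        System.out.println("max in flight: " + lookup.maxObservedInFlight.get()); // never exceeds the capacity of 2
    }
}
```

Choosing Capacity is a trade-off: a larger value hides more latency but puts more concurrent load on the external store and holds more records in memory.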

With the Blink planner introduced in Flink 1.9, dimension table lookups are further simplified through the LookupableTableSource interface. Implementing this interface means providing:

getLookupFunction, which returns the synchronous lookup function (optional when only async mode is used).

getAsyncLookupFunction, which returns the asynchronous lookup function.

isAsyncEnabled, which returns true to enable async mode.

An example RedisAsyncLookupTableSource implements the interface and returns a custom MyAsyncLookupFunction that uses Lettuce's asynchronous Redis client to fetch dimension data.

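public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
    private final String[] fieldNames;
    private final TypeInformation[] fieldTypes;
    private transient RedisClient redisClient;
    private transient StatefulRedisConnection<String, String> connection;
    private transient RedisAsyncCommands<String, String> async;
    public MyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
    @Override public void open(FunctionContext context) {
        // One Lettuce client and connection per parallel instance
        redisClient = RedisClient.create("redis://127.0.0.1:6379");
        connection = redisClient.connect();
        async = connection.async();
    }
    // eval is resolved by reflection: it must be public and cannot carry @Override
    public void eval(CompletableFuture<Collection<Row>> future, Object... params) {
        String key = String.join(":", "userInfo", "userId", params[0].toString(), "userName");
        async.get(key).whenComplete((value, error) -> {
            if (error != null) future.completeExceptionally(error); // fail fast instead of waiting for the timeout
            else future.complete(Collections.singletonList(Row.of(key, value)));
        });
    }
    @Override public TypeInformation<Row> getResultType() { return new RowTypeInfo(fieldTypes, fieldNames); }
    @Override public void close() {
        if (connection != null) connection.close();
        if (redisClient != null) redisClient.shutdown();
    }
}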
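The completion contract of eval can be exercised without Flink or Redis. In this JDK-only analogue the async client is a hypothetical `Function` returning a `CompletionStage` (Lettuce's `RedisFuture` is also a `CompletionStage`), and a `String[]` stands in for Flink's `Row`. The key layout is the one used above.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// JDK-only analogue of MyAsyncLookupFunction.eval. The async client is a hypothetical
// Function returning a CompletionStage, and String[] stands in for Flink's Row.
class UserNameLookup {
    private final Function<String, CompletionStage<String>> asyncGet;

    UserNameLookup(Function<String, CompletionStage<String>> asyncGet) {
        this.asyncGet = asyncGet;
    }

    // Same key layout as the article: userInfo:userId:<id>:userName
    static String redisKey(Object userId) {
        return String.join(":", "userInfo", "userId", userId.toString(), "userName");
    }

    // Completes the caller's future exactly once: with one row on success, exceptionally on failure.
    void eval(CompletableFuture<Collection<String[]>> future, Object... params) {
        String key = redisKey(params[0]);
        asyncGet.apply(key).whenComplete((value, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(Collections.singletonList(new String[] {key, value}));
            }
        });
    }

    public static void main(String[] args) {
        UserNameLookup lookup = new UserNameLookup(
            key -> CompletableFuture.completedFuture("alice")); // hypothetical store that always answers "alice"
        CompletableFuture<Collection<String[]>> out = new CompletableFuture<>();
        lookup.eval(out, 42);
        String[] row = out.join().iterator().next();
        System.out.println(row[0] + " -> " + row[1]); // prints userInfo:userId:42:userName -> alice
    }
}
```

Using whenComplete rather than thenAccept is the design choice that matters here: thenAccept skips the callback when the lookup fails, leaving the caller's future pending.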

Key practical considerations when using Async I/O include:

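Ideally the external system provides an asynchronous client. Otherwise you have to wrap a synchronous client in your own thread pool and take care of thread safety and resource initialization yourself.

The eval method must complete the provided CompletableFuture once the async request finishes, on failure as well as on success; a future that is never completed leaves the lookup waiting until the operator's timeout expires.

Even with async lookups, high‑volume joins can become a bottleneck, so caching dimension data in the job or keeping it in a fast key-value store such as HBase is recommended.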
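A minimal sketch of the caching idea, assuming an in-process LRU cache in front of an async lookup is acceptable for the data's freshness requirements. All names are hypothetical and the backend is a stand-in for a remote store. For brevity it has no expiry (TTL), so updated dimension rows stay stale until evicted, and it does not coalesce concurrent misses for the same key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Sketch of a small LRU cache in front of an async dimension lookup. All names are hypothetical.
// Simplifications: no expiry (TTL) and no coalescing of concurrent misses for the same key.
class CachedAsyncLookup {
    private final Map<String, String> cache;
    private final Function<String, CompletableFuture<String>> backend;

    CachedAsyncLookup(int maxEntries, Function<String, CompletableFuture<String>> backend) {
        this.backend = backend;
        // accessOrder = true keeps entries in LRU order; removeEldestEntry caps the size
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
    }

    CompletableFuture<String> lookup(String key) {
        String hit;
        synchronized (cache) {
            hit = cache.get(key); // get() also updates LRU order, so it must be synchronized too
        }
        if (hit != null) {
            return CompletableFuture.completedFuture(hit); // cache hit: no remote round trip
        }
        return backend.apply(key).thenApply(value -> {
            synchronized (cache) {
                cache.put(key, value);
            }
            return value;
        });
    }

    public static void main(String[] args) {
        AtomicInteger backendCalls = new AtomicInteger();
        CachedAsyncLookup lookup = new CachedAsyncLookup(1000, key -> {
            backendCalls.incrementAndGet(); // hypothetical remote store: counts round trips
            return CompletableFuture.completedFuture("dim-" + key);
        });
        for (String userId : new String[] {"42", "7", "42", "42"}) {
            lookup.lookup(userId).join();
        }
        System.out.println("backend calls: " + backendCalls.get()); // prints backend calls: 2
    }
}
```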

References for further reading are listed at the end of the article.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by: Wang Zhiwu, a big data expert dedicated to sharing big data technology.
