Using Flink Distributed Cache: Overview and Example

This article explains Flink's distributed cache feature, describes its registration and retrieval mechanisms, and provides a complete Java example that demonstrates how to register a file, access it within a RichMapFunction, and print the processed results.

Big Data Technology & Architecture

Flink provides a distributed cache similar to the one in Hadoop. It lets parallel user functions read a file from the local file system of the TaskManager they run on, so the file does not have to be fetched repeatedly from its original location.

The cache works as follows. You register a file or directory (local, or remote such as on HDFS or S3) with the ExecutionEnvironment and give it a name. When the job runs, Flink automatically copies the file once to every TaskManager node. User functions then retrieve the file by its name from the local file system of their TaskManager.
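
To make the register, copy-once, and lookup-by-name flow concrete without a cluster, here is a toy model in plain Java. It is not Flink's implementation: `ToyCacheDemo`, `Registry`, and `Worker` are hypothetical names, and fetching the file on first access is a simplification chosen for the sketch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Toy model of the register / copy-once / lookup-by-name flow.
// NOT Flink's implementation; all class and method names are hypothetical.
class ToyCacheDemo {

    // Plays the role of the environment: remembers name -> source path.
    static class Registry {
        final Map<String, Path> entries = new HashMap<>();

        void registerCachedFile(Path source, String name) {
            entries.put(name, source);
        }
    }

    // Plays the role of one TaskManager: keeps a private local copy of each file.
    static class Worker {
        private final Path localDir;
        private final Map<String, Path> localCopies = new HashMap<>();
        int copyCount = 0; // how many times a file was actually fetched

        Worker(Path localDir) {
            this.localDir = localDir;
        }

        // Returns the local copy, fetching it from the source only on first access.
        Path getFile(Registry registry, String name) throws IOException {
            Path cached = localCopies.get(name);
            if (cached != null) {
                return cached;
            }
            Path source = registry.entries.get(name);
            if (source == null) {
                throw new IllegalArgumentException("No cached file registered as " + name);
            }
            Path target = localDir.resolve(name);
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
            copyCount++;
            localCopies.put(name, target);
            return target;
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("source", ".txt");
        Files.write(source, Arrays.asList("hello", "flink"));

        Registry registry = new Registry();
        registry.registerCachedFile(source, "a.txt");

        Worker worker = new Worker(Files.createTempDirectory("worker"));
        Path first = worker.getFile(registry, "a.txt");
        Path second = worker.getFile(registry, "a.txt"); // served from the local copy

        System.out.println(Files.readAllLines(first));          // [hello, flink]
        System.out.println("copies made: " + worker.copyCount); // copies made: 1
    }
}
```

The point of the sketch is the split of responsibilities: the registry only stores a name and a source location, while each worker owns a local copy and answers later lookups from it.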

Example: Register a file in the ExecutionEnvironment

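ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// The second argument is the name used to look the file up later; it need not match the file name on disk
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");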

In a user function (a RichMapFunction), the cached file can be accessed via the RuntimeContext:

DataSet<String> result = data.map(new RichMapFunction<String, String>() {
    // Lines of the cached file, loaded once per parallel instance in open()
    private final List<String> dataList = new ArrayList<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Look up the local copy by the name used at registration
        File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
        // FileUtils is assumed to be org.apache.commons.io.FileUtils
        List<String> lines = FileUtils.readLines(myFile, StandardCharsets.UTF_8);
        for (String line : lines) {
            this.dataList.add(line);
            System.err.println("Distributed cache file:" + line);
        }
    }

    @Override
    public String map(String value) throws Exception {
        // Use the cached data (debug output)
        System.err.println("Using dataList:" + dataList + "------------" + value);
        // Business logic: prefix each element with the cached lines
        return dataList + ":" + value;
    }
});
result.printToErr();
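
The key point of the function above is its lifecycle: the file is read once in open(), and map() only touches the in-memory list. The following plain-Java stand-in, with no Flink dependency, sketches that pattern so it can be run locally. `CachedLinesMapper` is a hypothetical class, and the four file lines are an assumption inferred from the sample output in this article.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the RichMapFunction above (hypothetical, no Flink dependency).
class CachedLinesMapper {
    private final List<String> dataList = new ArrayList<>();

    // Mirrors open(): called once, reads the whole file into memory.
    void open(File cachedFile) throws IOException {
        dataList.addAll(Files.readAllLines(cachedFile.toPath(), StandardCharsets.UTF_8));
    }

    // Mirrors map(): called once per record, uses only the in-memory list.
    String map(String value) {
        return dataList + ":" + value;
    }

    public static void main(String[] args) throws IOException {
        // Assumed file content: four lines, matching the article's sample output
        File file = File.createTempFile("a", ".txt");
        file.deleteOnExit();
        Files.write(file.toPath(),
                Arrays.asList("hello", "flink", "hello", "FLINK"),
                StandardCharsets.UTF_8);

        CachedLinesMapper mapper = new CachedLinesMapper();
        mapper.open(file); // single file read
        for (String value : Arrays.asList("a", "b", "c", "d")) {
            System.out.println(mapper.map(value)); // first line: [hello, flink, hello, FLINK]:a
        }
    }
}
```

Reading the file in map() instead would repeat the disk access for every record, which is why the load belongs in open().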

The full source code combines the environment setup, file registration, and the RichMapFunction implementation shown above.

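import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils; // assumed: Apache Commons IO
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Register the file under the name "a.txt" (an HDFS path also works; a local path is used here for testing)
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() { /* body of the RichMapFunction shown above */ });
        result.printToErr();
    }
}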

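The program prints each input element prefixed by the list of lines read from the cached file. With a cached file containing the four lines hello, flink, hello, and FLINK, the result lines look like this (the debug lines written to System.err in open() and map() are omitted, and the order may vary with parallelism):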

[hello, flink, hello, FLINK]:a
[hello, flink, hello, FLINK]:b
[hello, flink, hello, FLINK]:c
[hello, flink, hello, FLINK]:d

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
