Using Flink Distributed Cache: Overview and Example
This article explains Flink's distributed cache feature, describes its registration and retrieval mechanisms, and provides a complete Java example that demonstrates how to register a file, access it within a RichMapFunction, and print the processed results.
Flink provides a distributed cache similar to Hadoop's. It lets parallel user functions read a file from the local file system of the TaskManager they run on, so the file does not have to be fetched repeatedly.
The cache works by registering a file or directory (local or remote such as HDFS or S3) with the ExecutionEnvironment, assigning it a name, and having Flink automatically copy the file to all TaskManager nodes once during execution. Users can then retrieve the file by its name from the local file system of each TaskManager.
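The register-by-name, copy-once, look-up-by-name flow described above can be modeled in plain Java. This is only a toy sketch and not Flink's implementation: the class CacheSketch, its methods, and the copy counter are hypothetical names chosen for illustration, and the copy is made lazily on the first lookup as a simplification, whereas Flink handles the transfer itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a named file cache for one worker node. Hypothetical, not Flink code. */
class CacheSketch {
    // name -> original location, recorded at registration time
    private final Map<String, Path> registered = new HashMap<>();
    // name -> node-local copy, filled in on the first lookup
    private final Map<String, Path> localCopies = new HashMap<>();
    private final Path workerDir;
    private int copyCount = 0;

    CacheSketch(Path workerDir) {
        this.workerDir = workerDir;
    }

    /** Records the source under a name; nothing is copied yet. */
    void registerCachedFile(Path source, String name) {
        registered.put(name, source);
    }

    /** Returns the local copy for a name, copying the source only on the first call. */
    Path getFile(String name) {
        Path source = registered.get(name);
        if (source == null) {
            throw new IllegalArgumentException("No cached file registered as '" + name + "'");
        }
        return localCopies.computeIfAbsent(name, n -> {
            try {
                Path target = workerDir.resolve(n);
                Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
                copyCount++;
                return target;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Number of copies made so far, exposed only to show the copy-once behavior. */
    int copyCount() {
        return copyCount;
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("source", ".txt");
        Files.write(source, Arrays.asList("hello", "flink"));
        Path workerDir = Files.createTempDirectory("worker");

        CacheSketch cache = new CacheSketch(workerDir);
        cache.registerCachedFile(source, "a.txt");

        Path first = cache.getFile("a.txt");
        Path second = cache.getFile("a.txt");
        System.out.println("Same local path: " + first.equals(second));
        System.out.println("Copies made: " + cache.copyCount());
    }
}
```

The point of the sketch is the separation between the registered name and the physical location: callers only ever ask for "a.txt", and where the bytes originally lived is irrelevant to them.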
Example: Register a file in the ExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
In a user function (a RichMapFunction), the cached file can be accessed via the RuntimeContext:
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
    // Lines of the cached file, loaded once per parallel task instance in open()
    private ArrayList<String> dataList = new ArrayList<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Retrieve the cached file by the name it was registered under
        File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
        // FileUtils is assumed to be org.apache.commons.io.FileUtils; the charset is
        // passed explicitly so the platform default is not used
        List<String> lines = FileUtils.readLines(myFile, StandardCharsets.UTF_8);
        for (String line : lines) {
            this.dataList.add(line);
            System.err.println("Distributed cache file:" + line);
        }
    }

    @Override
    public String map(String value) throws Exception {
        // Use the cached data
        System.err.println("Using dataList:" + dataList + "------------" + value);
        // Business logic
        return dataList + ":" + value;
    }
});
result.printToErr();
The full source code combines the environment setup, file registration, and the RichMapFunction implementation shown above.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // Get execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Register a file (can be HDFS or local for testing)
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() { /* same as above */ });
        result.printToErr();
    }
}
The program outputs each input element prefixed by the list of lines read from the cached file, for example:
[hello, flink, hello, FLINK]:a
[hello, flink, hello, FLINK]:b
[hello, flink, hello, FLINK]:c
[hello, flink, hello, FLINK]:d
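The shape of this output follows from concatenating a Java list with a string, which renders the list via its toString() method. The sketch below reproduces the load-once-then-map pattern without any Flink dependency. CachedLinesMapper is a hypothetical stand-in for the RichMapFunction, its open(Path) takes the file directly instead of going through the RuntimeContext, and the temporary file contents are assumed from the sample output.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java stand-in for the mapper in the article. Hypothetical, no Flink involved. */
class CachedLinesMapper {
    private final List<String> dataList = new ArrayList<>();

    /** Loads the file once, mirroring what open() does in the Flink version. */
    void open(Path cachedFile) throws IOException {
        dataList.addAll(Files.readAllLines(cachedFile, StandardCharsets.UTF_8));
    }

    /** Prefixes each element with the loaded lines, as in the article's map(). */
    String map(String value) {
        return dataList + ":" + value;
    }

    public static void main(String[] args) throws IOException {
        // Assumed file contents: one word per line
        Path file = Files.createTempFile("a", ".txt");
        Files.write(file, Arrays.asList("hello", "flink", "hello", "FLINK"), StandardCharsets.UTF_8);

        CachedLinesMapper mapper = new CachedLinesMapper();
        mapper.open(file);
        for (String value : Arrays.asList("a", "b", "c", "d")) {
            System.out.println(mapper.map(value));
        }
    }
}
```

Reading the file in open() rather than in map() is the design choice that matters here: the file is parsed once per mapper instance, not once per record.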
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.