Integrating Apache Flink with Apache Pulsar for Scalable Elastic Data Processing
This article explains how Apache Pulsar and Apache Flink can be combined to provide a unified, scalable, and fault‑tolerant data processing platform, covering Pulsar's architecture, its differences from other messaging systems, various integration patterns, and concrete code examples for stream and batch workloads.
Apache Pulsar is an open‑source, multi‑tenant, high‑performance publish‑subscribe messaging system managed by the Apache Software Foundation. It offers native support for multiple clusters, geo‑replication, low latency, and seamless scaling to millions of topics, backed by Apache BookKeeper for durable storage.
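Multi-tenancy shows up directly in Pulsar's topic names, which take the form `persistent://tenant/namespace/topic` (or `non-persistent://tenant/namespace/topic`). A bare name such as `my-topic` is shorthand for `persistent://public/default/my-topic`. The plain-Java sketch below splits a name into those parts. It is an illustrative helper that assumes the three-level layout. The record name `TopicNameSketch` is hypothetical, and this is not the Pulsar client's own topic-name parser.

```java
/**
 * Hypothetical helper: splits a Pulsar topic name into domain, tenant, namespace and topic.
 * Assumes the three-level layout; this is not the Pulsar client's TopicName class.
 */
record TopicNameSketch(String domain, String tenant, String namespace, String topic) {

    static TopicNameSketch parse(String name) {
        // Short forms assumed by this sketch: "my-topic" -> public/default,
        // "tenant/ns/topic" -> persistent domain.
        if (!name.contains("://")) {
            name = name.contains("/") ? "persistent://" + name : "persistent://public/default/" + name;
        }
        int sep = name.indexOf("://");
        String domain = name.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("unknown domain: " + domain);
        }
        String[] parts = name.substring(sep + 3).split("/", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + name);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty component in: " + name);
            }
        }
        return new TopicNameSketch(domain, parts[0], parts[1], parts[2]);
    }

    /** Namespace-level policies (retention, quotas, permissions) are keyed by "tenant/namespace". */
    String namespaceKey() {
        return tenant + "/" + namespace;
    }
}
```

Because isolation and policies attach at the tenant and namespace levels, two teams can use the same topic name under different tenants without interfering with each other.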
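The article highlights three key differences between Pulsar and other pub‑sub systems. (1) Pulsar combines messaging with durable log storage in a single system. Its layered architecture, with stateless brokers serving traffic and Apache BookKeeper storing the data, enables instant failure recovery and lets each layer scale independently. (2) It is multi‑tenant by design, with isolation at the tenant, namespace, and topic levels. (3) It unifies queue and stream consumption. The same topic can be consumed in order as a stream (exclusive or failover subscriptions) or spread across many consumers like a work queue (shared subscriptions).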
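The queue-versus-stream distinction can be illustrated with a toy dispatcher. The sketch uses a simplified model. In failover mode, the first consumer is treated as the active one and receives every message in order, which gives stream semantics. In shared mode, messages are handed out round-robin, which gives queue semantics. Pulsar's real dispatcher also takes consumer permits, priorities and acknowledgements into account. The class, enum and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of two Pulsar subscription types (not Pulsar's dispatcher). */
class SubscriptionSketch {
    enum Mode { FAILOVER, SHARED }

    /**
     * Delivers messages, in log order, to the consumers attached to one subscription.
     * FAILOVER: the first consumer is treated as active and receives everything in order.
     * SHARED: messages are handed out round-robin, so consumers split the work like a queue.
     */
    static Map<String, List<Integer>> dispatch(List<Integer> messages, List<String> consumers, Mode mode) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a subscription needs at least one consumer");
        }
        Map<String, List<Integer>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = switch (mode) {
                case FAILOVER -> consumers.get(0);
                case SHARED -> consumers.get(i % consumers.size());
            };
            received.get(target).add(messages.get(i));
        }
        return received;
    }
}
```

In a real failover subscription, a standby consumer takes over the stream if the active one disconnects. This sketch does not model that handover.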
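Both Flink and Pulsar treat data as a continuous stream, and Flink regards a batch as the special case of a bounded stream. Pulsar's storage model fits this view. Each topic is a segmented stream: an unbounded sequence of bounded segments kept in BookKeeper. The same data can therefore be consumed as a live stream or read segment by segment as bounded input. This lets Pulsar serve as the data layer while Flink handles computation.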
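The sketch below makes the "batch is a bounded stream" idea concrete in plain Java, with no Flink dependency. It counts words in tumbling windows, the same shape as the 5-second windowed word count in the first example below. Running the identical logic with one window wide enough to cover a bounded input yields the batch result. Window starts are computed as `timestamp - timestamp % size`, assuming non-negative timestamps. This matches how Flink aligns tumbling windows when no offset is set. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java sketch (not the Flink API): a batch job as a stream job over bounded input. */
class UnifiedWordCount {
    /** A timestamped line, standing in for one message read from a topic. */
    record Event(long timestampMillis, String line) {}

    /** Streaming view: word counts per tumbling window, keyed by window start; assumes timestamps >= 0. */
    static Map<Long, Map<String, Integer>> countPerWindow(Iterable<Event> events, long windowMillis) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMillis() - (e.timestampMillis() % windowMillis);
            Map<String, Integer> counts = windows.computeIfAbsent(windowStart, k -> new TreeMap<>());
            for (String word : e.line().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return windows;
    }

    /** Batch view: the same code with a single window (start 0) spanning the whole bounded input. */
    static Map<String, Integer> countBounded(List<Event> events) {
        return countPerWindow(events, Long.MAX_VALUE).getOrDefault(0L, new TreeMap<>());
    }
}
```

Only the window definition changes between the two views. The per-record logic is shared, which is the property the unified stream/batch model relies on.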
Future integration scenarios include using streaming connectors for real‑time workloads, batch source connectors for batch jobs, native schema support for structured access, and employing Pulsar as Flink’s state backend thanks to its segmented streams and BookKeeper storage.
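The segmented-stream idea behind the batch-connector and state-backend scenarios can be sketched as follows. A topic's log is cut into bounded segments, which a batch job can hand out to parallel readers, while a streaming reader simply tails the newest segment. This is a simplified model with two assumptions. First, segments here hold a fixed number of entries, whereas Pulsar rolls over to a new BookKeeper ledger according to broker configuration. Second, all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a segmented topic. Segments here hold a fixed number of entries;
 * Pulsar itself rolls over to a new BookKeeper ledger according to broker configuration.
 */
class SegmentedTopicSketch {

    /** Cuts the log into consecutive bounded segments of at most segmentSize entries. */
    static <T> List<List<T>> segments(List<T> log, int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < log.size(); start += segmentSize) {
            result.add(List.copyOf(log.subList(start, Math.min(start + segmentSize, log.size()))));
        }
        return result;
    }

    /** Batch side: assigns segment indices round-robin to a fixed number of parallel readers. */
    static List<List<Integer>> assignToReaders(int segmentCount, int readers) {
        if (readers <= 0) {
            throw new IllegalArgumentException("readers must be positive");
        }
        List<List<Integer>> plan = new ArrayList<>();
        for (int r = 0; r < readers; r++) {
            plan.add(new ArrayList<>());
        }
        for (int s = 0; s < segmentCount; s++) {
            plan.get(s % readers).add(s);
        }
        return plan;
    }
}
```

Reading the segments in index order reproduces the original log. Both a parallel batch scan and a sequential streaming read therefore see the same ordered data.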
Existing integration patterns are demonstrated with concrete code examples. The first example shows how to use PulsarSourceBuilder to create a Flink DataStream source, apply a flatMap transformation, window the data, and write results back to Pulsar:
```java
// Assumes env (a StreamExecutionEnvironment), serviceUrl, inputTopic, subscription,
// outputTopic, parallelism, a WordWithCount POJO (word, count) and
// `import static java.nio.charset.StandardCharsets.UTF_8` from the enclosing job.

// Source: read each Pulsar message as a string through a named subscription.
PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
    .serviceUrl(serviceUrl)
    .topic(inputTopic)
    .subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);

// Split lines into words and sum the counts per word over 5-second tumbling windows.
DataStream<WordWithCount> wc = input
    .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
        for (String word : line.split("\\s+")) {
            collector.collect(new WordWithCount(word, 1));
        }
    })
    .returns(WordWithCount.class) // lambdas lose generic type info, so declare the output type
    .keyBy("word")
    .timeWindow(Time.seconds(5))
    .reduce((ReduceFunction<WordWithCount>) (c1, c2) -> new WordWithCount(c1.word, c1.count + c2.count));

if (null != outputTopic) {
    // Sink: UTF-8 text payload, with the word as the message key.
    wc.addSink(new FlinkPulsarProducer<>(
        serviceUrl,
        outputTopic,
        new AuthenticationDisabled(),
        wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
        wordWithCount -> wordWithCount.word
    )).setParallelism(parallelism);
} else {
    // No output topic: print the results from a single task.
    wc.print().setParallelism(1);
}
```

A second example uses Pulsar as both a streaming source and a table sink. The windowed DataStream is registered as a table and queried with SQL. The result is then written back to Pulsar as JSON, or to a CSV file when no output topic is set:
```java
// Same source and windowed word count as the first example (flatMap/reduce bodies elided).
PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
    .serviceUrl(serviceUrl)
    .topic(inputTopic)
    .subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);
DataStream<WordWithCount> wc = input
    .flatMap(...)
    .returns(WordWithCount.class)
    .keyBy(ROUTING_KEY)
    .timeWindow(Time.seconds(5))
    .reduce(...);

// Expose the stream as table "wc"; `count` is back-quoted because COUNT is a reserved SQL keyword.
tableEnvironment.registerDataStream("wc", wc);
Table table = tableEnvironment.sqlQuery("select word, `count` from wc");

// Write JSON rows to Pulsar keyed by ROUTING_KEY, or fall back to a local CSV file.
TableSink sink;
if (null != outputTopic) {
    sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);
} else {
    sink = new CsvTableSink("./examples/file", "|");
}
table.writeToSink(sink);
```

A batch‑oriented example uses Pulsar as a batch output format. It builds a static data set, counts the words longer than four characters, and writes the aggregated results to Pulsar:
```java
// Batch sink: each WordWithCount is written to `topic` as UTF-8 text (UTF_8 statically imported).
final OutputFormat pulsarOutputFormat = new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(),
    wordWithCount -> wordWithCount.toString().getBytes(UTF_8));
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
// output() returns a DataSink rather than a DataSet, so the chain is not assigned to a variable.
textDS.flatMap(new FlatMapFunction<String, WordWithCount>() { ... })
    .filter(wordWithCount -> wordWithCount.word.length() > 4)
    .groupBy(new KeySelector<WordWithCount, String>() { ... })
    .reduce(new ReduceFunction<WordWithCount>() { ... })
    .output(pulsarOutputFormat);
env.execute(); // DataSet sinks run only when the job is executed
```

In summary, Pulsar and Flink share a unified view of data in which batch is a special case of streaming. Pulsar's segmented streams can be combined with Flink's ability to run both stream and batch workloads. Developers can use this combination to build large‑scale, elastic data processing pipelines that bring messaging, storage, and computation together on one platform.
This article has been distilled and summarized from source material and republished for learning and reference.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
