Apache Flink Deployment with Pulsar Connector: Setup, Demos, and Best Practices
This guide covers deploying Apache Flink 1.17 in Docker, configuring TaskManager off-heap memory, and connecting Flink to Pulsar via the 4.1.0-1.17 connector. It walks through two example jobs (topic copy and windowed word count) and closes with Maven dependencies, custom serialization tips, batching settings, and version-specific best-practice notes.
Apache Flink is an open‑source stream and batch processing framework that provides high throughput, low latency, event‑time handling and state management. This guide focuses on using Flink 1.17 with the Pulsar connector.
1. Background
Flink supports Java, Scala, Python and SQL APIs and can run on clusters or in the cloud. This document validates the Pulsar connector against Flink 1.17.
2. Deploy Flink (Docker)
2.1 Set the Flink environment configuration. TaskManager off-heap memory must be declared explicitly because the Pulsar connector uses it, e.g.:
$ FLINK_PROPERTIES=$'\njobmanager.rpc.address: jobmanager\ntaskmanager.memory.task.off-heap.size: 1gb\ntaskmanager.memory.process.size: 4gb'

Create a Docker network:
$ docker network create flink-network

2.2 Deploy JobManager
$ docker run --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:1.17.2-scala_2.12 jobmanager

2.3 Deploy TaskManager
$ docker run --rm --name=taskmanager --network flink-network --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:1.17.2-scala_2.12 taskmanager

After both containers start, the Flink Dashboard (http://localhost:8081) shows the TaskManager registered.
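The same two-container setup can also be expressed declaratively. The following docker-compose file is an untested sketch equivalent to the commands above (service names and the network are implied by Compose; the FLINK_PROPERTIES values are the ones used earlier):

```yaml
# Sketch of a docker-compose equivalent of the two `docker run` commands above.
# The official flink image merges FLINK_PROPERTIES into flink-conf.yaml at startup.
version: "3"
services:
  jobmanager:
    image: flink:1.17.2-scala_2.12
    command: jobmanager
    ports:
      - "8081:8081"
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        taskmanager.memory.task.off-heap.size: 1gb
        taskmanager.memory.process.size: 4gb
  taskmanager:
    image: flink:1.17.2-scala_2.12
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        taskmanager.memory.task.off-heap.size: 1gb
        taskmanager.memory.process.size: 4gb
```

With this file, `docker compose up` brings up both containers on a shared default network, where the service name `jobmanager` resolves as the RPC address.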
3. Demo – Topic Copy
The demo copies all messages from one Pulsar topic to another using the Pulsar source and sink with the built‑in String serializer.
public static void main(String[] args) throws Exception {
// parameter parsing omitted for brevity
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();
PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setSerializationSchema(new SimpleStringSchema())
.build();
// stream processing omitted
env.execute("Pulsar Streaming Message Duplication");
}

Run the job with:
/usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
--broker-service-url http://pulsar-xxxxx \
--input-topic .../NinjaDuplicationInput1 \
--output-topic .../NinjaDuplicationOutput1 \
--subscription-name ninjaTest1 \
--token

Verify that messages appear in the output topic.
4. Demo – Word Count
This classic Flink example counts words in a time window and writes the result to a Pulsar topic using a custom JSON serializer.
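Stripped of the Flink plumbing, the per-window computation the job performs is just tokenize-and-count: the flatMap splits each line into words, and the keyBy/reduce sums counts per word. A minimal plain-Java sketch of that logic (no Flink dependencies; class and method names here are illustrative, not part of the job):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // Mirrors the flatMap (tokenize) + keyBy/reduce (sum per word) steps
    // that the Flink job applies to all records within one tumbling window.
    static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) continue;           // skip empty tokens
                counts.merge(word, 1, Integer::sum);    // per-key reduce
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = countWords(List.of("to be or not", "to be"));
        System.out.println(c.get("to")); // 2
        System.out.println(c.get("be")); // 2
        System.out.println(c.get("or")); // 1
    }
}
```

In the real job the same computation runs continuously, emitting one such map's worth of WordCount records at the close of every processing-time window.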
public class PulsarStreamingWordCount {
private static final Logger LOG = LoggerFactory.getLogger(PulsarStreamingWordCount.class);
public static void main(String[] args) throws Exception {
ParameterTool pt = ParameterTool.fromArgs(args);
if (pt.getNumberOfParameters() < 2) { System.err.println("Missing parameters!"); return; }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(60000);
env.getConfig().setGlobalJobParameters(pt);
String broker = pt.getRequired("broker-service-url");
String inTopic = pt.getRequired("input-topic");
String outTopic = pt.getRequired("output-topic");
String sub = pt.get("subscription-name", "WordCountTest");
String token = pt.getRequired("token");
int windowSec = pt.getInt("time-window", 60);
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(broker)
.setStartCursor(StartCursor.latest())
.setTopics(inTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(sub)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
DataStream<WordCount> wc = stream
.flatMap((FlatMapFunction<String, WordCount>) (line, out) -> {
for (String w : line.split("\\s+")) out.collect(new WordCount(w, 1, null));
})
.returns(WordCount.class)
.keyBy(WordCount::getWord)
.window(TumblingProcessingTimeWindows.of(Time.seconds(windowSec)))
.reduce((c1, c2) -> new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null));
PulsarSink<WordCount> sink = PulsarSink.builder()
.setServiceUrl(broker)
.setTopics(outTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false)
.setSerializationSchema(new PulsarSerializationSchema<WordCount>() {
private ObjectMapper mapper;
@Override public void open(SerializationSchema.InitializationContext ctx, PulsarSinkContext sctx, SinkConfiguration cfg) { mapper = new ObjectMapper(); }
@Override public PulsarMessage<?> serialize(WordCount wc, PulsarSinkContext sctx) {
wc.setSinkDateTime(LocalDateTime.now().toString());
byte[] bytes;
try { bytes = mapper.writeValueAsBytes(wc); } catch (Exception e) { bytes = e.getMessage().getBytes(); }
return PulsarMessage.builder(bytes).build();
}
})
.build();
wc.sinkTo(sink);
env.execute("Pulsar Streaming WordCount");
}
}

Submit the job similarly, then produce messages to the input topic and observe word-count results in the output topic.
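The job above assumes a WordCount POJO with a (word, count, sinkDateTime) constructor, getters for word and count, and a setter for the sink timestamp. The actual class is not shown here; the following is a minimal sketch that satisfies the calls the job makes (field names are assumptions). A public no-arg constructor and getters/setters also make it a valid Flink POJO and let Jackson serialize it via its getters:

```java
// Minimal WordCount POJO assumed by the word-count job.
// Flink POJO rules: public class, public no-arg constructor,
// and public getters/setters for every field.
public class WordCount {
    private String word;
    private long count;
    private String sinkDateTime; // stamped in the sink's serialize()

    public WordCount() { }       // required by Flink's POJO serializer

    public WordCount(String word, long count, String sinkDateTime) {
        this.word = word;
        this.count = count;
        this.sinkDateTime = sinkDateTime;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
    public String getSinkDateTime() { return sinkDateTime; }
    public void setSinkDateTime(String sinkDateTime) { this.sinkDateTime = sinkDateTime; }
}
```

The reduce step (`new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null)`) only needs the constructor and the two getters; sinkDateTime is filled in later by the custom serializer.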
5. Flink‑Pulsar Connector Summary
5.1 Version selection – Use Flink 1.15‑1.17 for the Pulsar connector; newer versions may require adjustments.
5.2 Maven dependencies (1.17 example):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>4.1.0-1.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.2</version>
</dependency>

5.3 Source example (1.17):
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();

5.4 Sink example (1.17):
PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setSerializationSchema(new SimpleStringSchema())
.build();

6. Important Notes
Explicitly set off‑heap memory for TaskManager (e.g., 1 GB) because the Pulsar connector uses it.
Prefer implementing PulsarSerializationSchema for custom serialization to avoid coupling with Pulsar schemas.
Disable batch sending if you need per-message visibility: .setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false).
The source currently cannot read message properties such as publish time; use processing‑time windows instead.
Older Flink versions (≤1.16) support shared subscription types that require transactional acknowledgments, which are not available in TDMQ Pulsar.
Oceanus’s built‑in Pulsar connector targets Flink 1.13‑1.14 and may need extensive changes for newer Flink releases.
Tencent Cloud Developer