
Apache Flink Deployment with Pulsar Connector: Setup, Demos, and Best Practices

This guide shows how to deploy Apache Flink 1.17 in Docker, configure off‑heap memory, connect Flink to Pulsar via the 4.1.0‑1.17 connector, and run example jobs that copy topics and perform a windowed word count. It also covers Maven dependencies, custom serialization, batching settings, and version‑specific best practices.

Tencent Cloud Developer

Apache Flink is an open‑source stream and batch processing framework that provides high throughput, low latency, event‑time handling and state management. This guide focuses on using Flink 1.17 with the Pulsar connector.

1. Background

Flink offers Java, Scala, Python, and SQL APIs and can run on standalone clusters or in the cloud. This document tests the Pulsar connector plugin against Flink 1.17.

2. Deploy Flink (Docker)

2.1 Set the Flink environment configuration. TaskManager off‑heap memory must be declared explicitly because the Pulsar connector allocates from it, e.g.

$ FLINK_PROPERTIES=$'\njobmanager.rpc.address: jobmanager\ntaskmanager.memory.task.off-heap.size: 1gb\ntaskmanager.memory.process.size: 4gb'

Create a Docker network:

$ docker network create flink-network

2.2 Deploy JobManager

$ docker run --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:1.17.2-scala_2.12 jobmanager

2.3 Deploy TaskManager

$ docker run --rm --name=taskmanager --network flink-network --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:1.17.2-scala_2.12 taskmanager

After both containers start, the Flink Dashboard (http://localhost:8081) shows the TaskManager registered.

3. Demo – Topic Copy

The demo copies all messages from one Pulsar topic to another using the Pulsar source and sink with the built‑in String serializer.

public static void main(String[] args) throws Exception {
    // parameter parsing omitted for brevity
    PulsarSource<String> source = PulsarSource.builder()
            .setServiceUrl(brokerServiceUrl)
            .setStartCursor(StartCursor.latest())
            .setTopics(inputTopic)
            .setDeserializationSchema(new SimpleStringSchema())
            .setSubscriptionName(subscriptionName)
            .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
            .build();
    PulsarSink<String> sink = PulsarSink.builder()
            .setServiceUrl(brokerServiceUrl)
            .setTopics(outputTopic)
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
            .setSerializationSchema(new SimpleStringSchema())
            .build();
    // stream processing omitted
    env.execute("Pulsar Streaming Message Duplication");
}

Run the job with:

/usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    --broker-service-url http://pulsar-xxxxx \
    --input-topic .../NinjaDuplicationInput1 \
    --output-topic .../NinjaDuplicationOutput1 \
    --subscription-name ninjaTest1 \
    --token

Verify that messages appear in the output topic.

4. Demo – Word Count

This classic Flink example counts words in a time window and writes the result to a Pulsar topic using a custom JSON serializer.

public class PulsarStreamingWordCount {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarStreamingWordCount.class);
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        if (pt.getNumberOfParameters() < 2) { System.err.println("Missing parameters!"); return; }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(60000);
        env.getConfig().setGlobalJobParameters(pt);
        String broker = pt.getRequired("broker-service-url");
        String inTopic = pt.getRequired("input-topic");
        String outTopic = pt.getRequired("output-topic");
        String sub = pt.get("subscription-name", "WordCountTest");
        String token = pt.getRequired("token");
        int windowSec = pt.getInt("time-window", 60);
        PulsarSource<String> source = PulsarSource.builder()
                .setServiceUrl(broker)
                .setStartCursor(StartCursor.latest())
                .setTopics(inTopic)
                .setDeserializationSchema(new SimpleStringSchema())
                .setSubscriptionName(sub)
                .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
                .build();
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
        DataStream<WordCount> wc = stream
                .flatMap((FlatMapFunction<String, WordCount>) (line, out) -> {
                    for (String w : line.split("\\s")) out.collect(new WordCount(w, 1, null));
                })
                .returns(WordCount.class)
                .keyBy(WordCount::getWord)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(windowSec)))
                .reduce((c1, c2) -> new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null));
        PulsarSink<WordCount> sink = PulsarSink.builder()
                .setServiceUrl(broker)
                .setTopics(outTopic)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
                .setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false)
                .setSerializationSchema(new PulsarSerializationSchema<WordCount>() {
                    private ObjectMapper mapper;
                    @Override public void open(SerializationSchema.InitializationContext ctx, PulsarSinkContext sctx, SinkConfiguration cfg) { mapper = new ObjectMapper(); }
                    @Override public PulsarMessage<byte[]> serialize(WordCount wc, PulsarSinkContext sctx) {
                        wc.setSinkDateTime(LocalDateTime.now().toString());
                        byte[] bytes;
                        try { bytes = mapper.writeValueAsBytes(wc); } catch (Exception e) { bytes = e.getMessage().getBytes(); }
                        return PulsarMessage.builder(bytes).build();
                    }
                })
                .build();
        wc.sinkTo(sink);
        env.execute("Pulsar Streaming WordCount");
    }
}
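The job references a WordCount POJO that the article does not show. A minimal sketch compatible with the calls above (field names and types are assumptions inferred from the code) might look like:

```java
public class WordCount {
    private String word;
    private long count;
    private String sinkDateTime;

    // Public no-arg constructor so Flink can treat the class as a POJO type.
    public WordCount() { }

    public WordCount(String word, long count, String sinkDateTime) {
        this.word = word;
        this.count = count;
        this.sinkDateTime = sinkDateTime;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
    public String getSinkDateTime() { return sinkDateTime; }
    public void setSinkDateTime(String sinkDateTime) { this.sinkDateTime = sinkDateTime; }
}
```

The getters and setters also let Jackson's ObjectMapper serialize the object in the custom sink schema.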

Submit the job similarly, then produce messages to the input topic and observe word‑count results in the output topic.
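Per window, the keyBy/reduce pipeline amounts to summing counts per word. That aggregation can be sketched in plain Java with no Flink dependencies (WordCountSim and countWords are hypothetical names, not part of the job; unlike the job above, this sketch skips empty tokens):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSim {
    // Mirrors keyBy(word) + reduce(sum) applied to all lines in one window:
    // split each line on whitespace and sum a count of 1 per occurrence.
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String w : line.split("\\s")) {
                if (!w.isEmpty()) {
                    counts.merge(w, 1, Integer::sum);  // skip empty tokens
                }
            }
        }
        return counts;
    }
}
```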

5. Flink‑Pulsar Connector Summary

5.1 Version selection – Use Flink 1.15‑1.17 for the Pulsar connector; newer versions may require adjustments.

5.2 Maven dependencies (1.17 example):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>4.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.2</version>
</dependency>

5.3 Source example (1.17):

PulsarSource<String> source = PulsarSource.builder()
        .setServiceUrl(brokerServiceUrl)
        .setStartCursor(StartCursor.latest())
        .setTopics(inputTopic)
        .setDeserializationSchema(new SimpleStringSchema())
        .setSubscriptionName(subscriptionName)
        .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
        .build();

5.4 Sink example (1.17):

PulsarSink<String> sink = PulsarSink.builder()
        .setServiceUrl(brokerServiceUrl)
        .setTopics(outputTopic)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
        .setSerializationSchema(new SimpleStringSchema())
        .build();

6. Important Notes

Explicitly set off‑heap memory for TaskManager (e.g., 1 GB) because the Pulsar connector uses it.

Prefer implementing PulsarSerializationSchema for custom serialization to avoid coupling with Pulsar schemas.
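The core of such a schema is producing the payload bytes yourself instead of handing a POJO to a Pulsar Schema. That byte‑production step can be isolated as below (toJsonBytes is a hypothetical hand‑rolled formatter; the word‑count job above uses Jackson's ObjectMapper instead):

```java
import java.nio.charset.StandardCharsets;

public class WordCountJson {
    // Formats a word/count pair as UTF-8 JSON bytes, the shape a
    // PulsarSerializationSchema would wrap via PulsarMessage.builder(bytes).
    static byte[] toJsonBytes(String word, long count) {
        String json = "{\"word\":\"" + word + "\",\"count\":" + count + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```

Because the sink only ever sees bytes, the consumer side is free to evolve its JSON handling without touching Pulsar schema registration.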

Disable batch sending if you need per‑message visibility: .setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false).

The source currently cannot read message properties such as publish time; use processing‑time windows instead.

Older Flink versions (≤1.16) support shared subscription types that require transactional acknowledgments, which are not available in TDMQ Pulsar.

Oceanus’s built‑in Pulsar connector targets Flink 1.13‑1.14 and may need extensive changes for newer Flink releases.

Written by

Tencent Cloud Developer

Official Tencent Cloud community account that brings together developers, shares practical tech insights, and fosters an influential tech exchange community.
