
Connecting Apache Kafka with Flink 1.9 – Overview, Compatibility, and Code Samples

This article explains how to use the Kafka connector that ships with Flink 1.9. It covers supported versions, Maven dependencies, consumer and producer configuration in Java and Scala, checkpointing, offset handling, partition discovery, timestamps, and watermarks, and it ends with a complete runnable example.

Big Data Technology & Architecture

Flink 1.9 ships a set of source and sink connectors for reading streaming data from, or writing it to, external systems such as Apache Kafka, Cassandra, Kinesis, Elasticsearch, Hadoop, RabbitMQ, NiFi, and Twitter. This article focuses on the Kafka connector.

Supported Connectors

Predefined sources can read from files, directories, sockets, collections, and iterators, while predefined sinks can write to files, stdout, stderr, and sockets. Additional connectors are available through Apache Bahir (e.g., ActiveMQ, Flume, Redis, Akka, Netty).

Kafka Connector Overview

The Flink Kafka connector provides access to Kafka event streams. It integrates with Flink's checkpointing mechanism to guarantee exactly‑once processing semantics by tracking offsets internally rather than relying solely on Kafka consumer groups.

Version Compatibility

Since Flink 1.7 there is a universal Kafka connector that is built against the latest Kafka client version available when each Flink release is made. Use its FlinkKafkaConsumer / FlinkKafkaProducer classes for Kafka ≥ 1.0.0; for older Kafka versions (0.8 to 0.11) select the version-specific artifact (e.g., flink-connector-kafka-0.8_2.11).

| Maven dependency | Supported since (Flink) | Consumer / producer class | Kafka version |
| --- | --- | --- | --- |
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 / FlinkKafkaProducer08 | 0.8.x |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 / FlinkKafkaProducer09 | 0.9.x |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 / FlinkKafkaProducer010 | 0.10.x |
| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 / FlinkKafkaProducer011 | 0.11.x |
| flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer / FlinkKafkaProducer | >= 1.0.0 |
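The selection rule in the table can also be written down as a small lookup. The sketch below is illustrative only: `ConnectorArtifacts` and `artifactFor` are hypothetical helper names, not part of Flink, and the Scala suffix `_2.11` is simply carried over from the table.

```java
/** Hypothetical helper (not part of Flink): picks the connector artifact for a Kafka version. */
class ConnectorArtifacts {

    /** Expects a version string of the form "major.minor" or "major.minor.patch". */
    static String artifactFor(String kafkaVersion) {
        String[] parts = kafkaVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        if (major >= 1) {
            // Kafka 1.0.0 and later: universal connector.
            return "flink-connector-kafka_2.11";
        }
        if (minor >= 8 && minor <= 11) {
            // Kafka 0.8.x to 0.11.x: version-specific connector.
            return "flink-connector-kafka-0." + minor + "_2.11";
        }
        throw new IllegalArgumentException("No connector listed for Kafka " + kafkaVersion);
    }

    public static void main(String[] args) {
        System.out.println(artifactFor("0.10.2")); // flink-connector-kafka-0.10_2.11
        System.out.println(artifactFor("2.3.0"));  // flink-connector-kafka_2.11
    }
}
```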

Maven Dependency Example

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

Consumer Usage (Java)

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181"); // only for Kafka 0.8
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

Consumer Usage (Scala)

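import org.apache.flink.streaming.api.scala._ // brings the implicit TypeInformation into scope

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181") // only for Kafka 0.8
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print() // print() returns a sink, so it is kept off the `stream` assignment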

Starting Offsets

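After creating the consumer you can control where reading begins. Call only one of the following methods; if several are called, the last call wins. These settings apply only when the job starts without restored state: when it recovers from a checkpoint or savepoint, the offsets stored there are used instead.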

myConsumer.setStartFromEarliest();   // earliest record
myConsumer.setStartFromLatest();     // latest record
myConsumer.setStartFromTimestamp(1609459200000L); // specific epoch ms
myConsumer.setStartFromGroupOffsets(); // default behavior
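The value passed to setStartFromTimestamp is milliseconds since the Unix epoch, which is hard to read as a bare literal. A minimal sketch, using only `java.time`, of deriving it from a readable UTC instant; `StartTimestamp` and `epochMillis` are hypothetical helper names.

```java
import java.time.Instant;

/** Hypothetical helper: converts an ISO-8601 UTC instant into epoch milliseconds. */
class StartTimestamp {

    static long epochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // Midnight UTC on 1 January 2021 is the literal used in the snippet above.
        long startMs = epochMillis("2021-01-01T00:00:00Z");
        System.out.println(startMs); // 1609459200000
    }
}
```

The result could then be passed on as `myConsumer.setStartFromTimestamp(startMs)`.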

Checkpointing

Enable checkpointing to let Flink store offsets together with state, ensuring fault‑tolerant exactly‑once semantics.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // every 5 seconds
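Why storing offsets together with state matters can be seen in a toy model. The following plain-Java simulation is not Flink code: `OffsetCheckpointDemo`, `Checkpoint`, and `run` are hypothetical names, a list stands in for one Kafka partition, and a running sum stands in for operator state.

```java
import java.util.Arrays;
import java.util.List;

/** Toy simulation (not Flink): offset and state are snapshotted and restored together. */
class OffsetCheckpointDemo {

    /** Source position and operator state captured at the same point in the stream. */
    static final class Checkpoint {
        final int offset;
        final long sum;

        Checkpoint(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums all records, taking a checkpoint every `interval` records and crashing
     * once just before the record at index `failAt` (pass -1 for no failure).
     */
    static long run(List<Integer> partition, int interval, int failAt) {
        Checkpoint last = new Checkpoint(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean failed = false;
        while (offset < partition.size()) {
            if (!failed && offset == failAt) {
                // Simulated crash: roll back BOTH the read position and the state.
                failed = true;
                offset = last.offset;
                sum = last.sum;
                continue;
            }
            sum += partition.get(offset);
            offset++;
            if (offset % interval == 0) {
                last = new Checkpoint(offset, sum);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(run(records, 3, -1)); // 28, no failure
        System.out.println(run(records, 3, 5));  // 28, despite the crash and replay
    }
}
```

Records 4 and 5 are read twice in the second run, but because the sum is rolled back to the same checkpoint as the offset, each record affects the final state exactly once.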

Partition Discovery & Regular Expressions

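The consumer can discover Kafka partitions created after the job has started, and it can subscribe to topics by regular expression. Discovery is disabled by default; to enable it, set flink.partition-discovery.interval-millis to a non-negative value in the consumer properties. With a topic pattern, the same setting also lets the consumer pick up newly created topics whose names match.

properties.setProperty("flink.partition-discovery.interval-millis", "30000"); // check every 30 s (example value)
// Universal connector class, matching the Maven dependency used in this article;
// FlinkKafkaConsumer011 from flink-connector-kafka-0.11_2.11 accepts the same arguments.
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
DataStream<String> stream = env.addSource(consumer);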
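Which topics a pattern selects can be checked without a Kafka cluster. The sketch below assumes the connector requires the whole topic name to match the expression (full-match semantics); `TopicPatternSketch` and `matchingTopics` are hypothetical names, and the topic list is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: previews which topic names a subscription pattern would select. */
class TopicPatternSketch {

    /** Keeps only the topics whose entire name matches the pattern (assumed semantics). */
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test-topic-[0-9]");
        List<String> topics = Arrays.asList("test-topic-1", "test-topic-10", "test-topic-a", "orders");
        System.out.println(matchingTopics(pattern, topics)); // [test-topic-1]
    }
}
```

Under this assumption `test-topic-10` is excluded, because `[0-9]` matches exactly one digit; a pattern such as `test-topic-[0-9]+` would be needed to include it.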

Timestamps & Watermarks

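Timestamp extractors and watermark emitters can be attached directly to the consumer, in which case watermarks are generated per Kafka partition. The argument must implement AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks; CustomWatermarkEmitter stands for such a user-defined class: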

myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
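One common strategy that a class such as CustomWatermarkEmitter might implement is bounded out-of-orderness: the watermark trails the largest event time seen so far by a fixed allowance. The sketch below shows only that arithmetic in plain Java. It does not implement Flink's assigner interfaces, and `BoundedLatenessSketch` with its method names is hypothetical.

```java
/** Plain-Java sketch of bounded out-of-orderness watermark arithmetic (not a Flink class). */
class BoundedLatenessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an element's event time and returns it as the element's timestamp. */
    long onElement(long eventTimeMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimeMs);
        return eventTimeMs;
    }

    /** Watermark: events with a timestamp at or below this value are no longer expected. */
    long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch emitter = new BoundedLatenessSketch(2_000L);
        emitter.onElement(10_000L);
        emitter.onElement(8_000L); // out of order, does not move the watermark back
        System.out.println(emitter.currentWatermark()); // 8000
        emitter.onElement(12_000L);
        System.out.println(emitter.currentWatermark()); // 10000
    }
}
```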

Producer Usage (Java)

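DataStream<String> stream = ...;
// Universal connector class, matching the Maven dependency used in this article;
// FlinkKafkaProducer011 from flink-connector-kafka-0.11_2.11 accepts the same arguments.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
    "localhost:9092",            // broker list
    "my-topic",                  // target topic
    new SimpleStringSchema());   // serializes each String record to bytes
producer.setWriteTimestampToKafka(true); // attach each record's timestamp to the Kafka message
stream.addSink(producer);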

Full Example (Java)

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 seconds so that Kafka offsets are stored with the job state.
        env.enableCheckpointing(5000);

        // Connection settings for the Kafka consumer.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // Read String records from "topic", starting at the earliest available offset.
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(myConsumer);

        stream.print(); // write each record to stdout
        env.execute("Flink Streaming Java API Skeleton");
    }
}

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
