Connecting Apache Kafka with Flink 1.9 – Overview, Compatibility, and Code Samples
This article explains how to use Flink 1.9's built-in Kafka connector. It covers supported versions, Maven dependencies, consumer and producer configuration in Java and Scala, checkpointing, offset handling, partition discovery, timestamps, and watermarks, and ends with a complete runnable example.
Flink 1.9 includes a set of built‑in source and sink connectors that allow streaming data to be read from or written to external systems such as Apache Kafka, Cassandra, Kinesis, Elasticsearch, Hadoop, RabbitMQ, NiFi, and Twitter. The article focuses on the Kafka connector.
Supported Connectors
Pre‑defined sources can read from files, directories, sockets, collections, and iterators, while predefined sinks can write to files, stdout, stderr, and sockets. Additional connectors are available via Apache Bahir (e.g., ActiveMQ, Flume, Redis, Akka, Netty).
Kafka Connector Overview
The Flink Kafka connector provides access to Kafka event streams. It integrates with Flink's checkpointing mechanism to guarantee exactly‑once processing semantics by tracking offsets internally rather than relying solely on Kafka consumer groups.
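To make the offset-tracking idea concrete, here is a minimal, Flink-free sketch in plain Java. The class OffsetCheckpointSketch and its methods are hypothetical names invented for illustration and are not part of the connector API. It models a source that keeps per-partition offsets, copies them at a checkpoint, and rewinds to that copy after a simulated failure.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of checkpointed Kafka offsets; hypothetical, not connector code. */
public class OffsetCheckpointSketch {
    // Next offset to read for each partition (the source's state).
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    // Copy of the offsets taken at the last checkpoint.
    private Map<Integer, Long> lastCheckpoint = new HashMap<>();

    /** Record that one more record was read from the given partition. */
    public void recordRead(int partition) {
        currentOffsets.merge(partition, 1L, Long::sum);
    }

    /** Snapshot the current offsets, as a checkpoint would. */
    public void checkpoint() {
        lastCheckpoint = new HashMap<>(currentOffsets);
    }

    /** Simulate recovery: rewind to the offsets in the last checkpoint. */
    public void restore() {
        currentOffsets.clear();
        currentOffsets.putAll(lastCheckpoint);
    }

    /** Offset the source would read next from the partition (0 if untouched). */
    public long offsetOf(int partition) {
        return currentOffsets.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetCheckpointSketch source = new OffsetCheckpointSketch();
        source.recordRead(0);
        source.recordRead(0);
        source.checkpoint();   // offsets {0=2} are captured
        source.recordRead(0);  // read after the checkpoint
        source.restore();      // failure: roll back to {0=2}
        System.out.println(source.offsetOf(0)); // prints 2
    }
}
```

The record read after the checkpoint is read again after recovery, which is why the offsets have to be restored together with the rest of the job state for results to stay exactly-once.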
Version Compatibility
Since Flink 1.7, the generic (unversioned) Kafka connector tracks the latest Kafka client version available at the time of each Flink release. Use the generic FlinkKafkaConsumer / FlinkKafkaProducer for Kafka ≥ 1.0.0; for older Kafka versions (0.8 to 0.11), select the matching versioned artifact (e.g., flink-connector-kafka-0.8_2.11).
Maven Dependency | Supported since | Consumer/Producer class | Kafka version
flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 / FlinkKafkaProducer08 | 0.8.x
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 / FlinkKafkaProducer09 | 0.9.x
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 / FlinkKafkaProducer010 | 0.10.x
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 / FlinkKafkaProducer011 | 0.11.x
flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer / FlinkKafkaProducer | >= 1.0.0
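The compatibility table can be read as a lookup from Kafka broker version to connector artifact. The helper below is a hypothetical sketch: ConnectorArtifacts and artifactFor are invented names, and Scala 2.11 is assumed, as in the table. It encodes only the rows listed there.

```java
/** Hypothetical lookup that mirrors the compatibility table; not a Flink API. */
public class ConnectorArtifacts {

    /** Returns the connector artifact id for a Kafka version such as "0.10.2" or "2.2.0". */
    public static String artifactFor(String kafkaVersion) {
        String[] parts = kafkaVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;

        // Kafka 1.0.0 and later use the generic, unversioned connector.
        if (major >= 1) {
            return "flink-connector-kafka_2.11";
        }
        // Kafka 0.x needs the artifact that matches its minor version.
        switch (minor) {
            case 8:  return "flink-connector-kafka-0.8_2.11";
            case 9:  return "flink-connector-kafka-0.9_2.11";
            case 10: return "flink-connector-kafka-0.10_2.11";
            case 11: return "flink-connector-kafka-0.11_2.11";
            default:
                throw new IllegalArgumentException(
                        "No connector listed for Kafka " + kafkaVersion);
        }
    }

    public static void main(String[] args) {
        System.out.println(artifactFor("0.10.2")); // flink-connector-kafka-0.10_2.11
        System.out.println(artifactFor("2.2.0"));  // flink-connector-kafka_2.11
    }
}
```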
Maven Dependency Example
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>
Consumer Usage (Java)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Only required by the Kafka 0.8 connector; the generic FlinkKafkaConsumer does not need it
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
Consumer Usage (Scala)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// Only required by the Kafka 0.8 connector; the generic FlinkKafkaConsumer does not need it
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
// Keep the stream in a val so it can be reused; print() returns a sink, not a stream
val stream = env.addSource(
  new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print()
Starting Offsets
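After creating the consumer, choose where reading begins with one of the setStartFrom* methods listed in this section. These settings apply only to a fresh start; when the job is restored from a checkpoint or savepoint, the offsets stored in that state take precedence.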
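The value passed to setStartFromTimestamp is a timestamp in epoch milliseconds, which is hard to read at a glance. A minimal sketch, using only java.time, of deriving it from an ISO-8601 instant; StartTimestampSketch and toEpochMillis are hypothetical helper names, not part of Flink.

```java
import java.time.Instant;

/** Hypothetical helper for building the argument of setStartFromTimestamp. */
public class StartTimestampSketch {

    /** Converts an ISO-8601 instant such as "2021-01-01T00:00:00Z" to epoch milliseconds. */
    public static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // Midnight UTC on 1 January 2021
        System.out.println(toEpochMillis("2021-01-01T00:00:00Z")); // prints 1609459200000
    }
}
```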
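// Call only one of these; if several are called, the last call wins.
myConsumer.setStartFromEarliest();                 // start from the earliest record
myConsumer.setStartFromLatest();                   // start from the latest record
myConsumer.setStartFromTimestamp(1609459200000L);  // start from a timestamp (epoch ms)
myConsumer.setStartFromGroupOffsets();             // default: committed consumer-group offsets
Checkpointing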
Enable checkpointing to let Flink store offsets together with state, ensuring fault‑tolerant exactly‑once semantics.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 ms (5 seconds)
Partition Discovery & Regular Expressions
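The consumer can discover partitions created after the job has started, and it can subscribe to topics by regular expression instead of by fixed name. Partition discovery is disabled by default; enable it by setting flink.partition-discovery.interval-millis to a non-negative value in the consumer properties. The pattern in the snippet that follows subscribes to topics whose names start with test-topic- and end with a single digit.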
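Before wiring a pattern into the consumer, it can help to check which topic names it would select and to prepare the discovery setting. The sketch below is plain Java with no Flink dependency. TopicPatternSketch and its methods are hypothetical names, the 30-second interval is an arbitrary example value, and the sketch assumes the pattern must match the whole topic name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper for previewing a topic pattern; not connector code. */
public class TopicPatternSketch {

    /** Keeps only the topic names that the pattern matches in full. */
    public static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    /** Builds consumer properties with partition discovery enabled. */
    public static Properties discoveryProperties(long intervalMillis) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        // A non-negative interval turns discovery on
        properties.setProperty("flink.partition-discovery.interval-millis",
                Long.toString(intervalMillis));
        return properties;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test-topic-[0-9]");
        List<String> topics = Arrays.asList("test-topic-1", "test-topic-10", "other");
        // Only "test-topic-1" matches; "test-topic-10" has two trailing digits
        System.out.println(matchingTopics(pattern, topics));
        System.out.println(discoveryProperties(30_000L));
    }
}
```

A pattern such as test-topic-[0-9]+ would be needed to include two-digit suffixes as well.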
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
DataStream<String> stream = env.addSource(consumer);
Timestamps & Watermarks
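Call assignTimestampsAndWatermarks on the consumer to attach a custom timestamp extractor and watermark emitter. Watermarks are then generated inside the consumer, per Kafka partition. CustomWatermarkEmitter in the call that follows is a user-defined class that this article does not show.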
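The article does not show what a watermark emitter computes. As a rough illustration only, the plain-Java sketch below implements one common strategy, bounded out-of-orderness: the watermark trails the largest event timestamp seen so far by a fixed allowance. BoundedLatenessSketch is a hypothetical class with no Flink dependency; in Flink 1.9 logic like this would typically live in an implementation of AssignerWithPeriodicWatermarks, and the author's CustomWatermarkEmitter may work differently.

```java
/** Hypothetical bounded-out-of-orderness watermark logic; not the article's emitter. */
public class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    public BoundedLatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Called per record: remembers the largest event timestamp seen so far. */
    public long onEvent(long eventTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Watermark = largest timestamp seen minus the allowed lateness. */
    public long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet
        }
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch emitter = new BoundedLatenessSketch(500);
        emitter.onEvent(1000);
        emitter.onEvent(3000);
        emitter.onEvent(2000); // arrives out of order; does not move the watermark
        System.out.println(emitter.currentWatermark()); // prints 2500
    }
}
```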
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
Producer Usage (Java)
DataStream<String> stream = ...;
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "localhost:9092",          // broker list
    "my-topic",                // target topic
    new SimpleStringSchema()); // serialization schema
producer.setWriteTimestampToKafka(true); // attach each record's event timestamp to the Kafka record
stream.addSink(producer);
Full Example (Java)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // Read the topic as strings, starting from the earliest available record
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(myConsumer);

        // Checkpoint every 5000 ms so offsets are stored together with Flink state
        env.enableCheckpointing(5000);

        stream.print();
        env.execute("Flink Streaming Java API Skeleton");
    }
}
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
