Getting Started with Flink Kafka Connector: Concepts, Setup, and Sample Code
This article introduces the Flink‑Kafka connector, explains essential Kafka concepts, shows how to configure checkpointing, provides Maven dependencies, and includes complete Java examples for both producing to and consuming from Kafka within a Flink streaming job.
This guide explains what the Flink‑Kafka connector does, how Kafka partitions and Flink parallelism work together, and how Kafka can serve as both source and sink for Flink jobs.
Kafka basics
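Kafka consists of producers that publish messages, consumers that read them, and topics that group messages by subject. Each topic is split into partitions, which can be replicated across brokers. A partition is an ordered, append‑only log in which every message has a sequential offset, and consumers track their reading position by offset.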
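When a Flink job reads a topic, each Kafka partition is read by exactly one parallel instance (subtask) of the source. The sketch below models that mapping in plain Java. The arithmetic, a topic‑dependent start index followed by round‑robin over partition numbers, is modeled on the assignment used in Flink's Kafka connector, but the class and method names are hypothetical and the exact formula may differ between Flink versions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models how Kafka partitions could be mapped to Flink source subtasks.
public class PartitionAssignmentSketch {

    // Returns the index of the subtask that would read the given partition.
    // Assumption: start index derived from the topic name, then round-robin by partition id.
    static int assign(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    // Groups partition ids by the subtask that reads them.
    static List<List<Integer>> assignAll(String topic, int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(assign(topic, p, parallelism)).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // One partition read by three subtasks: two subtasks receive nothing.
        System.out.println(assignAll("test", 1, 3));
        // Six partitions read by three subtasks: each subtask receives two partitions.
        System.out.println(assignAll("test", 6, 3));
    }
}
```

With a single‑partition topic like the one created by the commands that follow, a source with parallelism 3 leaves two subtasks idle; matching the source parallelism to the partition count avoids that.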
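Common command‑line operations (these use the ZooKeeper‑based flags of older Kafka releases; kafka-topics.sh also accepts --bootstrap-server localhost:9092 since Kafka 2.2, and Kafka 3.0 removed its --zookeeper option):
# Start ZooKeeper, then the Kafka broker, in the background
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
# Create a topic named "test" with one partition and one replica
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# List existing topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
# Type messages into the topic from the console
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Read the topic from the beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# Delete the topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Flink‑Kafka consumption settings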
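FlinkKafkaConsumer lets you choose where reading starts:
setStartFromGroupOffsets(): the default; starts from the offsets last committed by the consumer group set in group.id. Partitions without a committed offset fall back to the auto.offset.reset property.
setStartFromEarliest(): starts from the earliest available offset, ignoring committed offsets.
setStartFromLatest(): starts from the latest offset, so only records written after the job starts are read.
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>): starts each listed partition at the given offset (the offset of the next record to read); partitions missing from the map fall back to the group‑offset behaviour.
These settings apply only when a job starts fresh. When a job is restored from a checkpoint or savepoint, it resumes from the offsets stored in that snapshot instead.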
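How each option picks the first offset for a partition can be summarized as a small decision function. This is a hypothetical model, not Flink's API: StartMode, PartitionState, and startOffset are illustrative names, "latest" stands for the offset of the next record to be written, and any auto.offset.reset value other than "earliest" is treated as "latest" (Kafka's default).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how each start-position option chooses the first offset
// to read for one partition. Names are illustrative, not Flink's API.
public class StartPositionSketch {

    enum StartMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

    // Offsets known for one partition: first available record, next record to be
    // written, and the group's committed offset (null if the group never committed).
    static final class PartitionState {
        final long earliest;
        final long latest;
        final Long committed;

        PartitionState(long earliest, long latest, Long committed) {
            this.earliest = earliest;
            this.latest = latest;
            this.committed = committed;
        }
    }

    static long startOffset(StartMode mode, int partition, PartitionState state,
                            Map<Integer, Long> specificOffsets, String autoOffsetReset) {
        switch (mode) {
            case EARLIEST:
                return state.earliest;
            case LATEST:
                return state.latest;
            case SPECIFIC_OFFSETS:
                Long specific = specificOffsets.get(partition);
                if (specific != null) {
                    return specific; // offset of the next record to read
                }
                // No entry for this partition: fall back to group-offset behaviour.
                return startOffset(StartMode.GROUP_OFFSETS, partition, state, specificOffsets, autoOffsetReset);
            case GROUP_OFFSETS:
            default:
                if (state.committed != null) {
                    return state.committed;
                }
                // Nothing committed yet: auto.offset.reset decides.
                return "earliest".equals(autoOffsetReset) ? state.earliest : state.latest;
        }
    }

    public static void main(String[] args) {
        PartitionState p0 = new PartitionState(0L, 100L, 42L); // group committed offset 42
        PartitionState p1 = new PartitionState(0L, 80L, null); // group never committed
        Map<Integer, Long> specific = new HashMap<>();
        specific.put(0, 23L);

        System.out.println(startOffset(StartMode.GROUP_OFFSETS, 0, p0, specific, "latest"));      // 42
        System.out.println(startOffset(StartMode.GROUP_OFFSETS, 1, p1, specific, "latest"));      // 80
        System.out.println(startOffset(StartMode.EARLIEST, 0, p0, specific, "latest"));           // 0
        System.out.println(startOffset(StartMode.SPECIFIC_OFFSETS, 0, p0, specific, "latest"));   // 23
        System.out.println(startOffset(StartMode.SPECIFIC_OFFSETS, 1, p1, specific, "earliest")); // 0
    }
}
```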
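When checkpointing is enabled, Flink periodically snapshots the current Kafka offset of every partition together with the state of all operators. After a failure, the job restores both and resumes reading from the checkpointed offsets: records read since the last checkpoint are replayed, but each one affects Flink's state exactly once. Offsets committed back to Kafka only expose the consumer's progress for monitoring; Flink does not rely on them for recovery.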
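The recovery idea can be modeled without Flink or Kafka. In this hypothetical simulation, a list stands in for one partition, an index for the consumer offset, and a running count for operator state. A checkpoint stores the offset and the count together; after a simulated crash both are rolled back, the records since the last checkpoint are replayed, and the final count still equals the number of records. This illustrates exactly‑once for Flink's own state only; output already written to an external system before the crash may still be duplicated unless the sink is transactional.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation (no Flink or Kafka involved): "log" stands in for one Kafka
// partition, "offset" for the consumer position, and "count" for operator state.
public class CheckpointSketch {

    // A checkpoint stores the source offset and the operator state together.
    static final class Checkpoint {
        final int offset;
        final long count;

        Checkpoint(int offset, long count) {
            this.offset = offset;
            this.count = count;
        }
    }

    // Processes the log, checkpointing every `interval` records and crashing once
    // right after processing the record at index `failAt`; returns the final count.
    static long run(List<String> log, int interval, int failAt) {
        Checkpoint last = new Checkpoint(0, 0); // initial, empty checkpoint
        int offset = last.offset;
        long count = last.count;
        boolean failed = false;

        while (offset < log.size()) {
            count++; // "process" the record at this offset
            offset++;
            if (!failed && offset - 1 == failAt) {
                failed = true;
                // Crash: in-flight state is lost, so restore offset and state from the checkpoint.
                offset = last.offset;
                count = last.count;
                continue;
            }
            if (offset % interval == 0) {
                last = new Checkpoint(offset, count);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        // Records after the last checkpoint are replayed, yet each is counted exactly once.
        System.out.println(run(log, 3, 4)); // 7
    }
}
```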
Enable checkpointing in your job, e.g.:
env.enableCheckpointing(5000); // checkpoint every 5000 ms (5 seconds)
Maven dependency for the connector
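<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
The examples also use the DataStream API, so the project needs the matching streaming dependency as well. The Scala suffix (_2.11) and the version must match your Flink distribution:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
Example: Kafka producer (Flink sink)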
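import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Non-parallel test source that emits a random book title every 2 seconds
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Write each string to the "test" topic as UTF-8 bytes
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute("Kafka producer example");
    }
}

class MyNoParalleSource implements SourceFunction<String> {
    private volatile boolean isRunning = true;
    // Build the title list once instead of on every loop iteration
    private final List<String> books = Arrays.asList(
            "Python: From Beginner to Giving Up",
            "Java: From Beginner to Giving Up",
            "PHP: From Beginner to Giving Up",
            "C++: From Beginner to Giving Up",
            "Scala: From Beginner to Giving Up");

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Random random = new Random();
        while (isRunning) {
            String book = books.get(random.nextInt(books.size()));
            // Emit under the checkpoint lock so emission and checkpoints do not interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(book);
            }
            Thread.sleep(2000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
Running the program emits a random book title to the Kafka topic "test" every two seconds until the job is cancelled. You can verify the data with: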
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Example: Kafka consumer (Flink source)
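import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Consumer group used for committed offsets (example name)
        properties.setProperty("group.id", "flink-kafka-demo");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        // Ignore committed offsets and read from the first available record
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("Kafka consumer example");
    }
}
Because of setStartFromEarliest(), this job first prints every message still retained in the "test" topic and then keeps printing new ones as they arrive. Running it alongside the producer job shows data flowing into Kafka from Flink and back out again, confirming that the connector works in both directions.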
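Both jobs use SimpleStringSchema, which by default converts strings to and from UTF-8 bytes; the consumer recovers the producer's strings only because both sides agree on that encoding. The hypothetical StringSchemaSketch class below mimics that serialize/deserialize pair with plain JDK calls (it is not Flink's class) and shows what happens when the charsets disagree.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string (de)serialization schema: strings become bytes on the
// producer side and bytes become strings on the consumer side, using one agreed charset.
public class StringSchemaSketch {

    private final Charset charset;

    StringSchemaSketch(Charset charset) {
        this.charset = charset;
    }

    // Producer side: record value -> Kafka message bytes.
    byte[] serialize(String element) {
        return element.getBytes(charset);
    }

    // Consumer side: Kafka message bytes -> record value.
    String deserialize(byte[] message) {
        return new String(message, charset);
    }

    public static void main(String[] args) {
        StringSchemaSketch utf8 = new StringSchemaSketch(StandardCharsets.UTF_8);
        StringSchemaSketch latin1 = new StringSchemaSketch(StandardCharsets.ISO_8859_1);

        String title = "Caf\u00e9 edition"; // 12 characters, one of them non-ASCII
        byte[] bytes = utf8.serialize(title);

        System.out.println(bytes.length);                          // 13 (U+00E9 needs two bytes in UTF-8)
        System.out.println(utf8.deserialize(bytes).equals(title)); // true: same schema on both sides
        System.out.println(latin1.deserialize(bytes));             // garbled: mismatched charset
    }
}
```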
Overall, the article walks through starting Kafka and managing topics, choosing the consumer start position, enabling Flink checkpointing, adding the Maven dependency, and running complete producer and consumer examples.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
