Reading Kafka Topics with Flink: A Step‑by‑Step Guide

This tutorial demonstrates how to use Apache Flink's Kafka connector to read data from Kafka topics with exactly‑once semantics, covering Maven dependencies, consumer configuration, checkpointing for fault tolerance, and a complete Scala example that writes the streamed data to HDFS.

Big Data Technology & Architecture

This article explains how to read data from Kafka topics with Apache Flink. Flink's Kafka connector ships as a separate dependency and integrates with Flink's checkpointing mechanism to provide exactly‑once processing semantics.

To use the connector, add the following Maven dependency (Flink 1.0.0, Scala 2.10) to your pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
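
For projects built with sbt rather than Maven, the same coordinates could be declared roughly as follows. This is a sketch under assumptions: the `flink-streaming-scala_2.10` entry and the `scalaVersion` value are not part of the original article and are included only because the Scala snippets in this guide rely on the Scala DataStream API.

```scala
// build.sbt (sketch; only the Kafka connector coordinates come from the article)
scalaVersion := "2.10.6" // assumption: any Scala 2.10.x release matching the _2.10 artifacts

libraryDependencies ++= Seq(
  // assumption: Scala DataStream API used by the snippets in this guide
  "org.apache.flink" % "flink-streaming-scala_2.10" % "1.0.0",
  // Kafka 0.8 connector, same coordinates as the Maven dependency
  "org.apache.flink" % "flink-connector-kafka-0.8_2.10" % "1.0.0"
)
```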

The FlinkKafkaConsumer08 class reads one or more Kafka topics. Its constructor requires the topic name(s), a DeserializationSchema (e.g., SimpleStringSchema) or a KeyedDeserializationSchema, and a set of properties such as bootstrap.servers, zookeeper.connect (for Kafka 0.8), and group.id:

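import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Connection settings required by the Kafka 0.8 consumer
val properties = new Properties()
properties.setProperty("bootstrap.servers", "www.iteblog.com:9092")
properties.setProperty("zookeeper.connect", "www.iteblog.com:2181")
properties.setProperty("group.id", "iteblog")

// Read the "iteblog" topic, decoding each message as a String
val stream = env.addSource(
  new FlinkKafkaConsumer08[String]("iteblog", new SimpleStringSchema(), properties)
)
stream.print()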
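
Because a missing property usually surfaces only when the job starts, it can help to check the configuration beforehand. The sketch below is one possible way to do that in plain Scala; `Kafka08Props`, `build`, and `missingKeys` are hypothetical helper names, not part of Flink or Kafka, and the list of required keys simply mirrors the three properties named above.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink or Kafka.
object Kafka08Props {
  // The three properties the article lists for the Kafka 0.8 consumer.
  val RequiredKeys: Seq[String] =
    Seq("bootstrap.servers", "zookeeper.connect", "group.id")

  /** Build a Properties object from plain key/value pairs. */
  def build(settings: Map[String, String]): Properties = {
    val props = new Properties()
    settings.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }

  /** Return the required keys that are absent or blank. */
  def missingKeys(props: Properties): Seq[String] =
    RequiredKeys.filter { key =>
      val value = props.getProperty(key)
      value == null || value.trim.isEmpty
    }
}

object Kafka08PropsDemo extends App {
  // zookeeper.connect is left out on purpose to show the check.
  val props = Kafka08Props.build(Map(
    "bootstrap.servers" -> "www.iteblog.com:9092",
    "group.id"          -> "iteblog"
  ))
  println(Kafka08Props.missingKeys(props)) // prints List(zookeeper.connect)
}
```

The resulting `Properties` object can be passed to the `FlinkKafkaConsumer08` constructor unchanged once `missingKeys` returns an empty sequence.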

For fault‑tolerant execution, enable checkpointing on the execution environment, for example every 5000 ms:

env.enableCheckpointing(5000) // checkpoint every 5000 ms

When checkpointing is active, Flink stores the consumed Kafka offsets together with the rest of the job's state in each checkpoint. After a failure, the job restores the latest completed checkpoint and resumes reading from the offsets saved in it. Without checkpointing, the consumer periodically commits offsets to ZooKeeper.
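
To see why saving offsets and state together matters, consider the toy model below. It is a simplified illustration in plain Scala, not Flink's actual implementation: `OffsetCheckpointSketch`, `Checkpoint`, and `run` are hypothetical names, a `Vector` stands in for one Kafka partition, and the "state" is just a record count. A simulated crash rolls both the read position and the count back to the last checkpoint, so replayed records are not counted twice.

```scala
// Toy model of checkpointed offsets; an illustration only, not Flink internals.
object OffsetCheckpointSketch {
  /** A checkpoint pairs the next offset to read with the state at that point. */
  final case class Checkpoint(nextOffset: Int, count: Long)

  /**
   * Count the records of one partition, taking a checkpoint every `interval`
   * records and simulating a single crash just before offset `failAt` is read.
   */
  def run(log: Vector[String], interval: Int, failAt: Option[Int]): Long = {
    require(interval > 0, "interval must be positive")
    var checkpoint = Checkpoint(0, 0L)
    var offset = 0
    var count = 0L
    var pendingFailure = failAt

    while (offset < log.length) {
      if (pendingFailure == Some(offset)) {
        // Crash: progress since the last checkpoint is lost, so the offset
        // and the state are restored together and the records are replayed.
        offset = checkpoint.nextOffset
        count = checkpoint.count
        pendingFailure = None
      } else {
        count += 1
        offset += 1
        if (offset % interval == 0) checkpoint = Checkpoint(offset, count)
      }
    }
    count
  }
}

object OffsetCheckpointDemo extends App {
  val log = Vector.tabulate(10)(i => s"msg-$i")
  println(OffsetCheckpointSketch.run(log, interval = 5, failAt = None))    // prints 10
  println(OffsetCheckpointSketch.run(log, interval = 5, failAt = Some(7))) // prints 10
}
```

Both runs report ten records. If only the offset were rewound while the count was kept, the crashed run would count the replayed records a second time.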

To complete the job, set the parallelism, write the consumed data to HDFS, and start execution:

stream.setParallelism(4).writeAsText("hdfs:///tmp/iteblog/data")
env.execute("IteblogFlinkKafkaStreaming")

Running this program writes the messages read from the Kafka topic to hdfs:///tmp/iteblog/data. Depending on the parallelism of the sink, Flink creates either a single file or a directory of numbered part files at that path.
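
Assembled into a single program, the fragments from this guide might look like the sketch below. The object name is an assumption borrowed from the job name passed to `env.execute`; host names, topic, group id, and output path are the article's example values and need to be replaced for a real cluster. The import paths are those of Flink 1.0.0 and may differ in later releases.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Object name is an assumption; it mirrors the job name used below.
object IteblogFlinkKafkaStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5000 ms

    // Example connection settings from the article
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "www.iteblog.com:9092")
    properties.setProperty("zookeeper.connect", "www.iteblog.com:2181")
    properties.setProperty("group.id", "iteblog")

    // Read the "iteblog" topic, decoding each message as a String
    val stream = env.addSource(
      new FlinkKafkaConsumer08[String]("iteblog", new SimpleStringSchema(), properties)
    )

    // Run the source with parallelism 4 and write the messages to HDFS
    stream.setParallelism(4).writeAsText("hdfs:///tmp/iteblog/data")

    env.execute("IteblogFlinkKafkaStreaming")
  }
}
```

Running it requires a reachable Kafka 0.8 broker, ZooKeeper, and HDFS, so it is meant as a starting point rather than something that runs standalone.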

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
