Big Data 8 min read

Getting Started with Flink Kafka Connector: Concepts, Setup, and Sample Code

This article introduces the Flink‑Kafka connector, explains essential Kafka concepts, shows how to configure checkpointing, provides Maven dependencies, and includes complete Java examples for both producing to and consuming from Kafka within a Flink streaming job.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Getting Started with Flink Kafka Connector: Concepts, Setup, and Sample Code

This guide explains what the Flink‑Kafka connector does, how Kafka partitions and Flink parallelism work together, and how Kafka can serve as both source and sink for Flink jobs.

Kafka basics

Kafka consists of producers that publish messages, consumers that read messages, and topics that logically group messages. Topics are divided into partitions and can be replicated.

Common command‑line operations:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

Flink‑Kafka consumption settings

setStartFromGroupOffsets(): default, reads the last committed offsets.

setStartFromEarliest(): reads from the earliest offset, ignoring stored offsets.

setStartFromLatest(): reads from the latest offset.

setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>): reads from user‑specified offsets.

When checkpointing is enabled, Flink periodically saves Kafka offsets together with operator state, allowing exact‑once recovery after a failure.

Enable checkpointing in your job, e.g.:

env.enableCheckpointing(5000); // checkpoint every 5 seconds

Maven dependency for the connector

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

Example: Kafka producer (Flink sink)

public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute();
    }
}

class MyNoParalleSource implements SourceFunction<String> {
    private volatile boolean isRunning = true;
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            List<String> books = new ArrayList<>();
            books.add("Python从入门到放弃");
            books.add("Java从入门到放弃");
            books.add("Php从入门到放弃");
            books.add("C++从入门到放弃");
            books.add("Scala从入门到放弃");
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            Thread.sleep(2000);
        }
    }
    @Override
    public void cancel() { isRunning = false; }
}

Running the program continuously emits book titles to the Kafka topic "test". You can verify the data with:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Example: Kafka consumer (Flink source)

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute();
    }
}

Executing this job prints all messages previously sent to the "test" topic, confirming that the connector works in both directions.

Overall, the article provides a step‑by‑step walkthrough for installing Kafka, configuring Flink checkpointing, adding the Maven dependency, and running complete producer and consumer examples.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaBig DataFlinkConnectorStreaming
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.