
Introduction to Apache Kafka and Its Integration with Apache Flink

This article provides a step‑by‑step guide on installing Apache Kafka, creating topics, producing and consuming messages via command line, and demonstrates how to integrate Kafka with Apache Flink using the Flink‑Kafka connector, custom serialization schemas, and event‑time window processing.

Big Data Technology & Architecture

Kafka Introduction

Apache Kafka is a distributed publish-subscribe messaging system originally developed at LinkedIn and open-sourced through the Apache Software Foundation in 2011. It is used to build real-time data pipelines and streaming applications, and it offers horizontal scalability, fault tolerance, and high throughput.

Installation

Download the binary package, extract it, and start a single‑node ZooKeeper instance and a Kafka broker using the provided scripts.

curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

tar -zxf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

bin/zookeeper-server-start.sh config/zookeeper.properties &

bin/kafka-server-start.sh config/server.properties

Create Topic

Create a topic named flink-topic, then list the topics to verify that it was created.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic

Created topic "flink-topic".

bin/kafka-topics.sh --list --zookeeper localhost:2181
flink-topic

Send and Receive Messages

Produce two test messages to the topic and consume them from the beginning.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg
>Kafka connector

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

Flink Kafka Connector

To use the Flink Kafka connector (version 1.7.0) in a Maven project, add the following dependency.

Maven Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

Custom Serialization Schema

The KafkaMsgSchema class implements both DeserializationSchema and SerializationSchema for plain-string messages, so the same class can be used by the consumer and by the producer.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaMsgSchema() {
        // Default to UTF-8 encoding
        this(Charset.forName("UTF-8"));
    }

    public KafkaMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    @Override
    public String deserialize(byte[] message) {
        // Deserialize the Kafka message bytes into a Java object
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // The stream never ends
        return false;
    }

    @Override
    public byte[] serialize(String element) {
        // Serialize the Java object into Kafka message bytes
        return element.getBytes(this.charset);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Type information of the records this schema produces
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
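The charset field above is transient, and the class writes only the charset name in writeObject and restores it in readObject, because java.nio.charset.Charset does not implement Serializable. The following is a minimal JDK-only sketch of that pattern. CharsetHolder and roundTrip are hypothetical names used for illustration, and the Flink interfaces are left out so the example runs without any dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical stand-in for KafkaMsgSchema with the Flink interfaces removed. */
final class CharsetHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // Charset is not Serializable, so the field is transient and only
    // the charset name travels in the serialized form.
    private transient Charset charset;

    CharsetHolder(Charset charset) {
        this.charset = Objects.requireNonNull(charset);
    }

    Charset getCharset() {
        return charset;
    }

    byte[] serialize(String element) {
        return element.getBytes(charset);
    }

    String deserialize(byte[] message) {
        return new String(message, charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        charset = Charset.forName(in.readUTF());
    }

    /** Java-serializes the holder and reads it back, similar to shipping it to a worker. */
    static CharsetHolder roundTrip(CharsetHolder original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (CharsetHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CharsetHolder copy = roundTrip(new CharsetHolder(StandardCharsets.UTF_8));
        System.out.println(copy.getCharset());
        System.out.println(copy.deserialize(copy.serialize("Kafka test msg")));
    }
}
```

Without the custom readObject, the transient field would be null after deserialization and the first call to serialize or deserialize would fail.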

Simple ETL Example

A Flink job reads from flink-topic, prepends "Flink study " to each record, and writes to flink-topic-output.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // Read user-supplied arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source topic
        String sourceTopic = "flink-topic";
        // Sink topic
        String sinkTopic = "flink-topic-output";
        // Broker address
        String broker = "localhost:9092";

        // Kafka properties: in production these can be passed on the command line
        Properties p = parameterTool.getProperties();
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);

        // Create the consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                sourceTopic,
                new KafkaMsgSchema(),
                p);
        // Read messages from Kafka
        DataStream<String> input = env.addSource(consumer);

        // Processing: prepend a fixed prefix to every record
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String msg = "Flink study ".concat(s);
                System.out.println(msg);
                return msg;
            }
        });

        // Create the producer
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<>(new KafkaMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // Write the results to the sink topic
        result.addSink(producer);

        // Execute the job
        env.execute("Kafka Example");
    }
}

Event‑Time and Watermark Example

Define KafkaWithTsMsgSchema to parse messages of the form String#Long into Tuple2<String, Long>, then assign timestamps and watermarks with KafkaAssignerWithPunctuatedWatermarks. A 1-second tumbling event-time window applies max(0), which takes the maximum of tuple field 0 (the String payload, compared as a string), and the result is written back to Kafka.

Schema for String#Long

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaWithTsMsgSchema() {
        this(Charset.forName("UTF-8"));
    }

    public KafkaWithTsMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public Tuple2<String, Long> deserialize(byte[] message) {
        String msg = new String(message, charset);
        String[] dataAndTs = msg.split("#");
        if (dataAndTs.length == 2) {
            return new Tuple2<>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
        } else {
            System.out.println("Fail due to invalid msg format.. [" + msg + "]");
            return new Tuple2<>(msg, 0L);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
        return false;
    }

    public byte[] serialize(Tuple2<String, Long> element) {
        return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}

Punctuated Watermark Assigner

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
        // Emit a watermark equal to the timestamp just extracted from this element
        return new Watermark(l);
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> o, long l) {
        // The event timestamp is the Long field of the tuple
        return o.f1;
    }
}
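To make the interaction between this assigner and a 1-second tumbling window easier to follow, here is a simplified JDK-only model. It is a sketch under stated assumptions, not Flink's implementation: a single source, non-negative timestamps, zero window offset, and no allowed lateness. TumblingWindowSketch, windowStart, and firedWindows are hypothetical names. The model treats a window as complete once the watermark reaches the window's last timestamp (window end minus 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class TumblingWindowSketch {

    /** Start of the window containing the timestamp (non-negative timestamps, zero offset). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /**
     * Feeds timestamps in arrival order. After each element the watermark advances to
     * that element's timestamp if it is larger, mirroring the punctuated assigner above.
     * Returns the start times of the windows in the order they fire.
     */
    static List<Long> firedWindows(long[] timestampsMs, long sizeMs) {
        TreeSet<Long> openWindows = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            long start = windowStart(ts, sizeMs);
            long lastTimestampInWindow = start + sizeMs - 1;
            if (lastTimestampInWindow <= watermark) {
                // The window for this element has already fired: the element is late and dropped.
                continue;
            }
            openWindows.add(start);
            // Watermarks never move backwards.
            watermark = Math.max(watermark, ts);
            // Fire every open window whose last timestamp the watermark has reached.
            while (!openWindows.isEmpty() && openWindows.first() + sizeMs - 1 <= watermark) {
                fired.add(openWindows.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 1500L, 2100L, 1700L, 3000L};
        System.out.println(firedWindows(timestamps, 1000L)); // prints [1000, 2000]
    }
}
```

In this run the element with timestamp 2100 pushes the watermark past 1999, so window [1000, 2000) fires. The element with timestamp 1700 arrives afterwards and is dropped as late. Window [3000, 4000) stays open because no later element advances the watermark.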

Main Program

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
    public static void main(String[] args) throws Exception {
        // Read user-supplied arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Source topic
        String sourceTopic = "flink-topic";
        // Sink topic
        String sinkTopic = "flink-topic-output";
        // Broker address
        String broker = "localhost:9092";

        // Kafka properties: in production these can be passed on the command line
        Properties p = parameterTool.getProperties();
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);
        // Create the consumer
        FlinkKafkaConsumer<Tuple2<String, Long>> consumer = new FlinkKafkaConsumer<>(
                sourceTopic,
                new KafkaWithTsMsgSchema(),
                p);

        // Read messages from Kafka and assign timestamps and watermarks
        TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
        DataStream<Tuple2<String, Long>> input = env
                .addSource(consumer).returns(typeInformation)
                .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

        // Processing: 1-second tumbling event-time window, maximum over tuple field 0
        DataStream<Tuple2<String, Long>> result = input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
                .max(0);

        // Create the producer
        FlinkKafkaProducer<Tuple2<String, Long>> producer = new FlinkKafkaProducer<>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<>(new KafkaWithTsMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // Write the results to the sink topic
        result.addSink(producer);

        // Execute the job
        env.execute("Kafka With Event-time Example");
    }
}

Reading Position Configuration

FlinkKafkaConsumer provides methods such as setStartFromEarliest(), setStartFromLatest(), setStartFromTimestamp(...), setStartFromGroupOffsets(), and setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) for fine-grained offset control.

Topic Discovery

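Consumers can subscribe to every topic whose name matches a regular expression, e.g., Pattern.compile(sourceTopic + "-[0-9]"), by passing the pattern to the FlinkKafkaConsumer constructor in place of a topic name. Topics created after the job starts are discovered only when partition discovery is enabled, which is done by setting flink.partition-discovery.interval-millis to a non-negative value in the consumer properties.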
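To see which topic names such a pattern would select, the following JDK-only sketch filters a list of names with java.util.regex. It assumes the pattern must match the whole topic name, which is how Matcher.matches() behaves. TopicPatternSketch and matchingTopics are hypothetical names, and the topic list is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TopicPatternSketch {

    /** Returns the topics whose entire name matches the pattern, in input order. */
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            // matches() requires the whole name to match, not just a substring.
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String sourceTopic = "flink-topic";
        Pattern pattern = Pattern.compile(sourceTopic + "-[0-9]");
        List<String> topics = Arrays.asList(
                "flink-topic", "flink-topic-1", "flink-topic-10", "flink-topic-output");
        System.out.println(matchingTopics(pattern, topics)); // prints [flink-topic-1]
    }
}
```

Under this assumption, [0-9] covers exactly one digit, so flink-topic-10 is not selected. A pattern such as sourceTopic + "-[0-9]+" would be needed to include multi-digit suffixes.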

Summary

The guide demonstrates installing Kafka, basic topic management, producing and consuming messages via command line, and integrating Kafka with Apache Flink for both simple data flow and event‑time windowed processing, including custom schemas, watermark generation, and offset handling.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
