Introduction to Apache Kafka and Its Integration with Apache Flink
This article provides a step‑by‑step guide on installing Apache Kafka, creating topics, producing and consuming messages via command line, and demonstrates how to integrate Kafka with Apache Flink using the Flink‑Kafka connector, custom serialization schemas, and event‑time window processing.
Kafka Introduction
Apache Kafka is a distributed publish‑subscribe messaging system originally developed at LinkedIn, open‑sourced in early 2011, and subsequently donated to the Apache Software Foundation. It is used to build real‑time data pipelines and streaming applications, offering horizontal scalability, fault tolerance, and high throughput.
Installation
Download the binary package, extract it, and start a single‑node ZooKeeper instance and a Kafka broker using the provided scripts.
curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -zxf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties
Create Topic
Create a topic named flink-topic and verify its creation.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic
Created topic "flink-topic".
bin/kafka-topics.sh --list --zookeeper localhost:2181
flink-topic
Send and Receive Messages
Produce two test messages to the topic and consume them from the beginning.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg
>Kafka connector
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector
Flink Kafka Connector
To use the Flink‑Kafka connector (version 1.7.0), add it as a dependency in a Maven project.
Maven Dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>
Custom Serialization Schema
Java class KafkaMsgSchema implements both DeserializationSchema and SerializationSchema for plain‑string messages.
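The schema in this section keeps its Charset in a transient field and restores it through custom writeObject/readObject hooks, because java.nio.charset.Charset is not Serializable while Flink ships schema instances to workers via Java serialization. The following is a minimal, Flink-free sketch of that pattern; the class name CharsetHolderSketch and the roundTrip helper are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a schema class; not part of Flink.
class CharsetHolderSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Charset is not Serializable, so default serialization must skip it.
    private transient Charset charset;

    CharsetHolderSketch(Charset charset) {
        this.charset = charset;
    }

    Charset getCharset() {
        return charset;
    }

    // Write the charset by name after the default (non-transient) fields.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(charset.name());
    }

    // Read the name back and look the charset up again.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        charset = Charset.forName(in.readUTF());
    }

    // Serializes and deserializes the object in memory.
    static CharsetHolderSketch roundTrip(CharsetHolderSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CharsetHolderSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CharsetHolderSketch copy = roundTrip(new CharsetHolderSketch(StandardCharsets.UTF_8));
        System.out.println(copy.getCharset()); // prints "UTF-8"
    }
}
```

Without the two hooks, the deserialized copy would carry a null charset and fail on the first message it processed.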
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    private static final long serialVersionUID = 1L;

    // Charset is not Serializable, so it is transient and restored in readObject()
    private transient Charset charset;

    public KafkaMsgSchema() {
        // Default to UTF-8 encoding
        this(StandardCharsets.UTF_8);
    }

    public KafkaMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    @Override
    public String deserialize(byte[] message) {
        // Deserialize the Kafka message bytes into a Java object
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // The stream never ends
        return false;
    }

    @Override
    public byte[] serialize(String element) {
        // Serialize the Java object into Kafka message bytes
        return element.getBytes(this.charset);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Type information of the records this schema produces
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
Simple ETL Example
A Flink job reads from flink-topic, prepends "Flink study " to each record, and writes to flink-topic-output.
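Per record, the job amounts to three steps: decode the Kafka value bytes, transform the string, and encode the result again. The sketch below reproduces only that per-record path in plain Java, without Flink or Kafka; the class EtlStepSketch and its process method are hypothetical names used for illustration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical, Flink-free model of what happens to a single record.
class EtlStepSketch {
    static byte[] process(byte[] kafkaValue, Charset charset) {
        // 1. Deserialize: bytes read from the source topic become a String.
        String input = new String(kafkaValue, charset);
        // 2. Transform: the same prefix the map function adds.
        String output = "Flink study ".concat(input);
        // 3. Serialize: the String becomes the bytes written to the sink topic.
        return output.getBytes(charset);
    }

    public static void main(String[] args) {
        byte[] in = "Kafka test msg".getBytes(StandardCharsets.UTF_8);
        byte[] out = process(in, StandardCharsets.UTF_8);
        System.out.println(new String(out, StandardCharsets.UTF_8));
        // prints "Flink study Kafka test msg"
    }
}
```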
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // Read user-supplied parameters
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source topic
        String sourceTopic = "flink-topic";
        // Sink topic
        String sinkTopic = "flink-topic-output";
        // Broker address
        String broker = "localhost:9092";
        // Kafka properties; in production these can be passed on the command line
        Properties p = parameterTool.getProperties();
        p.put("bootstrap.servers", broker);
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Create the consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                sourceTopic,
                new KafkaMsgSchema(),
                p);
        // Read messages from Kafka
        DataStream<String> input = env.addSource(consumer);

        // Process the data: prepend a fixed prefix to every record
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String msg = "Flink study ".concat(s);
                System.out.println(msg);
                return msg;
            }
        });

        // Create the producer
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<>(new KafkaMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
        // Write the results to the sink topic
        result.addSink(producer);

        // Execute the job
        env.execute("Kafka Example");
    }
}
Event‑Time and Watermark Example
Define KafkaWithTsMsgSchema to parse messages of the form String#Long into Tuple2<String,Long>, then assign timestamps and watermarks using KafkaAssignerWithPunctuatedWatermarks. A tumbling 1‑second window computes the maximum value and writes the result back to Kafka.
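To make the windowing step concrete, the sketch below models it in plain Java: each String#Long message is assigned to a 1‑second tumbling window by its timestamp, and the largest payload per window is kept, mirroring max(0) on field 0 (the String). It is a simplified illustration, not Flink's implementation: it ignores watermarks and late data, assumes non-negative millisecond timestamps, and skips malformed messages instead of mapping them to timestamp 0. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of tumbling event-time windows.
class TumblingWindowSketch {
    // Start of the tumbling window containing the timestamp (non-negative input assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Maps each window start to the lexicographically largest payload seen in that window.
    static Map<Long, String> maxPerWindow(List<String> messages, long sizeMs) {
        Map<Long, String> result = new TreeMap<>();
        for (String message : messages) {
            String[] dataAndTs = message.split("#");
            if (dataAndTs.length != 2) {
                continue; // simplification: drop malformed messages
            }
            long ts = Long.parseLong(dataAndTs[1].trim());
            result.merge(windowStart(ts, sizeMs), dataAndTs[0],
                    (a, b) -> a.compareTo(b) >= 0 ? a : b);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a#1000", "c#1500", "b#1999", "z#2000");
        System.out.println(maxPerWindow(messages, 1000L)); // prints {1000=c, 2000=z}
    }
}
```

The timestamps 1000, 1500, and 1999 share the window [1000, 2000), while 2000 opens the next one, which is why "z" does not displace "c".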
Schema for String#Long
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;

    // Charset is not Serializable, so it is transient and restored in readObject()
    private transient Charset charset;

    public KafkaWithTsMsgSchema() {
        this(StandardCharsets.UTF_8);
    }

    public KafkaWithTsMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    @Override
    public Tuple2<String, Long> deserialize(byte[] message) {
        // Expected format: <data>#<timestamp in milliseconds>
        String msg = new String(message, charset);
        String[] dataAndTs = msg.split("#");
        if (dataAndTs.length == 2) {
            return new Tuple2<>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
        } else {
            // Malformed messages are kept with timestamp 0 rather than dropped
            System.out.println("Fail due to invalid msg format.. [" + msg + "]");
            return new Tuple2<>(msg, 0L);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
        // The stream never ends
        return false;
    }

    @Override
    public byte[] serialize(Tuple2<String, Long> element) {
        // Output format: MAX - <data>#<timestamp>
        return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}
Punctuated Watermark Assigner
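The assigner in this section proposes a watermark equal to the timestamp of every element. Flink forwards a punctuated watermark only if it is larger than the one previously emitted, so out-of-order elements do not move the watermark backwards. The following plain-Java sketch models just that rule; the class and method names are hypothetical and the code is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of per-element watermark emission.
class PunctuatedWatermarkSketch {
    // Returns the watermarks that would actually be emitted for the given
    // sequence of element timestamps, in arrival order.
    static List<Long> emittedWatermarks(List<Long> timestamps) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Each element proposes a watermark equal to its own timestamp;
            // it is kept only if it advances the current watermark.
            if (ts > current) {
                current = ts;
                emitted.add(ts);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedWatermarks(Arrays.asList(1000L, 1500L, 1200L, 2000L)));
        // prints [1000, 1500, 2000]
    }
}
```

Because the watermark tracks the highest timestamp seen with no slack, any element arriving with an older timestamp is behind the watermark; it is dropped as late once the watermark has passed the end of its window.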
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
        // Create a watermark from the extracted timestamp
        return new Watermark(l);
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> o, long l) {
        // Extract the event timestamp from the second tuple field
        return o.f1;
    }
}
Main Program
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
    public static void main(String[] args) throws Exception {
        // Read user-supplied parameters
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Source topic
        String sourceTopic = "flink-topic";
        // Sink topic
        String sinkTopic = "flink-topic-output";
        // Broker address
        String broker = "localhost:9092";
        // Kafka properties; in production these can be passed on the command line
        Properties p = parameterTool.getProperties();
        p.put("bootstrap.servers", broker);
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Create the consumer
        FlinkKafkaConsumer<Tuple2<String, Long>> consumer = new FlinkKafkaConsumer<>(
                sourceTopic,
                new KafkaWithTsMsgSchema(),
                p);

        // Read messages from Kafka and assign timestamps and watermarks
        TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
        DataStream<Tuple2<String, Long>> input = env
                .addSource(consumer).returns(typeInformation)
                .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

        // Process the data: 1-second tumbling window; max(0) takes the maximum of field 0 (the String payload)
        DataStream<Tuple2<String, Long>> result = input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
                .max(0);

        // Create the producer
        FlinkKafkaProducer<Tuple2<String, Long>> producer = new FlinkKafkaProducer<>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<>(new KafkaWithTsMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
        // Write the results to the sink topic
        result.addSink(producer);

        // Execute the job
        env.execute("Kafka With Event-time Example");
    }
}
Reading Position Configuration
FlinkKafkaConsumer provides methods such as setStartFromEarliest(), setStartFromLatest(), setStartFromTimestamp(...), setStartFromGroupOffsets(), and setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) for fine‑grained offset control.
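As a rough mental model of these start modes, the sketch below resolves the first offset to read for a single partition. It is a simplified, Flink-free illustration and not the connector's code: the StartMode enum, the PartitionState class, and the resolve method are all hypothetical, the timestamp mode is omitted, and the case of a group with no committed offset is not modeled. The one behavior taken from the connector's documented semantics is that a partition missing from the specific-offsets map falls back to the group-offsets behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of start-position resolution; names are illustrative only.
class StartPositionSketch {
    enum StartMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    // Assumed snapshot of one partition's offsets.
    static final class PartitionState {
        final long earliestOffset;   // oldest record still retained
        final long latestOffset;     // offset the next produced record will receive
        final long committedOffset;  // offset committed by the consumer group

        PartitionState(long earliestOffset, long latestOffset, long committedOffset) {
            this.earliestOffset = earliestOffset;
            this.latestOffset = latestOffset;
            this.committedOffset = committedOffset;
        }
    }

    static long resolve(StartMode mode, int partition, PartitionState state,
                        Map<Integer, Long> specificOffsets) {
        switch (mode) {
            case EARLIEST:
                return state.earliestOffset;
            case LATEST:
                return state.latestOffset;
            case SPECIFIC_OFFSETS:
                Long offset = specificOffsets.get(partition);
                // Partitions without an entry fall back to the group offset.
                return offset != null ? offset : state.committedOffset;
            case GROUP_OFFSETS:
            default:
                return state.committedOffset;
        }
    }

    public static void main(String[] args) {
        PartitionState state = new PartitionState(0L, 100L, 42L);
        Map<Integer, Long> specific = new HashMap<>();
        specific.put(0, 23L);
        System.out.println(resolve(StartMode.EARLIEST, 0, state, specific));         // prints 0
        System.out.println(resolve(StartMode.SPECIFIC_OFFSETS, 0, state, specific)); // prints 23
        System.out.println(resolve(StartMode.SPECIFIC_OFFSETS, 1, state, specific)); // prints 42
    }
}
```

These settings apply only when a job starts fresh; when a job is restored from a checkpoint or savepoint, the offsets stored in that state take precedence.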
Topic Discovery
Consumers can subscribe to topics matching a regular expression, e.g., Pattern.compile(sourceTopic + "-[0-9]"), to automatically discover new topics.
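To see which topic names such a pattern would pick up, the sketch below applies the same regular expression to a list of candidate names using only java.util.regex. It assumes the whole topic name must match the pattern; the topic names in the list and the class TopicPatternSketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper showing which topic names a subscription pattern selects.
class TopicPatternSketch {
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            // Assumption: the entire topic name has to match, not just a prefix.
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String sourceTopic = "flink-topic";
        Pattern pattern = Pattern.compile(sourceTopic + "-[0-9]");
        List<String> topics = Arrays.asList(
                "flink-topic", "flink-topic-0", "flink-topic-7",
                "flink-topic-10", "flink-topic-output");
        System.out.println(matchingTopics(pattern, topics));
        // prints [flink-topic-0, flink-topic-7]
    }
}
```

Note that "-[0-9]" matches exactly one digit, so flink-topic-10 is excluded; "-[0-9]+" would be needed for multi-digit suffixes. In the Flink connector, discovery of new topics and partitions is off by default and is enabled by setting the consumer property flink.partition-discovery.interval-millis to a non-negative value.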
Summary
The guide demonstrates installing Kafka, basic topic management, producing and consuming messages via command line, and integrating Kafka with Apache Flink for both simple data flow and event‑time windowed processing, including custom schemas, watermark generation, and offset handling.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
