Using Flink Redis Sink for Streaming WordCount from Kafka to Redis
This tutorial demonstrates how to integrate Apache Flink with Redis as a sink, showing the Maven dependency, a custom RedisMapper implementation, and a complete Flink job that reads Kafka messages, performs word count, and stores results in Redis, with plans for HBase and MySQL extensions.
In streaming computing, a common scenario is consuming data from Kafka, processing it, and storing results in another system such as Redis.
This article presents a simple Redis‑as‑Sink example for Apache Flink.
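First, add the Maven dependency for the Flink Redis connector. The coordinates below were built against Flink 1.1.x; later releases of the connector are maintained in Apache Bahir under the org.apache.bahir group, so pick the artifact that matches your Flink and Scala versions: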
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
</dependency>
Next, implement a custom RedisMapper (RedisSinkExample) that specifies the SET command and extracts the key and value from a Tuple2<String, Integer>:
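public static final class RedisSinkExample implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // SET needs no additional key; that argument is only used by hash and sorted-set commands.
        return new RedisCommandDescription(RedisCommand.SET, null);
    }
    // The word becomes the Redis key.
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }
    // The count is stored as a string value.
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString();
    }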
}
A complete Flink job (RedisSinkTest) reads text from a Kafka topic, splits lines into words, counts occurrences, and writes the counts to Redis using the sink defined above:
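import java.util.Properties;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Take a checkpoint every 2000 ms.
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Kafka source: read the "test" topic from the earliest offset
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        // Split lines into (word, 1) pairs, key by the word (field 0), and sum the counts (field 1)
        DataStream<Tuple2<String, Integer>> counts = stream
            .flatMap(new LineSplitter())
            .keyBy(0)
            .sum(1);

        // Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
            .setHost("127.0.0.1")
            .setPort(6379) // setPort takes an int, not a String
            .build();

        // Write results to Redis
        counts.addSink(new RedisSink<>(conf, new RedisSinkExample()));
        env.execute("WordCount From Kafka To Redis");
    }

    // Emits (word, 1) for every non-empty, lowercased token in a line.
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }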
}
Future updates will include examples of writing to HBase and MySQL.
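To see what actually ends up in Redis, the job's logic can be sketched without Flink, Kafka, or Redis. This is only an illustration under stated assumptions, not the connector's implementation: WordCountSetSketch, tokenize, and run are hypothetical names, and a LinkedHashMap stands in for the Redis server. The sketch reuses the LineSplitter tokenization rule and mimics keyBy(0).sum(1), which emits an updated running total for every incoming word, followed by a SET that overwrites the previous value for that key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSetSketch {

    // Same rule as LineSplitter: lowercase, split on non-word characters,
    // and skip empty tokens (split yields a leading "" when a line starts
    // with punctuation).
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    // Stand-in for keyBy(0).sum(1) followed by one Redis SET per emitted record.
    // The returned map is a hypothetical in-memory substitute for Redis.
    static Map<String, String> run(List<String> lines) {
        Map<String, Integer> runningCounts = new LinkedHashMap<>();
        Map<String, String> redis = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                // The running total for this word after the new occurrence.
                int updated = runningCounts.merge(word, 1, Integer::sum);
                // SET overwrites, so the key always holds the latest total.
                redis.put(word, Integer.toString(updated));
            }
        }
        return redis;
    }

    public static void main(String[] args) {
        Map<String, String> redis = run(Arrays.asList("Hello, Flink!", "hello redis"));
        System.out.println(redis); // prints {hello=2, flink=1, redis=1}
    }
}
```

Because every update is written with SET, each key holds only the most recent count for its word; intermediate totals are overwritten rather than accumulated on the Redis side.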
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
