
Using Flink Redis Sink for Streaming WordCount from Kafka to Redis

This tutorial shows how to use Redis as a sink for Apache Flink. It covers the Maven dependency, a custom RedisMapper implementation, and a complete Flink job that reads messages from Kafka, counts words, and stores the counts in Redis. Examples for HBase and MySQL sinks are planned as follow-ups.

Big Data Technology & Architecture

In streaming computing, a common scenario is consuming data from Kafka, processing it, and storing results in another system such as Redis.

This article presents a simple Redis‑as‑Sink example for Apache Flink.

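First, add the Maven dependency for the Flink Redis connector. The coordinates below are the connector as it shipped with Flink 1.1.x for Scala 2.10. From Flink 1.2 onward the Redis connector is maintained by Apache Bahir (groupId org.apache.bahir), and the universal FlinkKafkaConsumer used in the job below requires Flink 1.7 or later plus the flink-connector-kafka dependency, so choose connector versions that match your Flink and Scala versions: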

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.10</artifactId>
    <version>1.1.5</version>
</dependency>

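Next, implement a custom RedisMapper named RedisSinkExample. It tells the sink to issue a Redis SET command and extracts the key (the word) and the value (its count) from each Tuple2<String, Integer>. Because it is declared static, it lives as a nested class inside the job class RedisSinkTest: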

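// Maps each (word, count) tuple to a Redis command: SET <word> <count>.
// Declared as a static nested class of RedisSinkTest.
public static final class RedisSinkExample implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // SET needs no additional key; that is only required for hash and sorted-set commands (HSET, ZADD).
        return new RedisCommandDescription(RedisCommand.SET, null);
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0; // the word
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString(); // the current count, stored as a string
    }
}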
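To make the mapper's contract concrete, here is a dependency-free sketch (Java 9+) of what the sink does with a SET-type mapper: for each record it asks the mapper for a key and a value and overwrites that key. SetMapperSketch, WordCountMapper, and the HashMap standing in for Redis are hypothetical illustrations, not connector classes, and Map.Entry stands in for Flink's Tuple2.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, dependency-free model of a SET-type Redis mapper and sink.
 * Map.Entry plays the role of Tuple2; a HashMap plays the role of Redis.
 */
public class SetMapperSketch {

    /** Mirrors RedisSinkExample: key = word (f0), value = count (f1) as a string. */
    static final class WordCountMapper {
        String getKeyFromData(Map.Entry<String, Integer> data) {
            return data.getKey();
        }

        String getValueFromData(Map.Entry<String, Integer> data) {
            return data.getValue().toString();
        }
    }

    /** Applies every record as SET key value; later records overwrite earlier ones. */
    static Map<String, String> apply(List<Map.Entry<String, Integer>> records) {
        WordCountMapper mapper = new WordCountMapper();
        Map<String, String> store = new HashMap<>();
        for (Map.Entry<String, Integer> record : records) {
            store.put(mapper.getKeyFromData(record), mapper.getValueFromData(record));
        }
        return store;
    }

    public static void main(String[] args) {
        // Rolling counts in arrival order, as a keyed running sum would emit them.
        Map<String, String> store = apply(List.of(
                Map.entry("flink", 1),
                Map.entry("redis", 1),
                Map.entry("flink", 2)));
        System.out.println(store.get("flink")); // prints 2
    }
}
```

Because SET replaces the previous value, only the most recent count per word survives in the store, which is exactly what a word-count lookup needs.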

The complete Flink job, RedisSinkTest, reads text from the Kafka topic test, splits each line into words, keeps a running count per word, and writes every updated count to Redis through a RedisSink configured with the mapper above:

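import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
// Used by the nested RedisSinkExample mapper:
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 2 s; keyed state and Kafka offsets are snapshotted together.
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Kafka source: read topic "test" from the earliest offset
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // group.id is needed to commit offsets back to Kafka; "wordcount-redis" is an arbitrary example name
        properties.setProperty("group.id", "wordcount-redis");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        // Split lines into (word, 1) pairs and keep a running count per word
        DataStream<Tuple2<String, Integer>> counts = stream
            .flatMap(new LineSplitter())
            .keyBy(value -> value.f0)
            .sum(1);

        // Redis connection pool configuration (setPort takes an int)
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
            .setHost("127.0.0.1")
            .setPort(6379)
            .build();

        // Write every updated count to Redis as SET <word> <count>
        counts.addSink(new RedisSink<>(conf, new RedisSinkExample()));
        env.execute("WordCount From Kafka To Redis");
    }

    // RedisSinkExample (the RedisMapper shown above) goes here as a static nested class.

    // Emits (word, 1) for every lower-cased word in a line
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) { // skip the empty token produced by leading punctuation
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}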
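The job's semantics can be checked without a cluster. The following plain-Java sketch (Java 9+, no Flink or Redis) mimics LineSplitter, the running keyBy(...).sum(1) count, and the SET writes. It then simulates a failure and recovery from a checkpoint in which keyed state is restored and the input is replayed, but Redis is not rolled back. All class and method names are hypothetical; it models the data flow under those assumptions, not Flink's actual checkpointing mechanism.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the job: tokenize like LineSplitter, keep a running
 * count per word like keyBy(...).sum(1), and apply each update with SET
 * semantics (overwrite) to a HashMap standing in for Redis.
 */
public class RollingWordCountSketch {

    /** Processes lines against the given keyed state and store; returns the number of SET calls. */
    static int process(List<String> lines, Map<String, Integer> state, Map<String, String> redis) {
        int writes = 0;
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.isEmpty()) {
                    continue; // a line starting with punctuation yields a leading empty token
                }
                int count = state.merge(token, 1, Integer::sum); // running sum for this key
                redis.put(token, Integer.toString(count));       // SET word count
                writes++;
            }
        }
        return writes;
    }

    /**
     * Runs all lines, "fails", then recovers from a checkpoint taken after the first
     * checkpointAt lines: keyed state is restored and the remaining lines are replayed,
     * but writes already made to the store are not undone.
     */
    static Map<String, String> runWithReplay(List<String> lines, int checkpointAt) {
        Map<String, Integer> state = new HashMap<>();
        Map<String, String> redis = new HashMap<>();
        process(lines.subList(0, checkpointAt), state, redis);
        Map<String, Integer> checkpoint = new HashMap<>(state);  // snapshot of keyed state
        List<String> tail = lines.subList(checkpointAt, lines.size());
        process(tail, state, redis);                             // writes happen, then the job fails
        process(tail, new HashMap<>(checkpoint), redis);         // restore state, replay from the checkpoint
        return redis;
    }

    public static void main(String[] args) {
        List<String> input = List.of("Flink reads Kafka", "Flink writes Redis", "flink!");
        Map<String, String> redis = new HashMap<>();
        int writes = process(input, new HashMap<>(), redis);
        System.out.println(writes + " SET calls, flink=" + redis.get("flink")); // prints 7 SET calls, flink=3
        System.out.println(redis.equals(runWithReplay(input, 1)));          // prints true
    }
}
```

Two points follow from this model. First, sum(1) emits an updated count for every incoming word, so Redis receives one SET per word occurrence rather than one per distinct word. Second, the Redis sink is not transactional: the EXACTLY_ONCE mode covers Flink's own state, and the final Redis values come out right only because replay recomputes the same counts and SET simply overwrites them. During recovery, readers can briefly see older counts (here, flink drops from 3 to 2 before returning to 3).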

The article concludes by mentioning future updates that will include examples for writing to HBase and MySQL.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture (Wang Zhiwu, a big data expert dedicated to sharing big data technology).
