
Implementing Exactly-Once Kafka-to-Redis with Flink: Two-Phase Commit Sink and Bug Fixes

This tutorial explains how to achieve exactly‑once semantics when streaming data from Kafka to Redis using Apache Flink's TwoPhaseCommitSinkFunction, covering Redis transaction basics, utility classes, sink implementation, testing steps, and solutions to common connection and transaction bugs.

Big Data Technology & Architecture

In this tutorial we demonstrate how to achieve exactly‑once semantics when writing streaming data from Kafka to Redis using Apache Flink’s TwoPhaseCommitSinkFunction.

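We first look at how Redis transactions behave. MULTI starts queuing commands on a connection, EXEC runs the whole queue in order without commands from other clients interleaving, and DISCARD throws the queue away. There are no isolation levels and no rollback: queued writes are invisible until EXEC, and a command that fails at run time does not undo the others.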
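To make these properties concrete, here is a small in-memory model of one connection. It is plain Java with no Redis involved, and the class `MultiExecModel` and its methods are hypothetical. Between `multi()` and `exec()`, writes are only queued and stay invisible to readers; `exec()` applies the queue in order; `discard()` drops it; and EXEC outside MULTI is rejected with the same error text Redis returns. WATCH and per-command runtime errors are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one Redis connection's MULTI/EXEC/DISCARD
// behaviour for hash writes. Not a Redis client; illustration only.
final class MultiExecModel {
    // Hash contents as any other client would see them.
    final Map<String, Map<String, String>> hashes = new HashMap<>();
    // Commands queued since MULTI; null when no transaction is open.
    private List<String[]> queue = null;

    void multi() {
        if (queue != null) throw new IllegalStateException("ERR MULTI calls can not be nested");
        queue = new ArrayList<>();
    }

    // HSET: queued while inside MULTI, applied immediately otherwise.
    void hset(String key, String field, String value) {
        if (queue != null) {
            queue.add(new String[] {key, field, value});
        } else {
            apply(key, field, value);
        }
    }

    // EXEC: apply every queued write in order, then leave the transaction.
    int exec() {
        if (queue == null) throw new IllegalStateException("ERR EXEC without MULTI");
        for (String[] cmd : queue) apply(cmd[0], cmd[1], cmd[2]);
        int applied = queue.size();
        queue = null;
        return applied;
    }

    // DISCARD: drop the queue; nothing reaches the hash.
    void discard() {
        if (queue == null) throw new IllegalStateException("ERR DISCARD without MULTI");
        queue = null;
    }

    // Read as another client would: queued writes are not visible.
    String hget(String key, String field) {
        Map<String, String> h = hashes.get(key);
        return h == null ? null : h.get(field);
    }

    private void apply(String key, String field, String value) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).put(field, value);
    }
}
```

Reading the hash between `multi()` and `exec()` returns nothing for the queued field. That is the property a transactional sink can build on: records become visible only when EXEC runs.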

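RedisUtil wraps a Jedis connection pool and keeps one borrowed connection. Its getTransaction() method lazily sends MULTI on that connection and caches the resulting Transaction; setjedisTransactionIsNull() clears the cache so that the next call opens a fresh MULTI.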

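import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Transaction;

public class RedisUtil {
    public static JedisPool jedisPool = null;
    public static JedisPoolConfig jedisPoolConfig;
    // Borrowed once and kept for the lifetime of this object (never returned to the pool).
    private final transient Jedis jedis;
    // Cached MULTI transaction; null until getTransaction() is first called.
    private transient Transaction jedisTransaction;

    static {
        jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(100);            // at most 100 connections borrowed at once
        jedisPoolConfig.setBlockWhenExhausted(true); // wait for a free connection...
        jedisPoolConfig.setMaxWaitMillis(2000);      // ...for up to 2 s, then fail
        jedisPoolConfig.setMaxIdle(5);
        jedisPoolConfig.setMinIdle(5);
        jedisPoolConfig.setTestOnBorrow(false);      // skip the validation PING on every borrow
    }

    public RedisUtil() throws IOException {
        Properties properties = new Properties();
        // Reads redis.host, redis.port and redis.timeout from the classpath.
        try (InputStream in = RedisUtil.class.getClassLoader().getResourceAsStream("redis.properties")) {
            if (in == null) {
                throw new IOException("redis.properties not found on the classpath");
            }
            properties.load(in);
        }
        String port = properties.getProperty("redis.port");
        String timeout = properties.getProperty("redis.timeout");
        jedisPool = new JedisPool(jedisPoolConfig, properties.getProperty("redis.host"),
                Integer.parseInt(port), Integer.parseInt(timeout));
        jedis = jedisPool.getResource();
        jedis.auth("root"); // Redis password, hard-coded here
    }

    // Lazily opens MULTI on the shared connection and caches the Transaction.
    public Transaction getTransaction() {
        if (this.jedisTransaction == null) {
            jedisTransaction = this.jedis.multi();
        }
        return this.jedisTransaction;
    }

    public Jedis getjedis() { return this.jedis; }

    // Call after EXEC or DISCARD so the next getTransaction() sends a fresh MULTI.
    public void setjedisTransactionIsNull() { this.jedisTransaction = null; }
}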
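The pool settings above (maxTotal 100, blockWhenExhausted true, maxWaitMillis 2000) mean a borrower waits up to two seconds for a free connection and then fails. The stdlib-only sketch below is a hypothetical `BoundedPoolModel` built on `java.util.concurrent.Semaphore`, not Jedis or commons-pool2. It shows how connections that are borrowed and never handed back eventually exhaust a bounded pool, which is the situation behind the pool-exhaustion error.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a bounded, blocking connection pool (not Jedis or
// commons-pool2): at most maxTotal leases at once, borrow() waits maxWaitMillis.
final class BoundedPoolModel {
    // A borrowed connection; close() hands it back to the pool.
    interface Lease extends AutoCloseable {
        @Override
        void close();
    }

    private final Semaphore permits;
    private final long maxWaitMillis;

    BoundedPoolModel(int maxTotal, long maxWaitMillis) {
        this.permits = new Semaphore(maxTotal);
        this.maxWaitMillis = maxWaitMillis;
    }

    // blockWhenExhausted = true: wait up to maxWaitMillis, then give up.
    Lease borrow() {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a connection", e);
        }
        if (!acquired) {
            throw new IllegalStateException("Could not get a resource since the pool is exhausted");
        }
        AtomicBoolean returned = new AtomicBoolean(false);
        // Returning the same lease twice must not free an extra slot.
        return () -> {
            if (returned.compareAndSet(false, true)) permits.release();
        };
    }

    int available() { return permits.availablePermits(); }
}
```

With Jedis, calling `close()` on a connection obtained from a `JedisPool` returns it to the pool, which plays the role of `Lease.close()` here.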

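RedisExactlySink extends TwoPhaseCommitSinkFunction and maps its hooks onto a Redis transaction: invoke queues an HSET inside MULTI for every record, beginTransaction hands out the shared RedisUtil, preCommit has nothing left to flush, commit sends EXEC, and abort sends DISCARD.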

import java.io.IOException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import redis.clients.jedis.Transaction;

public class RedisExactlySink<T> extends TwoPhaseCommitSinkFunction<T, RedisUtil, Void> {
    public static final String REDIS_HASH_MAP = "WordAndWordCount";
    public static RedisUtil redisUtil; // one shared Redis connection per JVM
    static {
        try { redisUtil = new RedisUtil(); } catch (IOException e) { e.printStackTrace(); }
    }
    public RedisExactlySink() {
        // Transaction handles are checkpointed with Kryo; there is no extra context (Void).
        super(new KryoSerializer<>(RedisUtil.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }
    @Override protected void invoke(RedisUtil transaction, T value, Context context) throws Exception {
        Transaction jedis = transaction.getTransaction();
        // reflectively extract fields and write to Redis hash
        // ... (omitted for brevity)
        jedis.hset(REDIS_HASH_MAP, key, valueStr); // queued inside MULTI until commit() runs EXEC
    }
    // Returns the same shared handle every time; MULTI is opened lazily in invoke().
    @Override protected RedisUtil beginTransaction() throws Exception { return redisUtil; }
    // Nothing to flush here: the writes are already queued inside MULTI.
    @Override protected void preCommit(RedisUtil transaction) throws Exception { System.out.println("preCommit"); }
    @Override protected void commit(RedisUtil transaction) {
        Transaction jedis = transaction.getTransaction();
        try {
            jedis.exec(); // apply all queued HSETs
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            transaction.setjedisTransactionIsNull(); // next getTransaction() sends a fresh MULTI
            System.out.println("commit finished");
        }
    }
    @Override protected void abort(RedisUtil transaction) {
        Transaction jedis = transaction.getTransaction();
        try {
            jedis.discard(); // drop all queued HSETs
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            transaction.setjedisTransactionIsNull();
            System.out.println("abort finished");
        }
    }
}
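The order in which TwoPhaseCommitSinkFunction calls these hooks can be sketched with a single-threaded, stdlib-only model. The class `TwoPhaseSinkModel` is hypothetical: its method names mirror Flink's hooks (`beginTransaction`, `invoke`, `preCommit`, `commit`, `abort`, `snapshotState`, `notifyCheckpointComplete`), but the signatures are simplified and it is not Flink code. One design point it makes visible: here every `beginTransaction()` returns a fresh buffer, so records that arrive after a checkpoint barrier land in the next transaction, whereas the sink above returns the same `RedisUtil` from every call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-threaded model of the hook order driven by Flink's
// TwoPhaseCommitSinkFunction. Not Flink code; signatures are simplified.
final class TwoPhaseSinkModel {
    // One transaction = a private buffer of (field, value) writes.
    static final class Txn {
        final List<String[]> writes = new ArrayList<>();
    }

    // Stand-in for the Redis hash: only committed writes appear here.
    final Map<String, String> visible = new LinkedHashMap<>();
    // Pre-committed transactions, keyed by checkpoint id, awaiting completion.
    private final TreeMap<Long, Txn> pending = new TreeMap<>();
    private Txn current = beginTransaction();

    Txn beginTransaction() { return new Txn(); }
    void invoke(Txn txn, String field, String value) { txn.writes.add(new String[] {field, value}); }
    void preCommit(Txn txn) { /* writes are already buffered; nothing to flush */ }
    void commit(Txn txn) { for (String[] w : txn.writes) visible.put(w[0], w[1]); }
    void abort(Txn txn) { txn.writes.clear(); }

    // Per record: write into the currently open transaction.
    void onRecord(String field, String value) { invoke(current, field, value); }

    // Checkpoint barrier: pre-commit the open transaction, park it, open a new one.
    void snapshotState(long checkpointId) {
        preCommit(current);
        pending.put(checkpointId, current);
        current = beginTransaction();
    }

    // Checkpoint confirmed: commit every parked transaction up to this id.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, Txn> done = pending.headMap(checkpointId, true);
        for (Txn txn : done.values()) commit(txn);
        done.clear();
    }

    // Failure before the next barrier: abort the open transaction; the source
    // replays those records from the last completed checkpoint. (Real Flink also
    // re-commits the restored checkpoint's pending transactions; not modelled.)
    void failCurrent() {
        abort(current);
        current = beginTransaction();
    }
}
```

A record written after the barrier of checkpoint 1 is not committed by checkpoint 1; if the job fails before checkpoint 2, it is aborted and then committed once after being replayed.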

The main Flink job creates a streaming environment, reads words from Kafka, maps each word to a (word, 1) tuple, keeps a running count per word with keyBy and reduce, converts each updated tuple to a POJO, and adds RedisExactlySink to write the results into a Redis hash.

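import java.io.InputStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        CkAndStateBacked.setCheckPointAndStateBackend(env, "FS"); // author's helper; the 2PC sink commits only on completed checkpoints
        InputStream in = KafkaToRedis.class.getClassLoader().getResourceAsStream("kafka.properties");
        ParameterTool pt = ParameterTool.fromPropertiesFile(in);
        DataStream<String> kafkaStream = KafkaUtil.getKafkaDataStream(pt, SimpleStringSchema.class, env);
        // Type erasure hides Tuple2's parameters from Flink's lambda type extraction, so declare them.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = kafkaStream
                .map(value -> new Tuple2<>(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = mapStream // running count per word
                .keyBy(t -> t.f0).reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));
        SingleOutputStreamOperator<Pojo> pojoStream = reduceStream.map(t -> new Pojo(t.f0, t.f1));
        pojoStream.addSink(new RedisExactlySink<Pojo>());
        env.execute();
    }
}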
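Because reduce emits an updated running total for every incoming word and the sink writes it with HSET, each write overwrites the previous value of that field. The stdlib sketch below is a hypothetical `RunningCountModel` (no Flink involved): it replays the keyBy/reduce logic over a list of words and applies the emitted (word, count) pairs as overwrites. Applying the same emissions a second time leaves the hash unchanged.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (no Flink) of map(word -> (word, 1)) -> keyBy(word)
// -> reduce(sum), followed by an HSET-style sink that overwrites each field.
final class RunningCountModel {
    // The (word, runningCount) pairs, in the order reduce would emit them.
    static List<Map.Entry<String, Integer>> emissions(List<String> words) {
        Map<String, Integer> keyedState = new HashMap<>(); // reduce state per key
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : words) {
            int total = keyedState.merge(word, 1, Integer::sum); // (v1, v2) -> v1 + v2
            out.add(new AbstractMap.SimpleImmutableEntry<>(word, total));
        }
        return out;
    }

    // HSET semantics: every emission replaces the field's previous value.
    static void applyAsHset(Map<String, String> hash, List<Map.Entry<String, Integer>> emitted) {
        for (Map.Entry<String, Integer> e : emitted) {
            hash.put(e.getKey(), String.valueOf(e.getValue()));
        }
    }
}
```

An increment-based write such as Redis's HINCRBY would not have this property: applying the same updates twice would double-count.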

Testing steps include starting Redis, launching a Kafka producer to generate data, and running the Flink job; screenshots in the original article illustrate each stage.
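One way to check the outcome of such a test run is to compare the hash with counts computed from the words the producer actually sent, once the producer has stopped and a checkpoint after the last records has completed (before that, committed data may lag behind). The helper below is hypothetical (`CountCheck.mismatches` is not from the article); its `Map<String, String>` argument would be whatever a client returns for the hash, for example Jedis's `hgetAll("WordAndWordCount")`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical test helper: compare a Redis hash snapshot with the counts
// implied by the words that were produced to Kafka.
final class CountCheck {
    static List<String> mismatches(List<String> producedWords, Map<String, String> redisHash) {
        Map<String, Integer> expected = new HashMap<>();
        for (String w : producedWords) expected.merge(w, 1, Integer::sum);

        // Check every word seen on either side, sorted for stable output.
        TreeSet<String> allWords = new TreeSet<>(expected.keySet());
        allWords.addAll(redisHash.keySet());

        List<String> problems = new ArrayList<>();
        for (String w : allWords) {
            String want = String.valueOf(expected.getOrDefault(w, 0));
            String got = redisHash.get(w); // null if the field is missing
            if (!want.equals(got)) problems.add(w + ": expected " + want + ", found " + got);
        }
        return problems;
    }
}
```

An empty list means every produced word is stored with exactly its produced count.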

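We also address three common bugs:

1) “Could not get a resource since the pool is exhausted”. JedisPool raises this when maxTotal connections are already borrowed and the wait (maxWaitMillis) expires, typically because connections are borrowed and never returned. It was resolved by setting jedisPoolConfig.setTestOnBorrow(false) and making sure the pool is large enough.
2) “ERR EXEC without MULTI”. Redis returns this when EXEC is sent on a connection that is not inside MULTI, here because the cached Transaction was reused after a previous EXEC or DISCARD. The fix is to reset the transaction object after each commit/abort so the next write obtains a fresh Jedis transaction.
3) “Committing one of transactions failed…”. Flink raises this when a commit() call throws while pending transactions are committed after a checkpoint completes. Because every checkpoint reuses the same RedisUtil connection, nothing should be closed after a commit; the fix is to remove the unnecessary jedistransaction.close() calls from the finally block.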
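The second and third fixes are easier to see on a model of one connection that is reused across checkpoints, as the shared RedisUtil is. In this hypothetical sketch (`ReusedConnectionModel` and its methods are illustrative, not Jedis), the holder caches an open transaction the way getTransaction() does. Without clearing that cache after EXEC, the next commit sends EXEC on a connection that is no longer in MULTI; closing the shared connection in `finally` makes the next checkpoint's transaction fail instead.

```java
// Hypothetical model (not Jedis) of RedisUtil's pattern: one long-lived
// connection reused for every checkpoint, plus a lazily opened, cached MULTI.
final class ReusedConnectionModel {
    static final class Connection {
        boolean inMulti;
        boolean closed;

        void multi() { ensureOpen(); inMulti = true; }

        void exec() {
            ensureOpen();
            if (!inMulti) throw new IllegalStateException("ERR EXEC without MULTI");
            inMulti = false;
        }

        void close() { closed = true; }

        private void ensureOpen() {
            if (closed) throw new IllegalStateException("connection closed");
        }
    }

    final Connection conn = new Connection();
    private boolean txnCached;                 // stands in for the cached Transaction field
    private final boolean resetAfterCommit;    // the fix for "EXEC without MULTI"
    private final boolean closeInFinally;      // the close() call removed by the third fix

    ReusedConnectionModel(boolean resetAfterCommit, boolean closeInFinally) {
        this.resetAfterCommit = resetAfterCommit;
        this.closeInFinally = closeInFinally;
    }

    // Like getTransaction(): sends MULTI only when nothing is cached.
    Connection getTransaction() {
        if (!txnCached) {
            conn.multi();
            txnCached = true;
        }
        return conn;
    }

    // Like commit(): EXEC, then optionally clear the cache and/or close the connection.
    void commit() {
        try {
            getTransaction().exec();
        } finally {
            if (resetAfterCommit) txnCached = false;
            if (closeInFinally) conn.close();
        }
    }
}
```

Only the variant that resets the cache and keeps the connection open survives repeated checkpoints.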

In summary, the article provides a complete exactly-once Kafka-to-Redis implementation in Flink, with detailed code, testing procedures, and practical fixes for typical connection and transaction issues.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
