Implementing Exactly-Once Kafka-to-Redis with Flink: Two-Phase Commit Sink and Bug Fixes
This tutorial explains how to achieve exactly‑once semantics when streaming data from Kafka to Redis using Apache Flink's TwoPhaseCommitSinkFunction, covering Redis transaction basics, utility classes, sink implementation, testing steps, and solutions to common connection and transaction bugs.
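The sink is built on Apache Flink’s TwoPhaseCommitSinkFunction, which ties the sink's writes to Flink checkpoints: records are staged while a checkpoint is in progress and committed only after the checkpoint completes.
We first discuss Redis transaction characteristics. Commands issued after MULTI are only queued; EXEC runs the whole queue in order, and DISCARD drops it. Redis does not roll back commands that have already run if a later command fails during EXEC, so these transactions are weaker than those of a relational database.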
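The queue-then-execute behaviour described above can be modelled in a few lines of plain Java. This is a toy in-memory sketch, not the Jedis API: `FakeHash` and `FakeTransaction` are hypothetical names introduced only for illustration, and the model ignores networking, WATCH, and per-command errors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of Redis MULTI/EXEC/DISCARD; not the Jedis API. */
class QueuedTransactionDemo {

    /** Stands in for a single Redis hash. */
    static final class FakeHash {
        final Map<String, String> fields = new HashMap<>();
    }

    /** Creating an instance plays the role of MULTI: commands are queued, not applied. */
    static final class FakeTransaction {
        private final FakeHash target;
        private final List<String[]> queued = new ArrayList<>();
        private boolean open = true;

        FakeTransaction(FakeHash target) {
            this.target = target;
        }

        /** Queues an HSET; nothing is written to the hash yet. */
        void hset(String field, String value) {
            if (!open) {
                throw new IllegalStateException("transaction already finished");
            }
            queued.add(new String[] {field, value});
        }

        /** Applies every queued command in order and returns how many ran. */
        int exec() {
            if (!open) {
                throw new IllegalStateException("ERR EXEC without MULTI");
            }
            for (String[] cmd : queued) {
                target.fields.put(cmd[0], cmd[1]);
            }
            open = false;
            return queued.size();
        }

        /** Drops the queue without touching the hash. */
        void discard() {
            if (!open) {
                throw new IllegalStateException("ERR DISCARD without MULTI");
            }
            queued.clear();
            open = false;
        }
    }

    public static void main(String[] args) {
        FakeHash hash = new FakeHash();
        FakeTransaction tx = new FakeTransaction(hash);
        tx.hset("flink", "3");
        System.out.println("before exec: " + hash.fields);
        tx.exec();
        System.out.println("after exec:  " + hash.fields);
    }
}
```

The point of the model is that a write becomes visible only at `exec()`, which is the property the sink relies on when it defers `exec()` to Flink's commit callback.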
RedisUtil provides a Jedis connection pool and methods to obtain a Transaction object.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.Transaction;

    public class RedisUtil {
        public static JedisPool jedisPool = null;
        public static JedisPoolConfig jedisPoolConfig;
        private final transient Jedis jedis;
        private transient Transaction jedisTransaction;

        static {
            jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(100);
            jedisPoolConfig.setBlockWhenExhausted(true);
            jedisPoolConfig.setMaxWaitMillis(2000);
            jedisPoolConfig.setMaxIdle(5);
            jedisPoolConfig.setMinIdle(5);
            jedisPoolConfig.setTestOnBorrow(false);
        }

        // Reads redis.host, redis.port, and redis.timeout from redis.properties on the classpath.
        public RedisUtil() throws IOException {
            Properties properties = new Properties();
            try (InputStream in = RedisUtil.class.getClassLoader().getResourceAsStream("redis.properties")) {
                properties.load(in);
            }
            String port = properties.getProperty("redis.port");
            String timeout = properties.getProperty("redis.timeout");
            jedisPool = new JedisPool(jedisPoolConfig, properties.getProperty("redis.host"),
                    Integer.parseInt(port), Integer.parseInt(timeout));
            jedis = jedisPool.getResource();
            jedis.auth("root");
        }

        // Lazily sends MULTI; the same Transaction is returned until it is reset.
        public Transaction getTransaction() {
            if (this.jedisTransaction == null) {
                jedisTransaction = this.jedis.multi();
            }
            return this.jedisTransaction;
        }

        public Jedis getjedis() { return this.jedis; }

        // Called after EXEC or DISCARD so that the next getTransaction() starts a new MULTI.
        public void setjedisTransactionIsNull() { this.jedisTransaction = null; }
    }

RedisExactlySink extends TwoPhaseCommitSinkFunction, implementing beginTransaction, preCommit, commit, and abort to handle Redis hash writes with proper transaction management.

    import java.io.IOException;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    import redis.clients.jedis.Transaction;

    public class RedisExactlySink<T> extends TwoPhaseCommitSinkFunction<T, RedisUtil, Void> {
        public static final String REDIS_HASH_MAP = "WordAndWordCount";
        public static RedisUtil redisUtil;

        static {
            try {
                redisUtil = new RedisUtil();
            } catch (IOException e) {
                // Fail fast instead of leaving redisUtil null.
                throw new ExceptionInInitializerError(e);
            }
        }

        public RedisExactlySink() {
            super(new KryoSerializer<>(RedisUtil.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        }

        @Override
        protected void invoke(RedisUtil transaction, T value, Context context) throws Exception {
            Transaction jedis = transaction.getTransaction();
            // reflectively extract fields and write to Redis hash
            // ... (omitted for brevity)
            jedis.hset(REDIS_HASH_MAP, key, valueStr); // queued until EXEC
        }

        @Override
        protected RedisUtil beginTransaction() throws Exception {
            return redisUtil;
        }

        @Override
        protected void preCommit(RedisUtil transaction) throws Exception {
            System.out.println("preCommit");
        }

        @Override
        protected void commit(RedisUtil transaction) {
            Transaction jedis = transaction.getTransaction();
            try {
                jedis.exec();
                transaction.setjedisTransactionIsNull(); // next write starts a fresh MULTI
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("jedis close!!!");
            }
        }

        @Override
        protected void abort(RedisUtil transaction) {
            Transaction jedis = transaction.getTransaction();
            try {
                jedis.discard();
                transaction.setjedisTransactionIsNull();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("jedis close!!!");
            }
        }
    }

The main Flink job creates a streaming environment, reads from Kafka, maps each word to a (word,1) tuple, aggregates counts, converts tuples to POJOs, and adds the RedisExactlySink to write results into a Redis hash.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().addDefaultKryoSerializer(Jedis.class, TBaseSerializer.class);
        // Project helper, not shown here; by its name it configures checkpointing and the state backend.
        CkAndStateBacked.setCheckPointAndStateBackend(env, "FS");

        InputStream in = KafkaToRedis.class.getClassLoader().getResourceAsStream("kafka.properties");
        ParameterTool pt = ParameterTool.fromPropertiesFile(in);
        DataStream<String> kafkaStream = KafkaUtil.getKafkaDataStream(pt, SimpleStringSchema.class, env);

        // Lambdas erase Tuple2's type parameters, so the result type is declared explicitly
        // (Types is org.apache.flink.api.common.typeinfo.Types).
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = kafkaStream
                .map(value -> new Tuple2<>(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Running count per word.
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = mapStream
                .keyBy(t -> t.f0)
                .reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));
        SingleOutputStreamOperator<Pojo> pojoStream = reduceStream.map(t -> new Pojo(t.f0, t.f1));

        pojoStream.addSink(new RedisExactlySink<Pojo>());
        env.execute();
    }

Testing steps include starting Redis, launching a Kafka producer to generate data, and running the Flink job; screenshots in the original article illustrate each stage.
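When checking the test run, it helps to know in what order the sink's callbacks fire, because counts should appear in Redis only after a checkpoint has completed. The sketch below is a simplified, Flink-free model of that order; `TwoPhaseLifecycleDemo` and its methods are hypothetical names, and failure recovery is deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the two-phase commit callback order; not Flink code. */
class TwoPhaseLifecycleDemo {

    /** Writes staged during one checkpoint interval. */
    static final class Txn {
        final Map<String, Integer> writes = new LinkedHashMap<>();
    }

    /** Stands in for the Redis hash that readers can see. */
    final Map<String, Integer> committed = new LinkedHashMap<>();
    /** Records the callback order for inspection. */
    final List<String> log = new ArrayList<>();
    private final List<Txn> pending = new ArrayList<>();
    private Txn current = begin();

    private Txn begin() {
        log.add("beginTransaction");
        return new Txn();
    }

    /** A record arrives: stage it in the currently open transaction. */
    void invoke(String word, int count) {
        current.writes.put(word, count);
    }

    /** A checkpoint starts: pre-commit the open transaction and open the next one. */
    void snapshot() {
        log.add("preCommit");
        pending.add(current);
        current = begin();
    }

    /** The checkpoint is acknowledged: make pre-committed transactions visible. */
    void checkpointComplete() {
        for (Txn txn : pending) {
            committed.putAll(txn.writes);
            log.add("commit");
        }
        pending.clear();
    }

    public static void main(String[] args) {
        TwoPhaseLifecycleDemo sink = new TwoPhaseLifecycleDemo();
        sink.invoke("flink", 1);
        sink.snapshot();
        sink.invoke("flink", 2); // belongs to the next transaction
        sink.checkpointComplete();
        System.out.println(sink.log);
        System.out.println(sink.committed);
    }
}
```

In this model the second write stays invisible until a later checkpoint completes, which matches what one would expect to observe in the Redis hash during the test: values lag the Kafka input by up to one checkpoint interval.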
We also address three common bugs:
1) “Could not get a resource since the pool is exhausted”: resolved by setting jedisPoolConfig.setTestOnBorrow(false) or ensuring sufficient pool size.
2) “ERR EXEC without MULTI”: fixed by resetting the transaction object after each commit/abort so that the next write obtains a fresh Jedis transaction.
3) “Committing one of transactions failed…”: solved by removing unnecessary jedistransaction.close() calls in the finally block.
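The second fix can be illustrated without a Redis server. The sketch below is a hypothetical stand-in (`OneShotTx` and `TxHolder` are illustrative names, not Jedis classes) for a holder that creates its transaction lazily, in the same way RedisUtil.getTransaction() does. Reusing the handle after it has been executed fails; clearing it first lets the next call start a new one.

```java
import java.util.function.Supplier;

/** Illustrates why the cached transaction handle must be reset after commit. */
class TransactionHolderDemo {

    /** Minimal stand-in for a transaction that can be executed only once. */
    static final class OneShotTx {
        private boolean finished;

        void exec() {
            if (finished) {
                throw new IllegalStateException("ERR EXEC without MULTI");
            }
            finished = true;
        }
    }

    /** Hands out a lazily created handle and forgets it on reset(). */
    static final class TxHolder {
        private final Supplier<OneShotTx> factory;
        private OneShotTx current;

        TxHolder(Supplier<OneShotTx> factory) {
            this.factory = factory;
        }

        OneShotTx get() {
            if (current == null) {
                current = factory.get();
            }
            return current;
        }

        void reset() {
            current = null;
        }
    }

    /** Commits twice; returns the second commit's error message, or null on success. */
    static String commitTwice(boolean resetAfterCommit) {
        TxHolder holder = new TxHolder(OneShotTx::new);
        holder.get().exec();
        if (resetAfterCommit) {
            holder.reset();
        }
        try {
            holder.get().exec();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + commitTwice(false));
        System.out.println("with reset:    " + commitTwice(true));
    }
}
```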
In summary, the article provides a complete exactly‑once Kafka‑to‑Redis implementation on Flink, with the full code, the testing procedure, and practical fixes for typical connection and transaction issues.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
