Implementing Exactly-Once Semantics with Flink and Kafka: Utility Classes, Character Count Example, and Transactional Consumer

This article demonstrates how to achieve exactly‑once processing in Flink by providing Kafka I/O utility classes, a character‑count streaming example, and a transactional consumer implementation, while also discussing configuration nuances and common pitfalls.

Big Data Technology & Architecture

Article Outline

1. Kafka input/output utility class
2. Character count streaming example
3. Consumer handling of transactional Kafka data
4. Summary and potential issues

1. Kafka Input/Output Utility Class

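The utility class provides two reusable methods: one creates a FlinkKafkaConsumer source, the other a FlinkKafkaProducer sink configured for exactly‑once semantics. Both read their settings from a ParameterTool loaded from kafka.properties.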

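import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

public class KafkaUtil {

    // Build a Kafka source stream; offsets are kept in Flink checkpoint state
    public static <T> DataStream<T> getKafkaDataStream(ParameterTool parameterTool, Class<? extends DeserializationSchema<T>> clazz, StreamExecutionEnvironment env) throws ReflectiveOperationException {
        env.getConfig().setGlobalJobParameters(parameterTool);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        properties.setProperty("group.id", parameterTool.get("group.idsource"));
        properties.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset"));
        // Ignored by Flink once checkpointing is enabled; kept false for clarity
        properties.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", String.valueOf(false)));
        // "Consumertopics" may hold one topic or a comma-separated list
        String topics = parameterTool.get("Consumertopics");
        DeserializationSchema<T> deserializationSchema = clazz.getDeclaredConstructor().newInstance();
        FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<>(Arrays.asList(topics.split("\\s*,\\s*")), deserializationSchema, properties);
        // Only used on a fresh start; on recovery the checkpointed offsets take precedence
        flinkKafkaConsumer.setStartFromEarliest();
        // Commit checkpointed offsets back to Kafka (progress monitoring only)
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        return env.addSource(flinkKafkaConsumer);
    }

    // Generic factory for a transactional (exactly-once) Kafka producer
    public static <T> FlinkKafkaProducer<T> getFlinkKafkaProducer(ParameterTool parameterTool, KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        // group.id is a consumer setting; the Kafka producer ignores it
        properties.setProperty("group.id", parameterTool.get("group.idsink"));
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        properties.setProperty("transaction.timeout.ms", parameterTool.get("transaction.timeout.ms"));
        properties.setProperty("client.id", "flinkOutputTopicClient");
        // Default topic; the serialization schema can still set a topic per record
        String topics = parameterTool.get("Producetopice");
        return new FlinkKafkaProducer<>(topics, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}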
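Both methods read their settings with parameterTool.get(key), which returns null for a missing key; Properties.setProperty then throws a NullPointerException. The sketch below, in plain Java with no Flink dependency, checks a kafka.properties file up front. The key names are copied from the code above; the sample values (broker address, group ids, topic names) are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch: report keys that KafkaUtil reads with parameterTool.get(key) and no default.
// Key names come from the article's code; sample values below are hypothetical.
class KafkaPropertiesCheck {

    static final List<String> REQUIRED_KEYS = List.of(
            "bootstrap.servers", "group.idsource", "auto.offset.reset", "Consumertopics",
            "group.idsink", "transaction.timeout.ms", "Producetopice");

    // Returns the required keys that are missing or blank, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        String sample = String.join("\n",
                "bootstrap.servers=localhost:9092", // hypothetical broker
                "group.idsource=demo-source",       // hypothetical consumer group
                "auto.offset.reset=earliest",
                "Consumertopics=input-topic",       // hypothetical topic
                "group.idsink=demo-sink",           // hypothetical value
                "Producetopice=output-topic");      // hypothetical topic
        Properties props = new Properties();
        props.load(new StringReader(sample));
        // transaction.timeout.ms is absent, so getFlinkKafkaProducer would throw a NullPointerException
        System.out.println("Missing keys: " + missingKeys(props)); // prints Missing keys: [transaction.timeout.ms]
    }
}
```

Failing fast with a list of missing keys is easier to diagnose than a NullPointerException thrown from inside the job graph construction.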

Key points for consumers

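Leave enable.auto.commit at false. Once checkpointing is enabled, Flink ignores Kafka's periodic auto‑commit anyway: the consumer stores its offsets in the checkpoint and restores them on recovery. flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true) additionally commits those offsets back to Kafka whenever a checkpoint completes; this only exposes progress for monitoring, and correctness does not depend on it.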

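Configure bootstrap.servers and start consumption from the earliest offset with setStartFromEarliest(). This start position only applies when the job starts without restored state; after an automatic restart or a restore from a savepoint, the offsets stored in the checkpoint take precedence.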
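Why checkpointed offsets are enough can be shown with a toy model in plain Java (this is not Flink's API): the source offset and the operator's keyed counts are snapshotted together, and a simulated crash restores both. The checkpoint interval, the input and the crash position are arbitrary assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Flink code: offsets and keyed state are snapshotted together.
class CheckpointedOffsetsDemo {

    // A checkpoint holds the next offset to read and a copy of the per-key counts.
    record Checkpoint(int nextOffset, Map<String, Integer> counts) {}

    // Counts keys in one partition; failAtOffset < 0 means "no crash".
    static Map<String, Integer> run(List<String> partition, int checkpointEvery, int failAtOffset) {
        Map<String, Integer> counts = new HashMap<>();
        Checkpoint last = new Checkpoint(0, Map.of());
        boolean crashed = false;
        int offset = 0;
        while (offset < partition.size()) {
            if (!crashed && offset == failAtOffset) {
                // Simulated crash: working state is lost, so rewind the offset AND restore the counts.
                crashed = true;
                offset = last.nextOffset();
                counts = new HashMap<>(last.counts());
                continue;
            }
            counts.merge(partition.get(offset), 1, Integer::sum);
            offset++;
            if (offset % checkpointEvery == 0) {
                last = new Checkpoint(offset, new HashMap<>(counts)); // snapshot both together
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "a", "c", "a", "b", "c");
        System.out.println("with crash:    " + run(partition, 3, 5));
        System.out.println("without crash: " + run(partition, 3, -1));
    }
}
```

Both runs end with a=3, b=2, c=2: the records read after the last checkpoint are replayed, but their effect on the counts was rolled back together with the offset, so nothing is counted twice or lost.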

Key points for producers

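Set transaction.timeout.ms no higher than the broker's transaction.max.timeout.ms, which defaults to 15 minutes (900000 ms). FlinkKafkaProducer sets transaction.timeout.ms to 1 hour by default, so without this override the broker refuses to initialize the producer's transactions. The timeout should still comfortably exceed the checkpoint interval plus the expected recovery time, because Kafka aborts transactions that stay open longer than the timeout and their data is then lost.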

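Use FlinkKafkaProducer.Semantic.EXACTLY_ONCE. The producer then writes each checkpoint's output inside a Kafka transaction and commits it only after that checkpoint completes. Checkpointing must therefore be enabled; without it, the producer falls back to Semantic.NONE.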
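The two timeout rules above can be written as simple checks. This is a plain‑Java sketch: the 15‑minute broker default and the 1‑hour FlinkKafkaProducer default are the documented values, while the 600000 ms timeout, the checkpoint interval and the recovery time in main are assumed example numbers, and the "interval plus recovery" bound is only a rough rule of thumb.

```java
import java.time.Duration;

// Sketch of the producer timeout rules; example values in main are assumptions.
class TransactionTimeoutCheck {

    static final Duration BROKER_DEFAULT_MAX = Duration.ofMinutes(15);  // transaction.max.timeout.ms default
    static final Duration FLINK_PRODUCER_DEFAULT = Duration.ofHours(1); // FlinkKafkaProducer default timeout

    // The broker rejects a producer whose transaction timeout exceeds its maximum.
    static boolean acceptedByBroker(Duration txnTimeout, Duration brokerMax) {
        return txnTimeout.compareTo(brokerMax) <= 0;
    }

    // Rough rule: a transaction stays open for about one checkpoint interval, plus any
    // downtime before recovery; if that exceeds the timeout, Kafka aborts it and data is lost.
    static boolean coversCheckpointAndRecovery(Duration txnTimeout, Duration checkpointInterval, Duration expectedRecovery) {
        return txnTimeout.compareTo(checkpointInterval.plus(expectedRecovery)) > 0;
    }

    public static void main(String[] args) {
        Duration txnTimeout = Duration.ofMillis(600_000); // e.g. transaction.timeout.ms=600000
        System.out.println("Flink default accepted: " + acceptedByBroker(FLINK_PRODUCER_DEFAULT, BROKER_DEFAULT_MAX));
        System.out.println("600000 ms accepted: " + acceptedByBroker(txnTimeout, BROKER_DEFAULT_MAX));
        System.out.println("Covers 1 min checkpoints + 5 min recovery: "
                + coversCheckpointAndRecovery(txnTimeout, Duration.ofMinutes(1), Duration.ofMinutes(5)));
    }
}
```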

2. Character Count Streaming Example

A simple Flink job that reads from Kafka, counts occurrences of each string, and writes the result back to Kafka.

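import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaToKafkaExacitly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Project helper (not shown in the article): enables checkpointing and sets the "FS" state backend
        CkAndStateBacked.setCheckPointAndStateBackend(env, "FS");
        InputStream kafkaPropertiesStream = KafkaToKafkaExacitly.class.getClassLoader().getResourceAsStream("kafka.properties");
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(kafkaPropertiesStream);
        env.getConfig().setGlobalJobParameters(parameterTool);
        DataStream<String> kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, SimpleStringSchema.class, env);
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = kafkaDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                // Sending "error" makes the job fail on purpose, to test recovery from the last checkpoint
                if ("error".equals(value)) {
                    throw new RuntimeException("An exception occurred!!!");
                }
                return new Tuple2<>(value, 1);
            }
        });
        // Rolling per-key sum: emits the updated count for every incoming record
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = tupleStream.keyBy(value -> value.f0)
            .reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));
        FlinkKafkaProducer<Tuple2<String, Integer>> flinkKafkaProducer = KafkaUtil.getFlinkKafkaProducer(parameterTool, new KafkaSerializationSchema<Tuple2<String, Integer>>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("========= Writing data to the Kafka producer!!! =========");
            }
            @Override
            public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, @Nullable Long timestamp) {
                String producetopics = parameterTool.get("Producetopice");
                // Record value is Tuple2.toString(), e.g. "(word,3)"; no key is set
                String result = element.toString();
                return new ProducerRecord<>(producetopics, result.getBytes(StandardCharsets.UTF_8));
            }
        });
        reduceStream.addSink(flinkKafkaProducer).name("kafkasinktest").uid("kafkasink");
        env.execute("KafkaToKafkaTest");
    }
}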
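What the sink receives is easy to misread: a rolling reduce on an unbounded keyed stream emits an updated result for every input record, not one final count per word. The plain‑Java model below (no Flink dependency, and without the "error" branch) mimics keyBy(value -> value.f0).reduce(...) and formats each emitted element the way Tuple2.toString() does; it is an illustration, not the job itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(f0).reduce(sum): one output per input record.
class RunningCountModel {

    static List<String> emitted(List<String> input) {
        Map<String, Integer> state = new HashMap<>(); // per-key reduce state
        List<String> out = new ArrayList<>();
        for (String value : input) {
            int count = state.merge(value, 1, Integer::sum);
            out.add("(" + value + "," + count + ")"); // same shape as Tuple2.toString()
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emitted(List.of("flink", "kafka", "flink")));
        // prints [(flink,1), (kafka,1), (flink,2)]
    }
}
```

The output topic therefore holds a sequence of running totals per word rather than a single final value.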

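Note: The job selects a filesystem ("FS") state backend through the CkAndStateBacked helper, whose code is not shown. Give the checkpoint path an explicit scheme, e.g. hdfs:// on a cluster or file:// for local testing; on a cluster, the directory must be reachable from the JobManager and every TaskManager.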
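A small guard can reject checkpoint paths that lack a scheme before they reach the state backend. This is a plain‑Java sketch using java.net.URI; the helper name and the sample paths (including the namenode host and port) are hypothetical.

```java
import java.net.URI;

// Sketch: detect checkpoint paths without an explicit scheme such as hdfs:// or file://.
class CheckpointPathCheck {

    static boolean hasExplicitScheme(String path) {
        String scheme = URI.create(path).getScheme(); // null for a bare path like /tmp/ckpt
        return scheme != null && !scheme.isEmpty();
    }

    public static void main(String[] args) {
        String[] paths = {"hdfs://namenode:8020/flink/checkpoints", "file:///tmp/flink-ckpt", "/tmp/flink-ckpt"};
        for (String p : paths) {
            System.out.println(p + " -> explicit scheme: " + hasExplicitScheme(p));
        }
    }
}
```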

3. Transactional Consumer Example

Shows how to configure a FlinkKafkaConsumer to read only committed transactional records.

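import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadCommittedConsumer { // hypothetical class name; the original shows only main
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", "*****");
        sourceProperties.setProperty("group.id", "****");
        // Skip records of open or aborted transactions (Kafka's default is read_uncommitted)
        sourceProperties.setProperty("isolation.level", "read_committed");
        FlinkKafkaConsumer<String> consumerKafka = new FlinkKafkaConsumer<>("*****", new SimpleStringSchema(), sourceProperties);
        consumerKafka.setStartFromEarliest();
        DataStreamSource<String> dataStreamSource = env.addSource(consumerKafka);
        dataStreamSource.print();
        env.execute();
    }
}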

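Setting isolation.level=read_committed makes the consumer return only records from committed transactions: records of aborted transactions are skipped, and reading stops at the first record of a transaction that is still open. Kafka's default, read_uncommitted, also returns records that are later aborted. Because FlinkKafkaProducer commits when a checkpoint completes, a read_committed consumer sees the job's output with a delay of roughly one checkpoint interval.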
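The difference between the two isolation levels can be illustrated with a toy model of one partition. This is plain Java, not the Kafka client API: the Entry record, the transaction ids and the sample log are assumptions made for illustration, and control markers are simplified to one COMMIT or ABORT entry per transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of one partition: data records carry a transaction id,
// and a COMMIT or ABORT marker closes that transaction.
class IsolationLevelModel {

    enum Kind { DATA, COMMIT, ABORT }

    record Entry(Kind kind, String txn, String value) {}

    // read_uncommitted: every data record, whatever happens to its transaction.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind() == Kind.DATA) {
                out.add(e.value());
            }
        }
        return out;
    }

    // read_committed: committed records only, never past the first record of an
    // open transaction (Kafka's "last stable offset").
    static List<String> readCommitted(List<Entry> log) {
        Set<String> committed = new HashSet<>();
        Set<String> closed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind() == Kind.COMMIT) committed.add(e.txn());
            if (e.kind() != Kind.DATA) closed.add(e.txn());
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind() != Kind.DATA) continue;
            if (!closed.contains(e.txn())) break;   // open transaction: stop here
            if (committed.contains(e.txn())) out.add(e.value());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
                new Entry(Kind.DATA, "t1", "(a,1)"),
                new Entry(Kind.DATA, "t2", "(b,1)"),
                new Entry(Kind.COMMIT, "t1", null),
                new Entry(Kind.ABORT, "t2", null),     // e.g. aborted after a job failure
                new Entry(Kind.DATA, "t3", "(a,2)"));  // checkpoint not completed yet
        System.out.println("read_uncommitted: " + readUncommitted(log)); // [(a,1), (b,1), (a,2)]
        System.out.println("read_committed:   " + readCommitted(log));   // [(a,1)]
    }
}
```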

4. Summary and Potential Issues

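The examples illustrate Flink‑Kafka exactly‑once pipelines, but they only work on a correctly configured Kafka cluster. The critical broker parameters are offsets.topic.replication.factor, transaction.state.log.replication.factor and transaction.state.log.min.isr (Kafka defaults: 3, 3 and 2). The transaction log's replication factor must be at least its minimum ISR, and neither replication factor may exceed the number of brokers, so a single‑broker test cluster needs all three set to 1. Otherwise the internal __transaction_state topic cannot be created or written, transactional producers cannot initialize, checkpoints time out and the job fails.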
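These constraints can be checked before starting a job. The plain‑Java sketch below encodes only the rules stated above (replication factor at most the broker count, min ISR at most the replication factor); the method name and the example broker counts are assumptions, and the broker itself remains the authority on validity.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: sanity-check transaction-related broker settings against the broker count.
class TxnLogConfigCheck {

    static List<String> problems(int brokers, int offsetsTopicRf, int txnLogRf, int txnLogMinIsr) {
        List<String> issues = new ArrayList<>();
        if (offsetsTopicRf > brokers) {
            issues.add("offsets.topic.replication.factor > broker count");
        }
        if (txnLogRf > brokers) {
            issues.add("transaction.state.log.replication.factor > broker count");
        }
        if (txnLogMinIsr > txnLogRf) {
            issues.add("transaction.state.log.min.isr > transaction.state.log.replication.factor");
        }
        return issues;
    }

    public static void main(String[] args) {
        // Kafka defaults (3, 3, 2) on a single-broker test cluster: two problems.
        System.out.println(problems(1, 3, 3, 2));
        // Typical single-broker test override: no problems.
        System.out.println(problems(1, 1, 1, 1));
    }
}
```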

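When a Flink job fails, the Kafka transactions it had open are aborted, either when the restarted job cleans them up or when Kafka's transaction timeout expires, and the job replays from the last completed checkpoint. The aborted records stay in the log, but consumers with isolation.level=read_committed skip them, so they see each result only once.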
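The effect of transactional writes on replay can be sketched with another toy model in plain Java (not Flink's TwoPhaseCommitSinkFunction). Assumptions: pre‑commit and commit are collapsed into a single step at checkpoint completion, a bounded input gets a final checkpoint at its end, and the input, checkpoint interval and crash position are arbitrary. The non‑transactional variant writes each record immediately, as an at‑least‑once sink would.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: transactional sink (commit on checkpoint) vs. immediate writes, with one crash.
class TwoPhaseCommitSinkModel {

    // Returns what a read_committed consumer of the output topic would see.
    static List<String> run(List<String> input, int checkpointEvery, int failAtOffset, boolean transactional) {
        List<String> visible = new ArrayList<>();  // records readers can see
        List<String> openTxn = new ArrayList<>();  // written but not yet committed
        int checkpointOffset = 0;
        boolean crashed = false;
        int offset = 0;
        while (offset < input.size()) {
            if (!crashed && offset == failAtOffset) {
                crashed = true;
                openTxn.clear();            // the open transaction is aborted
                offset = checkpointOffset;  // the source rewinds to the checkpointed offset
                continue;
            }
            if (transactional) {
                openTxn.add(input.get(offset));
            } else {
                visible.add(input.get(offset)); // non-transactional write is visible at once
            }
            offset++;
            if (offset % checkpointEvery == 0 || offset == input.size()) {
                visible.addAll(openTxn);    // checkpoint complete: commit the transaction
                openTxn.clear();
                checkpointOffset = offset;
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c", "d", "e");
        System.out.println("transactional:     " + run(input, 2, 3, true));  // [a, b, c, d, e]
        System.out.println("non-transactional: " + run(input, 2, 3, false)); // [a, b, c, c, d, e]
    }
}
```

In the non‑transactional run, "c" was already visible when the crash happened and is written again during replay; in the transactional run, that first "c" sat in an aborted transaction and never became visible.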

Overall, the article provides practical code snippets and configuration guidance for building reliable exactly‑once streaming applications with Flink and Kafka.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
