
Understanding Retract Streams in Apache Flink: Aggregation and Sink Operators

This article explains the concept of retract streams in Apache Flink, detailing how non‑retract Kafka sources and Group‑By aggregations generate delete/insert messages, provides code examples for aggregation and sink operators, and compares retract mechanisms across aggregation and CDC sink scenarios.

Big Data Technology & Architecture

1. Definition

In Flink, a Kafka source produces a non-retract (append-only) stream, while a GROUP BY aggregation produces a retract stream. A message that has already been sent downstream cannot be recalled, so a retract stream updates a previously emitted result in two steps: whenever the aggregate for a key changes, it emits a delete (retract) message for the old value, followed by an insert message for the new value.

2. Example

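// imports assume Flink 1.11 with the flink-table-api-java-bridge module
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RetractDemo {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // use blink planner in streaming mode
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);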
        // simulate non‑retract messages with fromElements
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("hello", 1),
                new Tuple2<>("hello", 1),
                new Tuple2<>("hello", 1));
        tEnv.registerDataStream("tmpTable", dataStream, "word, num");
        Table table = tEnv.sqlQuery("select cnt, count(word) as freq from (select word, count(num) as cnt from tmpTable group by word) group by cnt");
        // enable retract stream mechanism
        tEnv.toRetractStream(table, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})).print();
        env.execute();
    }
}

Result:

(true,(1,1))
(false,(1,1))
(true,(2,1))
(false,(2,1))
(true,(3,1))
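
To make the five output lines easier to follow, here is a minimal plain-Java sketch (no Flink dependency) that replays the same two-level count. The names RetractTrace, CountOperator and Msg are hypothetical, and the operator is a simplification of the Flink runtime: it retracts its previous result before emitting a new one, and the outer operator consumes those retractions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetractTrace {
    /** A changelog message: flag true = insert, flag false = retract. */
    static final class Msg {
        final boolean flag;
        final String key;
        final long value;

        Msg(boolean flag, String key, long value) {
            this.flag = flag;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + flag + ",(" + key + "," + value + "))";
        }
    }

    /** Counts rows per key; every change retracts the old count and inserts the new one. */
    static final class CountOperator {
        private final Map<String, Long> counts = new HashMap<>();

        List<Msg> process(boolean flag, String key) {
            List<Msg> out = new ArrayList<>();
            long prev = counts.getOrDefault(key, 0L);
            long next = flag ? prev + 1 : prev - 1;
            if (prev > 0) {
                out.add(new Msg(false, key, prev)); // retract the previous result
            }
            if (next > 0) {
                counts.put(key, next);
                out.add(new Msg(true, key, next)); // insert the new result
            } else {
                counts.remove(key); // no rows left for this key
            }
            return out;
        }
    }

    /** Chains "group by word" into "group by cnt" and returns the printed lines. */
    static List<String> run(List<String> words) {
        CountOperator inner = new CountOperator();
        CountOperator outer = new CountOperator();
        List<String> printed = new ArrayList<>();
        for (String word : words) {
            for (Msg m : inner.process(true, word)) {
                // the inner count becomes the grouping key of the outer query
                for (Msg o : outer.process(m.flag, String.valueOf(m.value))) {
                    printed.add(o.toString());
                }
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        run(Arrays.asList("hello", "hello", "hello")).forEach(System.out::println);
    }
}
```

The second "hello" makes the inner query retract (hello,1) and insert (hello,2). The outer query therefore first loses its only row for cnt = 1, which produces (false,(1,1)), and then gains a row for cnt = 2, which produces (true,(2,1)).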

2.1 Source Code Analysis

2.1.1 Aggregation Operator Retract

The first level COUNT receives a non‑retract stream from a Kafka source:

SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region

The second level COUNT receives the retract stream produced by the first level:

SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt

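Flink generates the source code of the aggregation helper, including GeneratedAggregations#retract(), when AggregateUtil.createDataStreamGroupAggregateFunction builds the operator. The generated code is compiled with Janino when the operator is opened.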

object AggregateUtil {
  private[flink] def createDataStreamGroupAggregateFunction[K](...generateRetraction: Boolean...){
    ...
    // wrap the dynamically generated aggregation code in a process function
    new GroupAggProcessFunction[K](
      genAggregations.asInstanceOf[GeneratedAggregationsFunction],
      aggregationStateType,
      // true means this operator emits retract messages to its downstream
      generateRetraction,
      queryConfig)
    ...
  }
}

class GroupAggProcessFunction[K](...private val generateRetraction: Boolean...){
  private var function: GeneratedAggregations = _
  override def open(config: Configuration) {
    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name}\nCode:${genAggregations.code}")
    // compile the generated source with Janino
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code)
    LOG.debug("Instantiating AggregateHelper.")
    function = clazz.newInstance()
    function.open(getRuntimeContext)
    ...
  }
}

// Janino usage
import org.codehaus.janino.SimpleCompiler
trait Compiler[T] {
  @throws(classOf[CompileException])
  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
    require(cl != null, "Classloader must not be null.")
    val compiler = new SimpleCompiler()
    compiler.setParentClassLoader(cl)
    try {
      compiler.cook(code)
    } catch {
      case t: Throwable =>
        throw new InvalidProgramException("Table program cannot be compiled. This is a bug. Please file an issue.", t)
    }
    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
  }
}

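During processing, the operator keeps two pieces of keyed state: state, the accumulators that hold the intermediate aggregation result, and cntState, the number of input rows currently contributing to the key (+1 for each insert, -1 for each retract). While cntState is non-zero, the operator stores the updated accumulators and emits the new result. When cntState drops to zero, no rows remain for the key, so the operator emits a retract message for the previous result and clears both states.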

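class GroupAggProcessFunction[K](...) extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig) {
  // abridged: the code that initializes accumulators, prevRow and newRow is omitted
  override def processElement(inputC: CRow, ctx: KeyedProcessFunction[K, CRow, CRow]#Context, out: Collector[CRow]): Unit = {
    val input = inputC.row
    // load the accumulators and the input-row counter for the current key
    var accumulators = state.value()
    var inputCnt = cntState.value()
    if (inputC.change) { // insert (accumulate) message
      inputCnt += 1
      function.accumulate(accumulators, input)
      function.setAggregationResults(accumulators, newRow.row)
    } else { // retract message
      inputCnt -= 1
      function.retract(accumulators, input)
      function.setAggregationResults(accumulators, newRow.row)
    }
    if (inputCnt != 0) {
      // at least one row still contributes to this key: save state and emit the new result
      state.update(accumulators)
      cntState.update(inputCnt)
      out.collect(newRow)
    } else {
      // the last row for this key was retracted: emit a delete for the previous result
      out.collect(prevRow)
      // and clear all state for the key
      state.clear()
      cntState.clear()
    }
    ...
  }
}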
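
The interplay of the two states can be sketched in plain Java. This is an illustration under stated assumptions, not Flink's implementation: the class SumWithRetract is hypothetical, it uses a SUM instead of a generated aggregate, in-memory maps stand in for keyed state, and it always retracts the previous result before emitting a new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SumWithRetract {
    private final Map<String, Long> state = new HashMap<>();    // accumulator per key
    private final Map<String, Long> cntState = new HashMap<>(); // contributing rows per key

    /** change = true accumulates the value, change = false retracts it. */
    List<String> processElement(boolean change, String key, long value) {
        List<String> out = new ArrayList<>();
        Long acc = state.get(key);
        if (acc == null && !change) {
            return out; // a retraction for an unknown key is ignored
        }
        boolean firstRow = (acc == null);
        long prev = firstRow ? 0L : acc;
        long cnt = cntState.getOrDefault(key, 0L) + (change ? 1 : -1);
        long next = change ? prev + value : prev - value;

        if (cnt != 0) {
            // rows remain for this key: store the state, replace the old result
            state.put(key, next);
            cntState.put(key, cnt);
            if (!firstRow) {
                out.add("(false,(" + key + "," + prev + "))");
            }
            out.add("(true,(" + key + "," + next + "))");
        } else {
            // last row retracted: delete the previous result and drop the state
            out.add("(false,(" + key + "," + prev + "))");
            state.remove(key);
            cntState.remove(key);
        }
        return out;
    }

    public static void main(String[] args) {
        SumWithRetract op = new SumWithRetract();
        System.out.println(op.processElement(true, "a", 5));
        System.out.println(op.processElement(true, "a", 3));
        System.out.println(op.processElement(false, "a", 5));
        System.out.println(op.processElement(false, "a", 3));
    }
}
```

The separate counter is what makes the final delete reliable. A sum can be zero while rows still contribute to it (for example 5 and -5), so the aggregate value alone cannot tell the operator that a key has become empty.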

2.1.2 Sink Operator Retract

Sink operators handle retract streams differently. Flink defines three sink modes:

Append mode – no primary key, only INSERT statements are generated.

Upsert mode – primary key defined; Flink generates INSERT/UPDATE/DELETE based on the retraction flag.

Retract mode – both INSERT and DELETE messages are emitted, and the connector translates them into corresponding data‑store operations.

Whether a sink supports retraction depends on the underlying data store. For example, Kafka only supports Append mode, while JDBC can support Upsert (using MySQL’s ON DUPLICATE KEY UPDATE).
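
The difference between Retract and Upsert mode can be illustrated with a small plain-Java sketch, in which a map stands in for the external table. SinkModeSketch, Msg, applyRetract and applyUpsert are hypothetical names, and the flag semantics (true = insert or upsert, false = delete) are an assumption modelled on the (flag, row) pairs shown earlier.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SinkModeSketch {
    static final class Msg {
        final boolean flag;
        final String key;
        final long value;

        Msg(boolean flag, String key, long value) {
            this.flag = flag;
            this.key = key;
            this.value = value;
        }
    }

    /** Retract mode: true inserts the row, false deletes that exact row. */
    static Map<String, Long> applyRetract(List<Msg> changelog) {
        Map<String, Long> table = new HashMap<>();
        for (Msg m : changelog) {
            if (m.flag) {
                table.put(m.key, m.value);
            } else {
                table.remove(m.key, Long.valueOf(m.value));
            }
        }
        return table;
    }

    /** Upsert mode: true inserts or overwrites by key, false deletes by key. */
    static Map<String, Long> applyUpsert(List<Msg> changelog) {
        Map<String, Long> table = new HashMap<>();
        for (Msg m : changelog) {
            if (m.flag) {
                table.put(m.key, m.value);
            } else {
                table.remove(m.key);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // a word count over three "hello" rows, encoded for each mode
        List<Msg> retract = Arrays.asList(
                new Msg(true, "hello", 1), new Msg(false, "hello", 1),
                new Msg(true, "hello", 2), new Msg(false, "hello", 2),
                new Msg(true, "hello", 3));
        List<Msg> upsert = Arrays.asList(
                new Msg(true, "hello", 1), new Msg(true, "hello", 2), new Msg(true, "hello", 3));
        System.out.println(applyRetract(retract));
        System.out.println(applyUpsert(upsert));
    }
}
```

Both changelogs leave the table with the single row hello = 3. The upsert encoding needs fewer messages because the key identifies the row to overwrite, which is why it requires a primary key.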

Kafka Sink

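Kafka stores messages by appending them to a log, so it only supports Append mode. One workaround is a modified Kafka sink that accepts the retract stream and drops the retract messages before writing to Kafka:

// modified sink: declared as a retract sink so that it receives (flag, row) pairs
public abstract class KafkaTableSinkBase implements RetractStreamTableSink<Row> {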
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        final SinkFunction<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
        // filter out retract messages
        return dataStream.filter(t -> t.f0).map(t -> t.f1)
            .addSink(kafkaProducer)
            .setParallelism(dataStream.getParallelism())
            .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }
}
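
The effect of that filter can be seen on the output of the earlier example. The following plain-Java sketch is only an illustration: AppendOnlyFilter, msg and toAppendOnly are hypothetical names, and map entries stand in for Flink's Tuple2<Boolean, Row>.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class AppendOnlyFilter {
    /** Builds a (flag, row) pair; the row is kept as a string for brevity. */
    static Map.Entry<Boolean, String> msg(boolean flag, String row) {
        return new SimpleEntry<>(flag, row);
    }

    /** Keeps only insert messages (flag == true) and unwraps the row. */
    static List<String> toAppendOnly(List<Map.Entry<Boolean, String>> retractStream) {
        return retractStream.stream()
                .filter(e -> e.getKey())
                .map(e -> e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> stream = Arrays.asList(
                msg(true, "(1,1)"), msg(false, "(1,1)"),
                msg(true, "(2,1)"), msg(false, "(2,1)"),
                msg(true, "(3,1)"));
        System.out.println(toAppendOnly(stream));
    }
}
```

The topic ends up with (1,1), (2,1) and (3,1). The two stale results are never deleted, so a consumer has to treat the latest record per key as the current value.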

JDBC Sink

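MySQL can theoretically support Append, Upsert, and Retract, but the JDBC connector in Flink 1.11 does not provide a Retract mode: it writes in Upsert mode when a primary key is defined and in Append mode otherwise. In Upsert mode it relies on MySQL’s atomic INSERT … ON DUPLICATE KEY UPDATE statement: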

INSERT INTO `TABLE_SINK_SYNC`(`eno`, `ename`, `esex`, `ebirthday`, `eteam`, `eincome`)
VALUES (1, 2, 3, 4, 5, 6)
ON DUPLICATE KEY UPDATE `eno`=VALUES(`eno`),
                        `ename`=VALUES(`ename`),
                        `esex`=VALUES(`esex`),
                        `ebirthday`=VALUES(`ebirthday`),
                        `eteam`=VALUES(`eteam`),
                        `eincome`=VALUES(`eincome`)

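When the data store cannot perform the upsert atomically (for example, when it needs a separate lookup followed by an INSERT or UPDATE), concurrent writers can interleave and concurrency issues may arise.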
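
A statement of the shape shown above can be assembled from a table name and a field list. The sketch below is a hypothetical helper (MySqlUpsertSql and its methods are not part of Flink's API); it emits JDBC placeholders instead of literal values and assumes identifiers that are safe to wrap in backticks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MySqlUpsertSql {
    /** Wraps an identifier in backticks, as in the statement above. */
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /** Builds INSERT ... ON DUPLICATE KEY UPDATE with one "?" placeholder per field. */
    static String upsertStatement(String table, List<String> fields) {
        String columns = fields.stream()
                .map(MySqlUpsertSql::quote)
                .collect(Collectors.joining(", "));
        String placeholders = fields.stream()
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        String updates = fields.stream()
                .map(f -> quote(f) + "=VALUES(" + quote(f) + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + columns + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        System.out.println(upsertStatement("TABLE_SINK_SYNC",
                Arrays.asList("eno", "ename", "esex", "ebirthday", "eteam", "eincome")));
    }
}
```

Because the whole insert-or-update is a single statement, the database decides between the two paths itself, and the writer does not need a prior lookup.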

2.1.3 Aggregation vs. Sink Retract

Aggregation operators maintain two states (state and cntState) to decide whether to emit a new aggregation result or a retract message. Sink operators, especially in CDC scenarios, instead interpret the op field of each change event (c for create, r for snapshot read, u for update, d for delete) and emit the corresponding RowKind messages. The Debezium JSON deserializer shows the mapping:

public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
        GenericRowData payload = schemaInclude ? (GenericRowData) row.getField(0) : row;
        GenericRowData before = (GenericRowData) payload.getField(0);
        GenericRowData after = (GenericRowData) payload.getField(1);
        String op = payload.getField(2).toString();
        if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
            after.setRowKind(RowKind.INSERT);
            out.collect(after);
        } else if (OP_UPDATE.equals(op)) {
            before.setRowKind(RowKind.UPDATE_BEFORE);
            after.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(before);
            out.collect(after);
        } else if (OP_DELETE.equals(op)) {
            before.setRowKind(RowKind.DELETE);
            out.collect(before);
        } else {
            if (!ignoreParseErrors) {
                throw new IOException(String.format("Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'", op, new String(message)));
            }
        }
    }
}
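
Stripped of the Flink row types, the mapping reduces to a switch over the op code. The following plain-Java sketch uses hypothetical names (DebeziumOpSketch, Kind, toChangelog), represents rows as strings, and always throws on an unknown op value instead of honoring an ignoreParseErrors setting.

```java
import java.util.ArrayList;
import java.util.List;

class DebeziumOpSketch {
    /** Local stand-in for the four row kinds used in the excerpt above. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Maps one change event to the changelog messages it produces. */
    static List<String> toChangelog(String op, String before, String after) {
        List<String> out = new ArrayList<>();
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                out.add(Kind.INSERT + " " + after);
                break;
            case "u": // update: old image first, then new image
                out.add(Kind.UPDATE_BEFORE + " " + before);
                out.add(Kind.UPDATE_AFTER + " " + after);
                break;
            case "d": // delete: only the old image exists
                out.add(Kind.DELETE + " " + before);
                break;
            default:
                throw new IllegalArgumentException("Unknown op value: " + op);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toChangelog("c", null, "(1,a)"));
        System.out.println(toChangelog("u", "(1,a)", "(1,b)"));
        System.out.println(toChangelog("d", "(1,b)", null));
    }
}
```

An update is the only event that yields two messages. The UPDATE_BEFORE and UPDATE_AFTER pair plays the same role as the retract and insert pair that an aggregation operator emits.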

Conclusion

Both aggregation and sink operators support retract semantics, but their underlying mechanisms differ: aggregation retract updates internal state for continuous queries, while sink retract ensures correct semantics in CDC pipelines.

Aggregation retract guarantees the correctness of Flink SQL incremental queries; sink retract guarantees the correctness of downstream CDC consumers.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
