Understanding Paimon's Changelog Producer: Four Modes and Their Trade‑offs
The article explains Paimon's changelog‑producer capability, detailing its purpose, storage format, and the four generation modes—None, Input, Lookup, and Full Compaction—while comparing their costs, implementation details, and suitability for different data sources such as CDC.
Earlier I mentioned a streaming‑batch solution that implements stream‑batch capabilities directly in the storage engine; Paimon's changelog ability is a core reason it is a top choice for such frameworks.
Paimon provides four ways to generate a changelog. This article describes their purpose, storage form, and the differences among the modes.
Purpose
The changelog producer's main goal is to generate a streaming‑read changelog on a Paimon table. If a table is only read in batch mode, the producer can be omitted.
For databases like MySQL, data‑modifying statements (INSERT, UPDATE, DELETE) are recorded in the binlog, which is analogous to Paimon's input changelog producer.
Storage Form
Changelogs are stored as separate files and are committed during snapshot commits. Each snapshot's metadata records a changelogManifestList, and when a snapshot expires, its changelog files expire as well.
The four producer modes—None, Input, Lookup, Full Compaction—are ordered from lowest to highest generation cost.
Four Modes
None
In this default mode, Paimon does not store additional data. The source reads the snapshot's delta list file, which represents the incremental changelog for that snapshot.
When the same primary key is inserted twice, a batch read returns only the merged row, while a streaming read sees the two written INSERT records as they are. The None mode itself does not produce the -U/+U pair that a complete changelog requires.
CREATE TABLE T (
    a INT,
    b INT,
    c STRING,
    PRIMARY KEY (a) NOT ENFORCED
) WITH (
    'merge-engine' = 'deduplicate',
    'changelog-producer' = 'none',
    'continuous.discovery-interval' = '1s' -- discovery interval set to 1 second
);

BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T");
sql("INSERT INTO T VALUES(1, 1, '1')");
// two inserts spaced 2 seconds apart so the source can read two snapshots
Thread.sleep(2000);
sql("INSERT INTO T VALUES(1, 1, '2')");
// three rows are collected, matching the output below: the insert, then the -U/+U pair
assertThat(iterator.collect(3))
        .containsExactlyInAnyOrder(
                Row.ofKind(RowKind.INSERT, 1, 1, "1"),
                Row.ofKind(RowKind.UPDATE_BEFORE, 1, 1, "1"),
                Row.ofKind(RowKind.UPDATE_AFTER, 1, 1, "2"));

// first commit
Successfully commit snapshot #1 (path /warehouse/default.db/T/snapshot/snapshot-1) by user 6434ee5c-ad2e-4564-a32c-568104392533 with identifier 9223372036854775807 and kind APPEND.
// scan first snapshot
start snapshotId: 1
// second commit
Successfully commit snapshot #2 (path /warehouse/default.db/T/snapshot/snapshot-2) by user ce0b10c0-e63f-4db0-ab90-1c542e832791 with identifier 9223372036854775807 and kind APPEND.
// scan delta files
scan with delta 2
// output data
[+I[1, 1, 1]]
[+I[1, 1, 1], -U[1, 1, 1]]
[+I[1, 1, 1], -U[1, 1, 1], +U[1, 1, 2]]
The stream read output shows a correct changelog, but the None mode itself does not emit the -U. The downstream task generates a ChangelogNormalize operator that synthesizes the missing entries.
When downstream tasks do not rely on a complete changelog (e.g., simple synchronization), the scan.remove-normalize option can disable the normalize step: the source then declares a ChangelogMode of ALL, even though it does not actually produce every row kind, so the planner skips the operator. However, the normalize operator's state has a TTL; if an update to a key arrives after that state has expired, it may be incorrectly treated as a new insert, leading to data inaccuracies.
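To make the normalize step and its TTL caveat concrete, here is a minimal sketch of the idea, not Flink's actual ChangelogNormalize operator. The class NormalizeSketch, its methods, and the string rendering of rows are all hypothetical; the only assumption carried over from the text is that the operator remembers the last value per key and that this memory can expire.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a normalize step (not the real operator). */
class NormalizeSketch {
    // Last value seen per primary key; a real operator keeps this in state with a TTL.
    private final Map<Integer, String> state = new HashMap<>();

    /** Turns one upsert (key, value) into changelog rows rendered as strings. */
    List<String> upsert(int key, String value) {
        List<String> out = new ArrayList<>();
        String previous = state.put(key, value);
        if (previous == null) {
            // No remembered value: the row looks like a brand-new insert.
            out.add("+I[" + key + ", " + value + "]");
        } else if (!previous.equals(value)) {
            // Remembered value differs: synthesize the -U/+U pair.
            out.add("-U[" + key + ", " + previous + "]");
            out.add("+U[" + key + ", " + value + "]");
        }
        return out;
    }

    /** Simulates state expiry: the key is forgotten, as after a TTL timeout. */
    void expire(int key) {
        state.remove(key);
    }

    public static void main(String[] args) {
        NormalizeSketch n = new NormalizeSketch();
        System.out.println(n.upsert(1, "1")); // prints [+I[1, 1]]
        System.out.println(n.upsert(1, "2")); // prints [-U[1, 1], +U[1, 2]]
        n.expire(1);
        System.out.println(n.upsert(1, "3")); // prints [+I[1, 3]]
    }
}
```

The last call shows the inaccuracy mentioned above: once the state for key 1 is gone, an update is emitted as +I rather than as a -U/+U pair.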
Delta Follow‑Up Scanner
Streaming reads consist of two parts: historical (snapshot) and incremental (delta). Some modes skip the historical part, but the incremental part usually reads commits with APPEND kind, i.e., the delta list files. In this streaming mode, the delta scanner only reads L0 files.
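The incremental part can be pictured with a small model. This is only a sketch under assumptions: the Snapshot and CommitKind types below are illustrative stand-ins written for this example, not Paimon's classes, and file names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a follow-up scan; the types are illustrative, not Paimon classes. */
class DeltaFollowUpSketch {
    enum CommitKind { APPEND, COMPACT }

    static final class Snapshot {
        final long id;
        final CommitKind kind;
        final List<String> deltaFiles;

        Snapshot(long id, CommitKind kind, String... deltaFiles) {
            this.id = id;
            this.kind = kind;
            this.deltaFiles = Arrays.asList(deltaFiles);
        }
    }

    /** Returns the delta files of every APPEND snapshot newer than lastReadId, in list order. */
    static List<String> followUp(List<Snapshot> snapshots, long lastReadId) {
        List<String> files = new ArrayList<>();
        for (Snapshot s : snapshots) {
            // In this model COMPACT commits only rewrite existing data, so they are skipped.
            if (s.id > lastReadId && s.kind == CommitKind.APPEND) {
                files.addAll(s.deltaFiles);
            }
        }
        return files;
    }

    public static void main(String[] args) {
        List<Snapshot> snapshots = Arrays.asList(
                new Snapshot(1, CommitKind.APPEND, "data-a-L0"),
                new Snapshot(2, CommitKind.APPEND, "data-b-L0"),
                new Snapshot(3, CommitKind.COMPACT, "data-c-L5"));
        // Snapshot 1 was already read as the historical part.
        System.out.println(followUp(snapshots, 1)); // prints [data-b-L0]
    }
}
```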
Input
Do not look up old values; write an extra changelog.
During writes, a duplicate file is created as the changelog. This mode is lightweight because it avoids looking up old values; however, it cannot handle cases where the source lacks a complete changelog (e.g., importing duplicate data from an offline table).
Input mode works well for CDC sources. The None mode, by contrast, is unsuitable for CDC because L0 files may already contain merged updates, so the original -U and +U sequence is lost.
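The following sketch illustrates why Input mode is cheap and where it falls short. It is a toy model, not Paimon's writer: InputProducerSketch, its fields, and the string row kinds are hypothetical, and the table is reduced to an in-memory map with deduplicate-style semantics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy writer: every input record is also copied to the changelog. */
class InputProducerSketch {
    final Map<Integer, String> table = new HashMap<>();
    final List<String> changelog = new ArrayList<>();

    void write(String kind, int key, String value) {
        // The changelog is a verbatim copy of the input; no old value is looked up.
        changelog.add(kind + "[" + key + ", " + value + "]");
        if (kind.equals("+I") || kind.equals("+U")) {
            table.put(key, value);
        } else {
            // "-U" and "-D" retract the row; a following "+U" writes it back.
            table.remove(key);
        }
    }

    public static void main(String[] args) {
        // CDC-style input carries its own -U/+U, so the copied changelog is complete.
        InputProducerSketch cdc = new InputProducerSketch();
        cdc.write("+I", 1, "1");
        cdc.write("-U", 1, "1");
        cdc.write("+U", 1, "2");
        System.out.println(cdc.changelog); // prints [+I[1, 1], -U[1, 1], +U[1, 2]]

        // Insert-only input with a repeated key: the copy has two +I and no -U.
        InputProducerSketch offline = new InputProducerSketch();
        offline.write("+I", 1, "1");
        offline.write("+I", 1, "2");
        System.out.println(offline.changelog); // prints [+I[1, 1], +I[1, 2]]
    }
}
```

In both runs the table ends up holding the value "2" for key 1; only the changelog differs, which is exactly the gap the Lookup and Full Compaction modes close.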
Lookup
Look up old values; write an extra changelog.
If the source is not CDC or the table performs partial updates/aggregation, the previous two modes cannot generate a correct changelog. Lookup compaction searches higher‑level files during compaction to find the previous value of a key; if found, it emits both UPDATE_BEFORE and UPDATE_AFTER messages.
Lookup uses the LookupCompaction strategy to select files, ensuring L0 files are compacted when the universal compaction does not produce a compaction unit.
public KeyValue getResult() {
    // 1. Find the latest high level record
    Iterator<KeyValue> descending = candidates.descendingIterator();
    while (descending.hasNext()) {
        KeyValue kv = descending.next();
        if (kv.level() > 0) {
            if (highLevel != null) {
                descending.remove();
            } else {
                highLevel = kv;
            }
        } else {
            containLevel0 = true;
        }
    }
    // 2. Merge inputs
    mergeFunction.reset();
    candidates.forEach(mergeFunction::add);
    return mergeFunction.getResult();
}
The lookup merge function may need to perform binary search and build index files, which incurs higher cost.
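The lookup idea itself can be shown in a few lines. This is a sketch under simplifying assumptions, not Paimon's implementation: LookupSketch is a hypothetical class, each level is modeled as an in-memory map rather than sorted files, and the key is assumed to be present in L0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: levels.get(0) is L0, and a larger index holds older data. */
class LookupSketch {
    /** Searches the levels above L0 from newest to oldest; returns the first hit or null. */
    static String lookupOld(List<Map<Integer, String>> levels, int key) {
        for (int level = 1; level < levels.size(); level++) {
            String value = levels.get(level).get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    /** Builds the changelog for a key that was just written to L0. */
    static List<String> changelogFor(List<Map<Integer, String>> levels, int key) {
        String newValue = levels.get(0).get(key);
        String oldValue = lookupOld(levels, key);
        List<String> out = new ArrayList<>();
        if (oldValue == null) {
            out.add("+I[" + key + ", " + newValue + "]");
        } else {
            out.add("-U[" + key + ", " + oldValue + "]");
            out.add("+U[" + key + ", " + newValue + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> l0 = new HashMap<>();
        l0.put(1, "2"); // update of an existing key
        l0.put(2, "x"); // brand-new key
        Map<Integer, String> l1 = new HashMap<>();
        Map<Integer, String> l2 = new HashMap<>();
        l2.put(1, "1"); // previous value of key 1 lives in a higher level
        List<Map<Integer, String>> levels = Arrays.asList(l0, l1, l2);

        System.out.println(changelogFor(levels, 1)); // prints [-U[1, 1], +U[1, 2]]
        System.out.println(changelogFor(levels, 2)); // prints [+I[2, x]]
    }
}
```

In the real engine each map lookup stands for a search in sorted files, which is where the binary search and index-building cost comes from.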
Full Compaction
Look up old values; write an extra changelog.
Full compaction is triggered periodically via full-compaction.delta-commits. Although costly, it writes all data to the highest level, making every value change reconstructable.
public ChangelogResult getResult() {
    reusedResult.reset();
    if (isInitialized) {
        KeyValue merged = mergeFunction.getResult();
        if (topLevelKv == null) {
            if (merged != null && isAdd(merged)) {
                reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, merged));
            }
        } else {
            if (merged == null || !isAdd(merged)) {
                reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
            } else if (!changelogRowDeduplicate || !valueEqualiser.equals(topLevelKv.value(), merged.value())) {
                reusedResult
                        .addChangelog(replace(reusedBefore, RowKind.UPDATE_BEFORE, topLevelKv))
                        .addChangelog(replace(reusedAfter, RowKind.UPDATE_AFTER, merged));
            }
        }
        return reusedResult.setResultIfNotRetract(merged);
    } else {
        if (topLevelKv == null && isAdd(initialKv)) {
            reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, initialKv));
        }
        return reusedResult.setResultIfNotRetract(initialKv);
    }
}
Full compaction yields the most complete changelog at the expense of higher resource consumption.
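The decision logic in the isInitialized branch of that method boils down to a small table, sketched here in standalone form. FullCompactionChangelogSketch is a hypothetical class written for illustration: values are plain strings, a null merged value stands for a key that ends up deleted, and the deduplicate flag plays the role of changelogRowDeduplicate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical standalone version of the top-level-versus-merged comparison. */
class FullCompactionChangelogSketch {
    /**
     * @param topLevel value at the highest level before compaction, or null if absent
     * @param merged value after merging all levels, or null if the key ends up deleted
     * @param deduplicate when true, an unchanged value produces no changelog rows
     */
    static List<String> changelog(int key, String topLevel, String merged, boolean deduplicate) {
        List<String> out = new ArrayList<>();
        if (topLevel == null) {
            if (merged != null) {
                out.add("+I[" + key + ", " + merged + "]");
            }
        } else if (merged == null) {
            out.add("-D[" + key + ", " + topLevel + "]");
        } else if (!deduplicate || !topLevel.equals(merged)) {
            out.add("-U[" + key + ", " + topLevel + "]");
            out.add("+U[" + key + ", " + merged + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(changelog(1, null, "1", true)); // prints [+I[1, 1]]
        System.out.println(changelog(1, "1", "2", true));  // prints [-U[1, 1], +U[1, 2]]
        System.out.println(changelog(1, "1", null, true)); // prints [-D[1, 1]]
        System.out.println(changelog(1, "1", "1", true));  // prints []
    }
}
```

Because every key's previous value sits at the highest level after a full compaction, this comparison needs no per-key search, at the price of rewriting all the data.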
Overall, the choice among None, Input, Lookup, and Full Compaction depends on the data source characteristics, desired latency, and acceptable storage/computation overhead.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
