Real-time Data Synchronization from OceanBase to Kafka Using ActionOMS and Flink
This article demonstrates how to use ActionOMS to capture incremental changes from OceanBase, stream them to Kafka in various formats, and employ Flink to deduplicate and aggregate transaction data into a daily summary, illustrating a complete real-time data pipeline for financial use cases.
Background
In the digital era, real‑time data warehouse technology is widely used in finance, e‑commerce, and manufacturing, where timely capture of database incremental records is critical. OceanBase is a high‑performance distributed relational database that provides strong consistency and high throughput.
ActionOMS leverages OceanBase CDC to pull redo logs via RPC, assemble distributed transactions, parse data, format statements, and output change data per transaction. The tool can forward data to Kafka, RocketMQ, DataHub, etc., enabling rapid construction of real‑time warehouses.
Example
Business Scenario
A bank’s transaction table may contain duplicate records due to system delays. After deduplication, the data is aggregated to analyze customer consumption habits.
Data Source – Data Channel
ActionOMS can sync full and incremental data (DML/DDL) from OceanBase to Kafka in multiple formats, including Default, Canal, Debezium, and Avro. Note that when a job resumes, Kafka may contain recently written duplicate records, so downstream consumers must support deduplication.
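As a minimal illustration of downstream deduplication (a hypothetical sketch, not ActionOMS or Flink code), replayed records can be filtered by tracking each message's `uniqueId` from `allMetaData`:

```python
import json

def dedup_by_unique_id(messages):
    """Yield each change record once, keyed by allMetaData.uniqueId.

    `messages` is an iterable of raw JSON strings in the ActionOMS format.
    A real consumer would bound the seen-set (e.g. with a TTL cache),
    since the stream of uniqueIds grows without limit.
    """
    seen = set()
    for raw in messages:
        record = json.loads(raw)
        uid = record["allMetaData"]["uniqueId"]
        if uid in seen:
            continue  # duplicate replayed after a resume; skip it
        seen.add(uid)
        yield record

# Two copies of the same record: only one survives deduplication.
msg = json.dumps({"allMetaData": {"uniqueId": "1002_1001_7681208_0"},
                  "recordType": "INSERT"})
unique = list(dedup_by_unique_id([msg, msg]))
print(len(unique))  # -> 1
```

The same idea underlies the Flink-side deduplication shown later, which uses the business primary key instead of the transport-level `uniqueId`.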
Below is an example of the JSON message format produced by ActionOMS:
{
  "prevStruct": null,
  "postStruct": {
    "order_id": "RTDW202411210006",
    "user": "u001",
    "product": "p008",
    "num": 800,
    "proctime": "1732181459",
    "__pk_increment": 8
  },
  "allMetaData": {
    "checkpoint": "1732168058",
    "dbType": "OB_MYSQL",
    "storeDataSequence": 173216805935500000,
    "db": "oms_mysql.rt_dw_test",
    "timestamp": "1732168059",
    "uniqueId": "1002_1001_7681208\u0000\u0000_5572734820_0",
    "transId": "1002_7681208",
    "clusterId": "33",
    "ddlType": null,
    "record_primary_key": "__pk_increment",
    "source_identity": "OB_MYSQL_ten_1_698lmn9kj7cw-1-0",
    "record_primary_value": "8",
    "table_name": "orders"
  },
  "recordType": "INSERT"
}

Flink – Data Warehouse
Flink subscribes to Kafka, deduplicates records using the primary key, and aggregates daily transaction volume and total amount, then writes the result back to OceanBase.
Input table definition in Flink:
CREATE TABLE kafka_input (
  prevStruct ROW<>,
  postStruct ROW<
    order_id STRING,
    `user` STRING,
    product STRING,
    num INT,
    proctime STRING
  >,
  allMetaData ROW<>,
  recordType STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'rt_dw_test',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'oms_test_1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'json.timestamp-format.standard' = 'ISO-8601'
);

Output table definition:
CREATE TABLE daily_order_summary (
  order_date DATE,
  total_orders BIGINT,
  total_amount DECIMAL(10, 2),
  PRIMARY KEY (order_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'table-name' = 'daily_order_summary',
  'url' = 'jdbc:mysql://ip:port/rt_dw_test',
  'username' = 'test',
  'password' = 'test'
);

Flink SQL that performs deduplication and aggregation:
INSERT INTO daily_order_summary
SELECT
  CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE) AS order_date,
  COUNT(DISTINCT postStruct.order_id) AS total_orders,
  SUM(postStruct.num) AS total_amount
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY postStruct.order_id
      ORDER BY TO_TIMESTAMP(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT))) DESC
    ) AS row_num
  FROM kafka_input
) WHERE row_num = 1
GROUP BY CAST(FROM_UNIXTIME(CAST(postStruct.proctime AS BIGINT)) AS DATE);

Validation
After running the Flink job, the aggregated results for historical data contain no duplicates. When new and duplicate orders are inserted into OceanBase, the pipeline synchronizes them to Kafka, Flink removes duplicates, and the final summary reflects only the new orders.
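As a sanity check, the deduplicate-then-aggregate logic of the Flink SQL can be replicated in plain Python over a handful of sample records (field names follow the `postStruct` schema; this is an illustration, not the production path):

```python
from collections import defaultdict
from datetime import datetime, timezone

# Sample change records, including one replayed duplicate of the same order.
records = [
    {"order_id": "RTDW202411210006", "num": 800, "proctime": "1732181459"},
    {"order_id": "RTDW202411210006", "num": 800, "proctime": "1732181459"},  # duplicate
    {"order_id": "RTDW202411210007", "num": 200, "proctime": "1732181500"},
]

# Step 1: keep the latest record per order_id
# (mirrors ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) = 1).
latest = {}
for r in records:
    prev = latest.get(r["order_id"])
    if prev is None or int(r["proctime"]) >= int(prev["proctime"]):
        latest[r["order_id"]] = r

# Step 2: aggregate per calendar day (mirrors the GROUP BY on the cast date).
summary = defaultdict(lambda: {"total_orders": 0, "total_amount": 0})
for r in latest.values():
    day = datetime.fromtimestamp(int(r["proctime"]), tz=timezone.utc).date()
    summary[day]["total_orders"] += 1
    summary[day]["total_amount"] += r["num"]

for day, row in sorted(summary.items()):
    print(day, row["total_orders"], row["total_amount"])  # -> 2024-11-21 2 1000
```

The duplicate order contributes only once, so the daily summary counts two orders rather than three, matching the behavior described above.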
Summary
Using ActionOMS to sync OceanBase data to Kafka, combined with Flink for real‑time deduplication and aggregation, provides an efficient end‑to‑end solution for building real‑time data warehouses, especially in financial scenarios where data accuracy and latency are critical.
Aikesheng Open Source Community
The Aikesheng Open Source Community provides stable, enterprise‑grade open‑source MySQL tools and services, releases a premium open‑source component every year on October 24 ("1024"), and continuously operates and maintains them.