Building Real‑Time OLAP Reports with Flink SQL CDC and Elasticsearch
This article details a production‑grade pipeline that uses Apache Flink 1.11's SQL CDC to stream MySQL changes into Elasticsearch, enabling low‑latency OLAP reporting, and shares the architecture, DDL/DML scripts, operational settings, and dozens of pitfalls encountered along the way.
Project Background
The company needed an efficient, accurate OLAP service for massive, fast‑growing MySQL tables (tens of millions to billions of rows). Traditional batch joins could not meet real‑time reporting requirements, prompting a shift to stream processing.
Solution Overview
We adopted Flink SQL CDC combined with Elasticsearch as the real‑time reporting engine. The Flink CDC source first reads a full snapshot of the MySQL tables, then switches to the MySQL binlog for incremental changes. The job joins this change stream with a dimension table, pre‑aggregates the results, and writes them to Elasticsearch, which the front end queries directly.
Compared with a Canal+Kafka setup, Flink SQL CDC offers the following advantages:
Reduced component count and simplified pipeline
Lower end‑to‑end latency
Reduced maintenance and development effort
Exactly‑once processing for financial data
No intermediate data landing (for example, in Kafka), saving storage costs
Supports both full and incremental reads
Running Environment
The production cluster runs Hadoop + Flink + Elasticsearch on YARN in per‑job mode, using RocksDB as the state backend and HDFS for checkpoint storage. Jobs are submitted via the Flink SQL Client, written entirely in SQL without Java code.
Three Flink CDC jobs have been online for two weeks, continuously aggregating order and bill data into Elasticsearch.
Implementation Details
1. Start the SQL client:
./sql-client.sh embedded
2. Create the source, dimension, and sink tables with DDL (hostnames, credentials, and table names are masked):
CREATE TABLE bill_info (
  billCode STRING,
  serviceCode STRING,
  accountPeriod STRING,
  subjectName STRING,
  subjectCode STRING,
  occurDate TIMESTAMP,
  amt DECIMAL(11,2),
  status STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '******',
  'port' = '3307',
  'username' = '******',
  'password' = '******',
  'database-name' = 'cdc',
  'table-name' = '***'
);
CREATE TABLE order_info ( ... same structure ... );
CREATE TABLE subject_info (
  code VARCHAR(32) NOT NULL,
  name VARCHAR(64) NOT NULL,
  PRIMARY KEY (code) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = '***',
  'username' = '******',
  'password' = '******',
  'lookup.cache.max-rows' = '3000',
  'lookup.cache.ttl' = '10s',
  'lookup.max-retries' = '3'
);
CREATE TABLE income_distribution (
  serviceCode STRING,
  accountPeriod STRING,
  subjectCode STRING,
  subjectName STRING,
  amt DECIMAL(13,2),
  PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://xxxx:9200',
  'index' = 'income_distribution',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
);
3. Insert aggregation results into the sink:
INSERT INTO income_distribution
SELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(t1.amt) AS amt
FROM (
  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(b.amt) AS amt
  FROM bill_info AS b
  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code
  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
  UNION ALL
  SELECT o.serviceCode, o.accountPeriod, o.subjectCode, s.name AS subjectName, SUM(o.amt) AS amt
  FROM order_info AS o
  JOIN subject_info FOR SYSTEM_TIME AS OF o.proc_time s ON o.subjectCode = s.code
  GROUP BY o.serviceCode, o.accountPeriod, o.subjectCode, s.name
) AS t1
GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;
Temporal table joins require a processing‑time attribute (proc_time) and the FOR SYSTEM_TIME AS OF syntax.
Pitfalls & Lessons Learned
Running multiple jobs in a Flink standalone session cluster meant that jobs shared TaskManagers and their logs were mixed together; we switched to YARN per‑job mode for isolation.
The Elasticsearch connector supports only sinks, not sources; attempts to query an Elasticsearch table caused errors.
The SQL Client ignored the parallelism set in flink-conf.yaml because sql-client-defaults.yaml overrides it; set the parallelism there instead.
Long‑running full‑table scans triggered checkpoint timeouts and failover; we configured a tolerable number of failed checkpoints and a fixed‑delay restart strategy.
Enable YARN per‑job mode via execution.target: yarn-per-job in flink-conf.yaml.
SQL Client’s in‑memory catalog means tables disappear after restart; persistent catalogs are needed for shared metadata.
Elasticsearch mapping type mismatch (long vs. decimal) caused indexing errors; pre‑create the index with a decimal‑compatible mapping.
CDC source failed on ALTER DDL statements; upgraded to flink-sql-connector-mysql-cdc-1.1.0.jar which skips unparsable DDL.
Full‑table scan slowness often stemmed from downstream back‑pressure; tuned mini‑batch settings and enabled distinct‑agg split to improve throughput.
Missing MySQL RELOAD privilege prevented global read locks; granting RELOAD or setting 'debezium.snapshot.locking.mode'='none' resolved it.
Duplicate server‑id values across jobs caused data loss; we overrode the server‑id per query using table hints.
YARN AM resource limits blocked job start; increased yarn.scheduler.capacity.maximum-am-resource-percent to 0.3.
AM containers were killed due to virtual memory exhaustion; adjusted yarn.nodemanager.vmem-pmem-ratio or disabled vmem checks.
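Several of the fixes above can be expressed directly in the SQL Client. The following is a minimal sketch under stated assumptions, not the exact production configuration: the table name bill_info_demo, the connection values, the server-id value, and the mini‑batch latency and size are all illustrative. It also assumes the SQL Client accepts table.* options through SET; the same keys could instead go in the configuration section of sql-client-defaults.yaml.

```sql
-- Assumption: dynamic table options must be enabled before OPTIONS hints take effect.
SET table.dynamic-table-options.enabled=true;

-- Mini-batch aggregation and distinct-agg split, to relieve downstream back-pressure.
-- The latency and size values are illustrative, not tuned recommendations.
SET table.exec.mini-batch.enabled=true;
SET table.exec.mini-batch.allow-latency=2s;
SET table.exec.mini-batch.size=5000;
SET table.optimizer.distinct-agg.split.enabled=true;

-- Hypothetical CDC source for an account that lacks the RELOAD privilege:
-- 'debezium.snapshot.locking.mode' = 'none' skips the lock during the snapshot.
-- This is only safe if no schema changes happen while the snapshot runs.
-- Alternative: have a DBA grant RELOAD to the CDC user in MySQL instead.
CREATE TABLE bill_info_demo (
  billCode STRING,
  subjectCode STRING,
  amt DECIMAL(11,2)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',      -- placeholder
  'port' = '3306',               -- placeholder
  'username' = 'flink_cdc',      -- placeholder
  'password' = 'change-me',      -- placeholder
  'database-name' = 'cdc',
  'table-name' = 'bill_info',
  'debezium.snapshot.locking.mode' = 'none'
);

-- Give this query its own server-id so it does not clash with other jobs
-- that read the binlog of the same MySQL instance. The value is arbitrary.
SELECT billCode, subjectCode, amt
FROM bill_info_demo /*+ OPTIONS('server-id'='5401') */;
```

Each job that reads from the same MySQL instance would use a different server-id value in its hint.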
Conclusion
Switching from a Canal+Kafka+Flink DataStream solution to Flink SQL CDC eliminated Java code, reduced component overhead, and delivered stable, low‑latency real‑time reporting. The team now promotes Flink SQL CDC across other pipelines and thanks the open‑source community for its rapid evolution.
dbaplus Community
