
How to Build a Production‑Ready Flink ClickHouse Sink with Dynamic Sharding, Batch‑by‑Size, and Robust Retry

This article presents a production-grade Flink ClickHouse sink that addresses common pain points: no size-based batching, static table schemas, and distributed-table write latency. It introduces data-size batching, dynamic table routing, local-table writes, load-balanced node discovery, back-pressure queues, dual-trigger flushing, and recursive retry with node exclusion, all integrated with Flink checkpoint semantics for at-least-once guarantees.

DeWu Technology

In real‑time big‑data scenarios, Flink combined with ClickHouse is widely used for log processing, monitoring, and metric aggregation, where massive TPS, low latency, high accuracy, and multi‑table writes are required.

Background and Pain Points

Flink's official ClickHouse JDBC sink batches only by record count, leading to uncontrolled memory usage, OOM risk, and poor performance for variable‑size logs.

The sink uses a fixed INSERT SQL, preventing dynamic table creation and per‑application isolation.

Writing to ClickHouse distributed tables adds network hops, causing high latency, throughput loss, and hotspot bottlenecks.

Core Architecture Overview

The solution introduces several key components:

ClickHouseSinkCounter : tracks accumulated byte size (metaSize) and record count per application.

ClickHouseShardStrategy and LogClickHouseShardStrategy : generate target table names dynamically (e.g., tb_log_order_service) based on the application name.
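
The article does not list the strategy code itself; the following is a minimal sketch of what such a routing strategy could look like, assuming a String.format-style table pattern and the constructor shown in the configuration example further down (interface and method names are illustrative).

// Hypothetical sketch of the routing contract and its log-oriented implementation.
interface ClickHouseShardStrategy {
    String targetTable(String appName);
}

class LogClickHouseShardStrategy implements ClickHouseShardStrategy {
    private final String tablePattern; // e.g. "tb_log_%s"
    private final int shardCount;      // shards in the ClickHouse cluster

    LogClickHouseShardStrategy(String tablePattern, int shardCount) {
        this.tablePattern = tablePattern;
        this.shardCount = shardCount;
    }

    @Override
    public String targetTable(String appName) {
        // e.g. "order-service" -> "tb_log_order_service"
        return String.format(tablePattern, appName.toLowerCase().replace('-', '_'));
    }
}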

ClickHouseLocalWriter : writes directly to local tables, avoiding distributed‑table forwarding.

ClusterIpsUtils : discovers healthy ClickHouse nodes from system.clusters and refreshes the list hourly.
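
ClusterIpsUtils itself is not shown; a minimal sketch of the discovery query, assuming plain JDBC and the standard system.clusters columns, might look like this (class and method names are illustrative).

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: fetch the host list of a cluster from system.clusters.
// In the article this list is refreshed on a schedule (hourly) and cached.
class ClusterIpsUtils {
    static List<String> discoverHosts(Connection conn, String clusterName) throws SQLException {
        List<String> hosts = new ArrayList<>();
        String sql = "SELECT host_address, port FROM system.clusters WHERE cluster = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, clusterName);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    hosts.add(rs.getString("host_address") + ":" + rs.getInt("port"));
                }
            }
        }
        return hosts;
    }
}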

ClickHouseWriter : maintains a bounded blocking queue, a pool of writer threads, and back‑pressure via queue.put().
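
As a rough sketch (class shape, capacities, and thread counts here are illustrative defaults, not the article's exact code), back-pressure falls out of the bounded queue: the Flink sink thread blocks on put() whenever the writer threads fall behind.

import java.util.List;
import java.util.concurrent.*;

// Hypothetical sketch: bounded hand-off between the sink and the writer threads.
class ClickHouseWriter {
    private final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(10); // capacity from config
    private final ExecutorService writers = Executors.newFixedThreadPool(10);        // writer thread count

    void start() {
        for (int i = 0; i < 10; i++) {
            writers.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<String> batch = queue.take(); // blocks until a batch is available
                        // insertBatch(batch);             // one INSERT per batch against a local table
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

    void put(List<String> batch) throws InterruptedException {
        queue.put(batch); // blocks when the queue is full -> natural back-pressure on the Flink task
    }
}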

ClickHouseShardSinkBuffer : implements a dual‑trigger flush (byte‑size threshold or timeout with random jitter) and per‑application buffers.
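
A minimal sketch of the dual trigger, assuming the size check happens on every add() and the timeout check runs from a scheduled cleaner (field and method names are assumptions):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of a per-application buffer with two flush triggers.
class ClickHouseShardSinkBuffer {
    private final List<String> rows = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMillis = System.currentTimeMillis();

    private final long maxFlushBufferSize = 10_000;        // bytes
    private final long timeoutMillis = withJitter(30_000); // 30 s + up to 10% jitter

    synchronized void add(String row) {
        rows.add(row);
        bufferedBytes += row.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        if (bufferedBytes >= maxFlushBufferSize) {
            flush(); // trigger 1: size threshold
        }
    }

    synchronized void tryFlushOnTimeout() { // called by the scheduled cleaner
        if (!rows.isEmpty() && System.currentTimeMillis() - lastFlushMillis >= timeoutMillis) {
            flush(); // trigger 2: timeout
        }
    }

    synchronized void flush() {
        // hand the batch to ClickHouseWriter, then reset counters
        rows.clear();
        bufferedBytes = 0;
        lastFlushMillis = System.currentTimeMillis();
    }

    private static long withJitter(long baseMillis) {
        return baseMillis + ThreadLocalRandom.current().nextLong(baseMillis / 10 + 1);
    }
}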

Retry and Exception Handling : recursive retry with exclusion of failed hosts, Future timeout (3 min), and configurable max retries.
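
The retry path is described but not listed; a hedged sketch of "retry against another node, excluding the one that just failed" could look like this (the send call and limits are placeholders):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: recursive retry that drops the failed host from the candidate set.
class RetryingSender {
    private final int maxRetries = 10;

    void send(String insertSql, List<String> candidateHosts, int attempt) {
        if (candidateHosts.isEmpty() || attempt > maxRetries) {
            throw new RuntimeException("ClickHouse write failed after " + attempt + " attempts");
        }
        String host = candidateHosts.get(ThreadLocalRandom.current().nextInt(candidateHosts.size()));
        try {
            // executeInsert(host, insertSql); // JDBC INSERT against the chosen node's local table
        } catch (Exception e) {
            List<String> remaining = new ArrayList<>(candidateHosts);
            remaining.remove(host);            // exclude the node that just failed
            send(insertSql, remaining, attempt + 1);
        }
    }
}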

Checkpoint Coordination : a scheduled cleaner pauses during checkpoint flush, ensuring no concurrent buffer flushes and guaranteeing at‑least‑once semantics.
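
A minimal sketch of that coordination, assuming a simple volatile flag shared between snapshotState() and the cleaner (the class name is illustrative; the buffer type is the one sketched above):

// Hypothetical sketch: the cleaner skips its run while a checkpoint flush is in progress.
class FlushCoordinator {
    private volatile boolean checkpointFlushInProgress = false;

    void beforeFlush() { checkpointFlushInProgress = true; }  // called from snapshotState()
    void afterFlush()  { checkpointFlushInProgress = false; }

    void onCleanerTick(ClickHouseShardSinkBuffer buffer) {
        if (checkpointFlushInProgress) {
            return; // avoid a concurrent flush that could double-write the same buffer
        }
        buffer.tryFlushOnTimeout();
    }
}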

Key Improvements

Data‑size based batching : metaSize drives flush, providing precise memory control.

Dynamic table routing : each application writes to its own table, enabling isolation and easy scaling.

Local‑table writes + dynamic node discovery : random healthy node selection, automatic addition/removal of nodes, and load‑balanced writes.

Back‑pressure queue : bounded LinkedBlockingQueue with configurable capacity prevents overload.

Dual‑trigger flush : batch size (e.g., 10 000 bytes) or timeout (e.g., 30 s + 10% jitter) ensures timely writes.

Recursive retry with node exclusion : failed hosts are removed from the candidate set; retries stop after a configurable limit.

Checkpoint‑aware flushing : beforeFlush() sets a volatile flag, the cleaner thread skips execution, and afterFlush() resumes it, avoiding duplicate writes.

Configuration Example

ClickHouseShardSinkBuffer buffer = ClickHouseShardSinkBuffer.Builder
    .aClickHouseSinkBuffer()
    .withTargetTable("single_table")
    .withMaxFlushBufferSize(10_000)   // flush threshold in bytes
    .withTimeoutSec(30)               // flush timeout in seconds
    .withClickHouseShardStrategy(new LogClickHouseShardStrategy("tb_log_%s", 8))
    .build(clickHouseWriter);

Other notable settings include HikariCP connection pool (max 20 connections, 30 s connection timeout, 3 min socket timeout), queue capacity (default 10), writer thread count (e.g., 10), and retry count (e.g., 10).
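
Those pool settings might translate to a HikariCP configuration along these lines (the socket_timeout data-source property assumes the ClickHouse JDBC driver; check your driver version):

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

// Sketch of the pool settings described above.
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:clickhouse://ck-node-1:8123/default"); // a local-table node, not the distributed table
config.setMaximumPoolSize(20);                                 // max 20 connections
config.setConnectionTimeout(30_000);                           // 30 s to obtain a connection
config.addDataSourceProperty("socket_timeout", "180000");      // 3 min socket timeout (driver-specific)
HikariDataSource dataSource = new HikariDataSource(config);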

Sink Modes

UnexceptionableSink (At‑Most‑Once): ignores write failures, suitable for low‑accuracy log statistics.

ExceptionsThrowableSink (At‑Least‑Once): propagates exceptions, causing Flink task failure and restart for high‑accuracy requirements.
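
The two modes differ only in how invoke() treats a failed hand-off; a hedged sketch, assuming both sinks extend Flink's RichSinkFunction:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sketch of the two failure-handling modes.
class UnexceptionableSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) {
        try {
            // buffer.add(value); // hand the record to the shard buffer
        } catch (Exception e) {
            // at-most-once: log and drop, never fail the job
        }
    }
}

class ExceptionsThrowableSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        // buffer.add(value);     // any exception propagates -> task restart -> at-least-once
    }
}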

Checkpoint Semantics

During snapshotState(), the sink flushes buffers and waits for all futures (max 3 min each). The checkpoint timeout must exceed the sum of Future timeout and retry latency (e.g., 10 min). This guarantees that either all buffered data is persisted or the checkpoint fails, preventing data loss.
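
A hedged sketch of that checkpoint hook, assuming the sink implements Flink's CheckpointedFunction and reusing the hypothetical types sketched earlier:

import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical sketch: flush all buffers on checkpoint and wait for every in-flight write.
abstract class CheckpointedClickHouseSink implements CheckpointedFunction {

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        coordinator().beforeFlush();        // pause the scheduled cleaner
        try {
            for (ClickHouseShardSinkBuffer buffer : buffers()) {
                buffer.flush();             // synchronous hand-off to the writer
            }
            for (Future<Boolean> f : inFlightWrites()) {
                f.get(3, TimeUnit.MINUTES); // fail the checkpoint if a write cannot complete
            }
        } finally {
            coordinator().afterFlush();     // resume the cleaner
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) { /* no state to restore */ }

    abstract FlushCoordinator coordinator();
    abstract List<ClickHouseShardSinkBuffer> buffers();
    abstract List<Future<Boolean>> inFlightWrites();
}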

Best Practices & Tuning

Enable checkpointing (e.g., every 60 s) with EXACTLY_ONCE mode; a minimal configuration sketch follows this list.

Set checkpoint timeout > Future timeout × max retries.

Monitor queue size, future failure rate, and retry counts.

Prefer local‑table writes with HikariCP for lower latency.

Adjust batch size and timeout based on average log size to balance memory and throughput.
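
Under those guidelines, the job-level checkpoint setup could look like the following sketch (the 10-minute timeout mirrors the Future timeout and retry budget discussed above):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);  // checkpoint every 60 s
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);  // must exceed Future timeout x retries
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // breathing room between checkpoint flushes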

Conclusion

The presented sink addresses the three major shortcomings of the official Flink ClickHouse connector by providing size‑aware batching, dynamic sharding, and robust fault tolerance. It has been validated in production to sustain millions of TPS with stable latency, offering both At‑Least‑Once (via ExceptionsThrowableSink) and At‑Most‑Once (via UnexceptionableSink) guarantees depending on business needs.

Tags: Flink, Streaming, ClickHouse, Retry, Checkpoint, Backpressure, Batching, Dynamic Sharding