
How ClickHouse Distributed Tables Write Data: Sync vs Async Explained

This article dissects ClickHouse's Distributed table write path, detailing how the Distributed engine creates tables, decides between synchronous and asynchronous insertion, manages local and remote shards, handles sharding keys, and ensures atomic data distribution across the cluster.

JD Cloud Developers

ClickHouse, like Elasticsearch, uses data sharding as a core feature of its distributed storage, improving efficiency through parallel reads and writes. The Distributed engine implements a distributed-table mechanism that creates a view over all local tables, making distributed queries convenient.

Distributed Table Engine Overview

The Distributed table engine does not store data itself; it reads from or writes to remote nodes' tables. Its creation depends on existing local tables, with a statement similar to:

<code>CREATE TABLE {table} ON CLUSTER {cluster}
AS {local_table}
ENGINE = Distributed({cluster}, {database}, {local_table}, {policy})
</code>

The <code>policy</code> can be a random function (e.g., <code>rand()</code>) or a hash function (e.g., <code>halfMD5(id)</code>).
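As a rough illustration of the two policy families, here is a toy sketch (our own, not ClickHouse code; <code>toyHash</code> merely stands in for a hash function such as <code>halfMD5</code>): hashing a key column is deterministic, so equal ids always land on the same shard, while a random policy only balances volume.

```cpp
#include <cstdint>
#include <cstdlib>

// Toy stand-in for a hash sharding key; the mixing constants come from
// the MurmurHash3 64-bit finalizer.
uint64_t toyHash(uint64_t id)
{
    id ^= id >> 33;
    id *= 0xff51afd7ed558ccdULL;
    id ^= id >> 33;
    return id;
}

// Deterministic policy: the same id always maps to the same shard.
size_t shardByHash(uint64_t id, size_t num_shards)
{
    return toyHash(id) % num_shards;
}

// Random policy: spreads volume evenly but gives no co-location of rows
// that share an id.
size_t shardByRand(size_t num_shards)
{
    return static_cast<size_t>(std::rand()) % num_shards;
}
```

Choose a hash policy when queries later need rows with the same key on the same shard (e.g., for local joins or aggregations); otherwise <code>rand()</code> is simpler.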

Cluster Configuration

ClickHouse cluster nodes are defined in

remote_servers

XML configuration, for example:

<code>&lt;remote_servers&gt;
    &lt;logs&gt;
        &lt;shard&gt;
            &lt;weight&gt;1&lt;/weight&gt;
            &lt;internal_replication&gt;true&lt;/internal_replication&gt;
            &lt;replica&gt;
                &lt;priority&gt;1&lt;/priority&gt;
                &lt;host&gt;example01-01-1&lt;/host&gt;
                &lt;port&gt;9000&lt;/port&gt;
            &lt;/replica&gt;
            &lt;replica&gt;
                &lt;host&gt;example01-01-2&lt;/host&gt;
                &lt;port&gt;9000&lt;/port&gt;
            &lt;/replica&gt;
        &lt;/shard&gt;
        &lt;shard&gt;
            &lt;weight&gt;2&lt;/weight&gt;
            &lt;internal_replication&gt;true&lt;/internal_replication&gt;
            &lt;replica&gt;
                &lt;host&gt;example01-02-1&lt;/host&gt;
                &lt;port&gt;9000&lt;/port&gt;
            &lt;/replica&gt;
            &lt;replica&gt;
                &lt;host&gt;example01-02-2&lt;/host&gt;
                &lt;port&gt;9000&lt;/port&gt;
            &lt;/replica&gt;
        &lt;/shard&gt;
    &lt;/logs&gt;
&lt;/remote_servers&gt;
</code>

Write Path Entry Point

The write process starts with the constructor of <code>DistributedBlockOutputStream</code>:

<code>DistributedBlockOutputStream(const Context &amp;context_, StorageDistributed &amp;storage_, const ASTPtr &amp;query_ast_, const ClusterPtr &amp;cluster_, bool insert_sync_, UInt64 insert_timeout_);
</code>

If <code>insert_sync_</code> is <code>true</code>, the insertion is synchronous; otherwise it is asynchronous. The flag is derived from the <code>insert_distributed_sync</code> setting and the presence of an <code>owned_cluster</code> (used by table functions).

Synchronous vs Asynchronous Insertion

Synchronous insertion sends the data to the target shards directly and blocks until the write completes, while asynchronous insertion first writes the data to local temporary files, which a background thread later ships to the remote nodes.

<code>DistributedBlockOutputStream::write()
    ↓
    if insert_sync
        ↓
        writeSync()
    else
        ↓
        writeAsync()
</code>

Asynchronous Write Implementation

The core of asynchronous writing is <code>writeAsyncImpl()</code>. It checks whether the shard has internal replication and then either writes a single copy or iterates over the replicas:

<code>writeAsyncImpl()
    ↓
    if shard_info.hasInternalReplication()
        writeToLocal()   // if this shard has a local replica
        writeToShard()   // one directory only; replication is left to the replicated table engine
    else
        for each replica { writeToShard() }
</code>

<code>writeToLocal()</code> prefers a local replica when available, while <code>writeToShard()</code> handles the actual file creation and hard-linking to ensure atomicity.

<code>void DistributedBlockOutputStream::writeToShard(const Block &amp;block, const std::vector&lt;std::string&gt; &amp;dir_names)
{
    std::string first_file_tmp_path{};
    bool first = true;
    // One directory per remote target; the block is serialized once and
    // hard-linked into every other directory.
    for (const auto &amp;dir_name : dir_names)
    {
        const auto &amp;path = storage.getPath() + dir_name + '/';
        if (Poco::File(path).createDirectory())
            storage.requireDirectoryMonitor(dir_name); // new directory: start its sender thread
        const auto &amp;file_name = toString(storage.file_names_increment.get()) + ".bin";
        const auto &amp;block_file_path = path + file_name;
        if (first)
        {
            first = false;
            // Serialize the block under tmp/ first, so the directory monitor
            // never sees a half-written .bin file.
            const auto &amp;tmp_path = path + "tmp/";
            Poco::File(tmp_path).createDirectory();
            const auto &amp;block_file_tmp_path = tmp_path + file_name;
            first_file_tmp_path = block_file_tmp_path;
            WriteBufferFromFile out{block_file_tmp_path};
            CompressedWriteBuffer compress{out};
            NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
            // Header: magic number, current settings, and the INSERT query text.
            writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
            context.getSettingsRef().serialize(out);
            writeStringBinary(query_string, out);
            stream.writePrefix();
            stream.write(block);
            stream.writeSuffix();
        }
        // Hard-link the finished tmp file into this directory: the file
        // appears atomically and complete.
        if (link(first_file_tmp_path.data(), block_file_path.data()))
            throwFromErrnoWithPath("Could not link " + block_file_path + " to " + first_file_tmp_path, block_file_path, ErrorCodes::CANNOT_LINK);
    }
    // ... the tmp file is cleaned up after the loop ...
}
</code>

Data files are stored under <code>/var/lib/clickhouse/data/{database}/{table}/</code>, with a separate directory for each shard (e.g., <code>'default@ck2-0:9000,default@ck2-1:9000'</code>). Each shard directory contains a <code>tmp</code> subdirectory used to write temporary files before hard-linking them into the final location, guaranteeing that only complete files are visible to the distribution thread.

<code># tree
.
├── 'default@ck2-0:9000,default@ck2-1:9000'
│   ├── 25.bin
│   └── tmp
│       └── 26.bin
└── 'default@ck3-0:9000,default@ck3-1:9000'
    └── tmp
</code>
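The write-to-tmp-then-hard-link trick can be shown in isolation. A minimal POSIX sketch (our own, with illustrative paths; the real <code>writeToShard()</code> additionally compresses and serializes the block):

```cpp
#include <fstream>
#include <string>
#include <unistd.h>  // POSIX link()/unlink()

// Write the payload fully under a tmp path, then hard-link it to its
// final name. A reader scanning the final directory either sees the
// complete file or nothing -- never a half-written one.
bool publishAtomically(const std::string &tmp_path,
                       const std::string &final_path,
                       const std::string &payload)
{
    {
        std::ofstream out(tmp_path, std::ios::binary);
        if (!out)
            return false;
        out << payload;
    }  // stream closed and flushed before the file becomes visible
    // link() adds a second directory entry for the same inode in one
    // atomic step; the data itself is not copied.
    if (::link(tmp_path.c_str(), final_path.c_str()) != 0)
        return false;
    ::unlink(tmp_path.c_str());  // drop the tmp name; final_path keeps the data
    return true;
}
```

Because a hard link is just another name for the same inode, the same serialized block can be "copied" into several shard directories at near-zero cost, which is exactly what the loop in <code>writeToShard()</code> exploits.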

Sharding Key Processing

If a sharding key is defined and the cluster has more than one shard, the block is split before asynchronous distribution:

<code>writeAsync()
    ↓
    if storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1)
        writeSplitAsync(block) // split by sharding key, then writeAsyncImpl per sub-block
    else
        writeAsyncImpl(block)  // single shard or no sharding key: write the block as-is
</code>

The sharding key expression is built when the table is created:

<code>sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false);
</code>

During splitting, a selector is created that maps each row to a shard based on the evaluated sharding key and the cluster's slot‑to‑shard mapping:

<code>IColumn::Selector DistributedBlockOutputStream::createSelector(const Block &amp;source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);
    const auto &amp;key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName());
    const auto &amp;slot_to_shard = cluster->getSlotToShard();
    // ... selector construction: dispatch on the key column's integer type
    // and map each row's key through slot_to_shard ...
    // Reached only when the sharding key is not an integer type:
    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}
</code>

Rows are then scattered to the appropriate shards using the selector, respecting the weight configuration defined in <code>remote_servers</code>.

Practical Recommendations

Writing to Distributed tables creates temporary local data and increases CPU and memory usage, so minimize write operations against them. Because the block is split by sharding key and weight, each insert generates many smaller blocks, which increases merge load on the remote nodes. Also note that a Distributed table created via a table function usually performs synchronous writes.

Understanding these mechanisms helps you use Distributed tables efficiently and troubleshoot performance issues.

Tags: Sharding, ClickHouse, Distributed Table, Data Insertion, Sync vs Async
Written by JD Cloud Developers

JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news.