
Elasticsearch Data Write Process: Coordinating Node Phase Explained

This article explains how Elasticsearch ingests data, covering the roles of cluster nodes, the coordinating node workflow, index structures, shards and replicas, request validation, ingest pipelines, automatic index creation, and the code paths involved in the write operation.

Xueersi Online School Tech Team

Background

After reading "Elasticsearch: The Definitive Guide" and the Lucene documentation, the author became curious about how external data written to Elasticsearch is actually persisted to storage, and decided to study the source code directly.

Related Concepts

Elasticsearch clusters consist of several node roles: Master, Data, Ingest (pre‑processing), Coordinating, and Tribe nodes. Each role has specific responsibilities and configuration settings.

Master Node

Responsible for cluster‑level operations such as index creation and cluster state updates. A node is master‑eligible when node.master: true (the active master is elected from among the eligible nodes), and node roles can be inspected via GET _cat/nodes?v&h=name,node.role,master . Example configuration for a dedicated master node:

node.master: true
node.data: false

To avoid split‑brain, discovery.zen.minimum_master_nodes should be set to (master_eligible_nodes / 2) + 1 .
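The quorum arithmetic above uses integer division. As a quick sanity check (the helper class below is illustrative, not part of Elasticsearch):

```java
public class QuorumCalculator {
    // Minimum number of master-eligible nodes that must be visible
    // for a master election: integer division, then + 1.
    public static int minimumMasterNodes(int masterEligibleNodes) {
        return masterEligibleNodes / 2 + 1;
    }

    public static void main(String[] args) {
        // A 3-node master-eligible set tolerates the loss of one node.
        System.out.println(minimumMasterNodes(3)); // prints 2
        System.out.println(minimumMasterNodes(5)); // prints 3
    }
}
```

With three master‑eligible nodes the setting should therefore be 2, which lets the cluster survive the loss of one eligible node without risking two independent masters.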

Data Node

Stores data and executes CRUD, search, and aggregation operations. Configuration for a dedicated data node:

node.master: false
node.data: true
node.ingest: false

Ingest (Pre‑processing) Node

Introduced in ES 5.0, it runs pipelines of processors before indexing documents. Dedicated ingest node configuration:

node.master: false
node.data: false
node.ingest: true

Coordinating Node

Any node can act as a coordinating node: it receives client requests, routes them to the nodes holding the relevant shards, gathers the per‑shard responses, merges them, and returns the final response to the client. A dedicated coordinating node can be configured as:

node.master: false
node.data: false
node.ingest: false

Tribe Node

Acts as a federated client across multiple clusters, allowing unified queries. Example configuration includes HTTP and transport ports and the definition of each remote cluster (t1, t2, …).

http.port: 9200
http.publish_port: 9200
transport.tcp.port: 9300
node.master: false
node.data: false
tribe.t1.cluster.name: cluster_test1
tribe.t1.network.host: 192.168.70.109
... (additional tribe settings)

Index Level

An index can be thought of as a database (pre‑6.0, when it could hold multiple types) or as a table (from 6.0 onward, one type per index). Each document is identified by _index , _type (deprecated), and _id . Documents are stored in shards, and each shard is a Lucene index composed of multiple segments.

Shard and Replica

Data is split into primary shards and replica shards to enable horizontal scaling and high availability. Primary shards handle write operations first; replicas are updated afterward. Elasticsearch provides weak consistency: reads from the primary see the latest data, while reads from replicas may be stale.
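The number of primary shards is fixed when the index is created, while the replica count can be changed later. For illustration (the index name my_index is arbitrary), an index with three primaries and one replica of each primary can be created like this:

```
PUT my_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}
```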

Data Write Process

Writes are handled via Index requests (single document) or Bulk requests (multiple documents). The process is divided into three phases: coordinating node handling, primary shard handling, and replica handling. This article focuses on the first phase.

Basic Write Flow

1. The client sends a write request to a node in the cluster.
2. That node determines the target shard based on the document _id and routes the request to the node holding the primary shard.
3. The primary shard processes the request, forwards it to the replica shards, and reports success back to the coordinating node, which finally responds to the client.
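Step 2 relies on a deterministic routing formula, shard = hash(routing) % number_of_primary_shards, where the routing value defaults to the document _id. A minimal sketch of the idea (Elasticsearch actually uses a Murmur3 hash; String.hashCode() is a stand‑in here only to keep the example self‑contained):

```java
public class ShardRouter {
    // Simplified stand-in for Elasticsearch's routing formula:
    //   shard = hash(routing) % number_of_primary_shards
    // floorMod keeps the result non-negative even for negative hashes.
    public static int shardId(String routing, int numPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    public static void main(String[] args) {
        int shards = 3;
        // The same _id always routes to the same shard, which is why the
        // primary shard count cannot be changed after index creation.
        System.out.println(shardId("doc-1", shards));
        System.out.println(shardId("doc-1", shards)); // identical to the line above
    }
}
```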

Detailed Coordinating Node Workflow

The coordinating node performs parameter validation (e.g., IndexRequest.validate() ), handles ingest pipelines, checks automatic index creation settings, and groups bulk requests by shard to minimize network round‑trips.

Parameter validation example (excerpt):

// Parent validation
ActionRequestValidationException validationException = super.validate();
// Type must not be null
if (type == null) {
    validationException = addValidationError("type is missing", validationException);
}
// Source (document body) must not be null
if (source == null) {
    validationException = addValidationError("source is missing", validationException);
}
// Content type must be specified
if (contentType == null) {
    validationException = addValidationError("content type is missing", validationException);
}
... (additional checks)
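The excerpt above accumulates error messages rather than failing on the first problem, so a single response can report every missing field. A minimal, self‑contained sketch of the same pattern, using a plain List<String> in place of ActionRequestValidationException:

```java
import java.util.ArrayList;
import java.util.List;

public class RequestValidator {
    // Sketch of the accumulation pattern used by IndexRequest.validate():
    // each failed check appends a message; an empty result means the
    // request is valid.
    public static List<String> validate(String type, String source, String contentType) {
        List<String> errors = new ArrayList<>();
        if (type == null) errors.add("type is missing");
        if (source == null) errors.add("source is missing");
        if (contentType == null) errors.add("content type is missing");
        return errors;
    }

    public static void main(String[] args) {
        // A request missing its body collects exactly one error.
        System.out.println(validate("_doc", null, "application/json")); // prints [source is missing]
    }
}
```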

Ingest pipeline example:

PUT _ingest/pipeline/pipelineA
{
  "description": "inner pipeline",
  "processors": [
    { "set": { "field": "inner_pipeline_set", "value": "inner" } }
  ]
}

When a document is indexed with ?pipeline=pipelineA , the set processor adds the field "inner_pipeline_set": "inner" before the document is stored.
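For completeness, a document can be pushed through this pipeline as follows (the index name my_index and the title field are arbitrary):

```
PUT my_index/_doc/1?pipeline=pipelineA
{
  "title": "hello"
}
```

The stored document then contains both title and the pipeline‑added inner_pipeline_set field.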

Automatic index creation (enabled by default) collects all target indices from the bulk request, filters out those that already exist, and asks the master node to create the missing ones. The code tracks completion with an AtomicInteger counter that is decremented as each index creation finishes.

// Collect indices
final Set<String> indices = bulkRequest.requests.stream()
    .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE ||
        request.versionType() == VersionType.EXTERNAL ||
        request.versionType() == VersionType.EXTERNAL_GTE)
    .map(DocWriteRequest::index)
    .collect(Collectors.toSet());
... // Create missing indices and decrement counter
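The counter pattern mentioned above can be sketched in isolation: start the counter at the number of missing indices and proceed with the bulk only when the last creation decrements it to zero. The index names below are hypothetical, and creation is simulated synchronously rather than via Elasticsearch's async listeners:

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexCreationTracker {
    // Sketch of the completion-tracking pattern: the counter starts at the
    // number of missing indices, each creation callback decrements it, and
    // the bulk executes once it reaches zero.
    public static boolean createAll(Set<String> missingIndices) {
        if (missingIndices.isEmpty()) {
            return true; // nothing to create: execute the bulk immediately
        }
        AtomicInteger counter = new AtomicInteger(missingIndices.size());
        boolean proceeded = false;
        for (String index : missingIndices) {
            // In Elasticsearch this decrement happens inside an async
            // ActionListener; here each creation completes inline.
            if (counter.decrementAndGet() == 0) {
                proceeded = true; // all indices now exist: execute the bulk
            }
        }
        return proceeded;
    }

    public static void main(String[] args) {
        Set<String> missing = new java.util.HashSet<>(
            java.util.Arrays.asList("logs-a", "logs-b"));
        System.out.println(createAll(missing)); // prints true
    }
}
```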

After validation and possible index creation, the coordinating node groups bulk items by ShardId :

Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    DocWriteRequest request = bulkRequest.requests.get(i);
    if (request == null) continue;
    String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
    ShardId shardId = clusterService.operationRouting()
        .indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
    requestsByShard.computeIfAbsent(shardId, s -> new ArrayList<>())
        .add(new BulkItemRequest(i, request));
}

Each shard‑specific bulk request ( BulkShardRequest ) is then sent to TransportShardBulkAction , which forwards it to the primary shard (or routes to the appropriate node) via the ReroutePhase of TransportReplicationAction . The primary shard executes the operation, updates the translog, and replicates to replicas.

Summary

The coordinating node in Elasticsearch orchestrates the write path by validating requests, applying ingest pipelines, optionally creating indices, routing requests to the correct primary shard, and merging shard‑level responses. Understanding this phase helps developers tune bulk indexing performance, configure dedicated coordinating nodes, and avoid resource contention in large clusters.

