Scaling Elasticsearch for Billions of Daily Events: Cluster Planning, Routing & Hot‑Warm Tips
This article explains how a real‑time OLAP monitoring platform handling 1.0‑1.2 billion events per day (about 400 billion records per year) was built on Elasticsearch 5.3.3. It covers cluster planning, storage strategy, index sharding, compression, hot‑warm architecture, routing, index templates, rollover, and cross‑cluster search, with concrete configurations and code examples.
Introduction
In a recent project we built an OLAP monitoring platform for risk‑control data that receives a daily peak of 1.0‑1.2 billion events, amounting to about 400 billion records per year and roughly 200 TB of storage. Storing and querying such a volume in real time is a serious challenge.
After extensive research on Elasticsearch and testing insert and aggregation queries on hundreds of billions of documents, we identified several effective solutions.
Cluster planning
Storage strategy
Index sharding
Compression
Hot‑warm partitioning
The Elasticsearch version used is 5.3.3.
1. Cluster Deployment Planning
Elasticsearch nodes can take on several roles. The two main ones are Master Node and Data Node, and further roles such as Coordinating‑Only Node or Tribe Node can be added as needed. Assigning each role to dedicated nodes improves stability and response times under heavy load.
Master Nodes
Master Nodes manage index metadata, shard allocation, and cluster topology. For large, stable clusters they should be dedicated (node.master: true, node.data: false, node.ingest: false) to keep them out of I/O‑intensive indexing operations.
node.master: true
node.data: false
node.ingest: false
To avoid split‑brain scenarios, set discovery.zen.minimum_master_nodes to (master_eligible_nodes / 2) + 1, with the division rounded down.
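The quorum rule is easy to get wrong by one, so here is a minimal Python sketch of it. The function name minimum_master_nodes is our own; the value it returns is what would go into discovery.zen.minimum_master_nodes.

```python
def minimum_master_nodes(master_eligible_nodes: int) -> int:
    """Quorum for discovery.zen.minimum_master_nodes: floor(n / 2) + 1."""
    if master_eligible_nodes < 1:
        raise ValueError("need at least one master-eligible node")
    # Integer division rounds down, matching the rule in the text.
    return master_eligible_nodes // 2 + 1


if __name__ == "__main__":
    for n in (1, 2, 3, 4, 5):
        print(n, "master-eligible nodes ->", minimum_master_nodes(n))
```

With three dedicated masters the setting is 2, so a partition holding a single master cannot elect itself. With only two master-eligible nodes the quorum is also 2, so losing either one blocks master election. That is why three dedicated masters is the usual choice.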
Data Nodes
Data Nodes store the actual index data and handle search, aggregation, and indexing. They should be provisioned with the best CPU, memory, and SSD resources and must not act as Master Nodes.
node.master: false
node.data: true
node.ingest: false
Coordinating‑Only Nodes
These nodes act as smart load balancers. They forward client requests to the data nodes that hold the relevant shards and merge the partial results, without storing any data themselves. They are configured as:
node.master: false
node.data: false
node.ingest: false
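search.remote.connect: false
Here search.remote.connect: false stops the node from connecting to remote clusters for cross‑cluster search. Adding Coordinating‑Only Nodes can raise API request throughput. Too many of them add overhead, however, because the elected master must wait for every node to acknowledge each cluster‑state update.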
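To make the "merge the partial results" step concrete, here is a toy model of the reduce work a coordinating node does for a search. Each shard returns its own top hits sorted by score, and the coordinating node merges them into a global top N. This is an illustration only. The (score, doc_id) tuples and the function name are hypothetical, and real Elasticsearch also runs a fetch phase and handles field sorting and aggregations.

```python
import heapq
from itertools import islice
from typing import Iterable, List, Tuple

# A hit is (score, doc_id); each shard's list is already sorted by score, highest first.
Hit = Tuple[float, str]


def reduce_top_hits(shard_results: Iterable[List[Hit]], size: int) -> List[Hit]:
    """Merge per-shard sorted hit lists into the global top `size` hits."""
    merged = heapq.merge(*shard_results, key=lambda hit: hit[0], reverse=True)
    return list(islice(merged, size))


if __name__ == "__main__":
    shard_0 = [(9.1, "a"), (4.0, "b")]
    shard_1 = [(7.5, "c"), (6.2, "d"), (1.0, "e")]
    shard_2 = [(8.3, "f")]
    print(reduce_top_hits([shard_0, shard_1, shard_2], size=3))
    # [(9.1, 'a'), (8.3, 'f'), (7.5, 'c')]
```

Every extra shard a query touches adds another list to fetch and merge. This is one reason the routing techniques in the next section pay off.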
2. Routing
Routing determines which shard a document belongs to. By default the document's _id is used as the routing value, but custom routing can dramatically reduce query time by limiting the number of shards involved:
shard_num = hash(_routing) % num_primary_shards
Example of using routing in requests:
PUT my_index/my_type/1?routing=user1
{
"title": "This is a document"
}
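GET my_index/my_type/1?routing=user1
A document indexed with a custom routing value must be retrieved with the same value, as in the GET above. When a search specifies no routing, it must be sent to every shard and the results from all of them merged, which increases latency. Specifying routing (e.g., GET my_index/_search?routing=user1) narrows the search to the one shard, or the few shards, that the routing values map to.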
If data distribution is uneven, you can combine multiple dimensions (e.g., city and user) in the routing value, or create separate indices for high‑volume channels.
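The Python sketch below shows the shard-selection rule and a composite routing value. It assumes zlib.crc32 as a stand-in hash. Elasticsearch actually hashes the routing string with Murmur3, so the concrete shard numbers will not match a real cluster. The composite_routing helper and its city_user format are hypothetical.

```python
import zlib
from typing import Optional


def shard_for(routing: str, num_primary_shards: int) -> int:
    """shard_num = hash(_routing) % num_primary_shards.
    crc32 is a stand-in; Elasticsearch itself uses Murmur3, so numbers differ."""
    return zlib.crc32(routing.encode("utf-8")) % num_primary_shards


def composite_routing(user: str, city: Optional[str] = None) -> str:
    """Hypothetical composite routing value combining city and user."""
    return f"{city}_{user}" if city else user


if __name__ == "__main__":
    shards = 5
    # Every document indexed with routing=user1 maps to this one shard,
    # so a search with ?routing=user1 only has to visit it.
    print("user1 ->", shard_for("user1", shards))
    # Without custom routing, _id is the routing value, so one user's documents
    # scatter and a search for them must visit every shard.
    print({shard_for(f"doc-{i}", shards) for i in range(100)})
    print("beijing_user1 ->", shard_for(composite_routing("user1", "beijing"), shards))
```

Routing on a combined value such as city plus user spreads a very large city over many shards. The trade-off is that a query for the whole city then has to visit more shards.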
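Elasticsearch also supports index.routing_partition_size. It spreads each routing value over a subset of shards instead of a single shard, which improves shard balance:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
The setting can only be set at index creation time. It must be greater than 1 and less than index.number_of_shards, and the index's mappings must mark _routing as required.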
3. Index Splitting
Indexes scale horizontally through shards. Each shard is a single Lucene index, however, and can hold at most 2,147,483,519 documents (about 2 billion). To keep I/O manageable, each node should host no more than three shards.
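Time‑based indices are naturally suited to splitting. Elasticsearch provides the Rollover API, used together with Index Templates, to keep each index at a manageable size. For example, an index might have about 5 primary shards and be rolled over long before it approaches the hard ceiling of 5 × 2,147,483,519 ≈ 10.7 billion documents.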
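The sizing arithmetic can be sketched as follows. The Lucene limit of 2,147,483,519 documents per shard is a hard ceiling, and the 400 billion records per year comes from the introduction. The daily-index layout, the even spread across shards, and the helper names are assumptions for illustration.

```python
import math

# Lucene's per-index (per-shard) hard limit: Integer.MAX_VALUE - 128.
LUCENE_MAX_DOCS_PER_SHARD = 2_147_483_519


def min_primary_shards(docs: float, max_docs_per_shard: int = LUCENE_MAX_DOCS_PER_SHARD) -> int:
    """Smallest primary-shard count that keeps every shard at or below max_docs_per_shard,
    assuming documents are spread evenly across shards."""
    return max(1, math.ceil(docs / max_docs_per_shard))


def docs_per_shard(docs: float, primary_shards: int) -> float:
    """Average documents per primary shard under an even spread."""
    return docs / primary_shards


if __name__ == "__main__":
    yearly_docs = 400_000_000_000       # figure from the introduction
    daily_docs = yearly_docs / 365      # about 1.1 billion per day (assumed daily indices)
    print(f"daily docs: {daily_docs:,.0f}")
    print("minimum primaries for one daily index:", min_primary_shards(daily_docs))
    print(f"docs per shard with 5 primaries: {docs_per_shard(daily_docs, 5):,.0f}")
```

On these assumptions a daily index stays far below the hard ceiling. Choosing 5 primaries is therefore about spreading load and keeping shards small, not about the Lucene limit.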
Index Template
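PUT _template/template_1
{
"template": "logs-*",
"order": 0,
"settings": {
"number_of_shards": 5
},
"aliases": {
"alias1": {}
}
}
(In Elasticsearch 5.x the pattern key is "template"; 6.0 and later use "index_patterns" with a list of patterns.)
Rollover API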
PUT /logs-000001
{
"aliases": {"logs_write": {}}
}
POST /logs_write/_rollover
{
"conditions": {
"max_age": "7d",
"max_docs": 1000
}
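}
When any of the conditions is met, a new index (e.g., logs-000002) is created, the logs_write alias is switched to it, and the old index continues to serve reads. The conditions are evaluated only when the _rollover request is sent, so the call must be issued periodically, for example from a scheduled job.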
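It helps to know what the API decides each time _rollover is called. The sketch below mimics two documented behaviours. First, an index name ending in a dash and a number is incremented and zero-padded to six digits. Second, the rollover happens when any condition is met. The function names and the days-based age are our own simplifications, not Elasticsearch code, and the sketch assumes that reaching a threshold counts as meeting it.

```python
import re
from typing import Optional

_ROLLOVER_NAME = re.compile(r"^(.*)-(\d+)$")


def next_index_name(current: str) -> str:
    """logs-000001 -> logs-000002; the counter is zero-padded to 6 digits."""
    match = _ROLLOVER_NAME.match(current)
    if match is None:
        raise ValueError(f"{current!r} does not end in -<number>; pass a new index name explicitly")
    prefix, number = match.groups()
    return f"{prefix}-{int(number) + 1:06d}"


def should_roll_over(age_days: float, doc_count: int,
                     max_age_days: Optional[float] = 7, max_docs: Optional[int] = 1000) -> bool:
    """True if ANY configured condition is met (same shape as the request above)."""
    checks = []
    if max_age_days is not None:
        checks.append(age_days >= max_age_days)
    if max_docs is not None:
        checks.append(doc_count >= max_docs)
    return any(checks)


if __name__ == "__main__":
    print(next_index_name("logs-000001"))               # logs-000002
    print(should_roll_over(age_days=2, doc_count=1500))  # True: max_docs reached
    print(should_roll_over(age_days=2, doc_count=10))    # False: neither condition met
```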
4. Hot‑Warm Architecture
To maintain low‑latency queries on recent data while storing older data more cheaply, nodes are labeled as hot (high‑performance resources) or warm (slower, larger storage). Shard allocation filtering directs shards to the appropriate node type.
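Shard allocation filtering boils down to matching node attributes against the index's rules. The toy model below covers only the include rule used in this article: a node qualifies if its box_type is one of the comma-separated values. The node names and dictionary layout are hypothetical. The real allocator also applies require/exclude rules, wildcards, disk watermarks and other deciders.

```python
from typing import Dict, List

# Hypothetical cluster: node name -> node attributes (as set with node.attr.*).
NODES: Dict[str, Dict[str, str]] = {
    "hot-1": {"box_type": "hot"},
    "hot-2": {"box_type": "hot"},
    "warm-1": {"box_type": "warm"},
    "warm-2": {"box_type": "warm"},
}


def eligible_nodes(nodes: Dict[str, Dict[str, str]], attr: str, include: str) -> List[str]:
    """Nodes whose `attr` matches one of the comma-separated values of an
    index.routing.allocation.include.<attr> rule (wildcards not modelled)."""
    wanted = {value.strip() for value in include.split(",")}
    return sorted(name for name, attrs in nodes.items() if attrs.get(attr) in wanted)


if __name__ == "__main__":
    print(eligible_nodes(NODES, "box_type", "hot"))       # ['hot-1', 'hot-2']
    print(eligible_nodes(NODES, "box_type", "warm"))      # ['warm-1', 'warm-2']
    print(eligible_nodes(NODES, "box_type", "hot,warm"))  # all four nodes
```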
Node Labeling
# start a hot node
./bin/elasticsearch -Enode.attr.box_type=hot
# or in elasticsearch.yml
node.attr.box_type: hot
# start a warm node
./bin/elasticsearch -Enode.attr.box_type=warm
# or in elasticsearch.yml
node.attr.box_type: warm
Hot Template
PUT _template/active-logs
{
"template": "active-logs-*",
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"routing.allocation.include.box_type": "hot",
"routing.allocation.total_shards_per_node": 2
},
"aliases": {"active-logs": {}}
}
Warm Template
PUT _template/inactive-logs
{
"template": "inactive-logs-*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"routing.allocation.include.box_type": "warm",
"codec": "best_compression"
}
}
After a rollover, the former active index is made read‑only and its shards are moved to warm nodes:
PUT active-logs-1/_settings
{
"index.blocks.write": true,
"index.routing.allocation.include.box_type": "warm"
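}
Optionally, shrink the warm index to fewer shards and force‑merge it to reduce its segment count. The _shrink API has three requirements. The source index must be read‑only (the write block above). A copy of every shard must be on a single node, for example by also setting index.routing.allocation.require._name to the name of one warm node. The index must be green: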
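POST active-logs-1/_shrink/inactive-logs-1
POST inactive-logs-1/_forcemerge?max_num_segments=1
PUT inactive-logs-1/_settings
{ "number_of_replicas": 1 }
Adding the replica after the force merge lets it copy the single merged segment instead of repeating the merge. Finally, once inactive-logs-1 is green, delete the original hot index:
DELETE active-logs-1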
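The hot template earlier in this section caps each node at two shards of an index through total_shards_per_node. A quick feasibility check shows what that implies. Five primaries with one replica make 10 shard copies, so at least 5 hot nodes are needed; with fewer, some copies remain unassigned. The helper below is our own sketch. It models only that cap and the rule that a primary and its replica never share a node.

```python
import math


def min_nodes_for_index(primaries: int, replicas: int, total_shards_per_node: int = -1) -> int:
    """Fewest nodes that can hold every shard copy of one index.

    total_shards_per_node mirrors index.routing.allocation.total_shards_per_node;
    -1 (the Elasticsearch default) means no per-node cap.
    """
    if total_shards_per_node == 0:
        raise ValueError("a cap of 0 leaves no room for any shard")
    copies = primaries * (replicas + 1)
    # A primary and its replicas are never allocated to the same node.
    by_replicas = replicas + 1
    if total_shards_per_node < 0:
        return by_replicas
    return max(math.ceil(copies / total_shards_per_node), by_replicas)


if __name__ == "__main__":
    print(min_nodes_for_index(5, 1, 2))  # hot template: 10 copies, 2 per node -> 5
    print(min_nodes_for_index(1, 1))     # shrunken warm index with 1 replica -> 2
```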
5. Additional Optimizations
Index Isolation: Separate large‑traffic indices from smaller ones using shard allocation filtering (e.g., node.attr.zone=zone_a) to prevent resource contention.
Cross‑Data‑Center: Deploy multiple Elasticsearch clusters and use either Tribe Nodes or Cross‑Cluster Search (CCS) to query across clusters.
Tribe Node Example
tribe:
  on_conflict: prefer_t1
  t1:
    cluster.name: us-cluster
    discovery.zen.ping.unicast.hosts: ['usm1','usm2','usm3']
  t2:
    cluster.name: eu-cluster
    discovery.zen.ping.unicast.hosts: ['eum1','eum2','eum3']
Cross‑Cluster Search Example
PUT /_cluster/settings
{
"persistent": {
"search.remote.cluster_one": {
"seeds": ["127.0.0.1:9300"]
}
}
}
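POST /cluster_one:decision,decision/_search
{ "query": { "match_all": {} } }
This single request searches the decision index on the remote cluster cluster_one together with the local decision index. CCS lets any node act as a proxy for remote clusters unless search.remote.connect is set to false, as on the coordinating‑only nodes above. This simplifies configuration and avoids the need for dedicated tribe nodes, which were deprecated in Elasticsearch 5.4 in favour of CCS.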
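The index expression in that request mixes remote and local targets. The sketch below splits such an expression the way the cluster_one:decision syntax suggests. A prefix before the first colon names a configured remote cluster; in this sketch, anything else is treated as a local index. It is a simplified illustration, and the function and the "(local)" label are our own. Elasticsearch's own resolver handles more cases.

```python
from typing import Dict, Iterable, List

LOCAL = "(local)"  # our own label for the local cluster


def split_index_expression(expression: str, remote_clusters: Iterable[str]) -> Dict[str, List[str]]:
    """Group a comma-separated CCS index expression by target cluster.

    In this sketch, a target written as <cluster>:<index> goes to that remote
    cluster if <cluster> is configured; everything else is a local index name.
    """
    remotes = set(remote_clusters)
    groups: Dict[str, List[str]] = {}
    for target in expression.split(","):
        cluster, sep, index = target.partition(":")
        if sep and cluster in remotes:
            groups.setdefault(cluster, []).append(index)
        else:
            groups.setdefault(LOCAL, []).append(target)
    return groups


if __name__ == "__main__":
    print(split_index_expression("cluster_one:decision,decision", ["cluster_one"]))
    # {'cluster_one': ['decision'], '(local)': ['decision']}
```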
6. Summary
The article presented a set of techniques—cluster role separation, custom routing, index rollover, hot‑warm node architecture, shard allocation filtering, and cross‑cluster search—to store and query massive time‑series data efficiently. Combining these methods according to business needs can dramatically improve both storage and query performance.
dbaplus Community
