How Netflix Built a Low‑Latency Distributed Counter Service at Scale

This article explains Netflix's distributed counter abstraction built on their time‑series service, detailing use cases, API design, counter types, implementation methods, control‑plane configuration, performance results, and future work, showing how Netflix achieves near‑real‑time, low‑latency counting at massive scale.

Introduction

Netflix previously introduced a time‑series abstraction for storing and querying massive event data with millisecond latency. Building on that, they created a distributed counter abstraction that provides similar low‑latency performance while supporting large‑scale counting across multiple use cases.

Use Cases and Requirements

Counting is needed for tracking millions of user interactions, monitoring feature exposure frequencies, and aggregating metrics in A/B tests. Netflix classifies these into two categories:

Best‑effort: Low‑latency, near‑real‑time reads with minimal infrastructure cost; exactness and durability are not required.

Eventually consistent: Accurate, durable counts, at the price of slightly higher latency and infrastructure cost.

Both categories share high‑throughput and high‑availability requirements.

Distributed Counter Abstraction

The abstraction lets users choose between best‑effort and eventually consistent modes (and an experimental "accurate" mode) via a configurable API, abstracting away storage details.

API

Counters live in user‑defined namespaces, each configurable with parameters such as counter type, TTL, and base.

Key operations (mirroring Java's AtomicInteger) are:

AddCount / AddAndGetCount: Adjust a counter by a positive or negative delta; the latter returns the new value. Both support an optional idempotency token for safe retries.

{
  "namespace": "my_dataset",
  "counter_name": "counter123",
  "delta": 2,
  "idempotency_token": {
    "token": "some_event_id",
    "generation_time": "2024-10-05T14:48:00Z"
  }
}

GetCount: Retrieve the current count for a counter.

{
  "namespace": "my_dataset",
  "counter_name": "counter123"
}

ClearCount: Reset a counter to zero.

{
  "namespace": "my_dataset",
  "counter_name": "counter456",
  "idempotency_token": { ... }
}

Counter Types

Three main types are supported:

Best‑effort: Implemented with EVCache, Netflix's Memcached‑based distributed cache. It offers extremely low latency and high throughput but lacks cross‑region replication, strong consistency, and native idempotency (see the Memcached‑style sketch after this list).

Eventually consistent: Uses a durable, globally replicated store (Cassandra) and a roll‑up pipeline to aggregate events while providing durability and stronger accuracy.

Accurate (experimental): Extends the eventually consistent type by also computing, at read time, the real‑time delta of events not yet rolled up (sketched below).
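EVCache's client API is not shown in the article, so here is a minimal best‑effort sketch using plain Memcached semantics via spymemcached; the "namespace:counter" key layout is an illustrative assumption, and EVCache itself adds its own client and sharding behavior on top.

import java.net.InetSocketAddress;
import net.spy.memcached.MemcachedClient;

// Best-effort counting on Memcached semantics: atomic incr/decr, but no
// cross-region replication and no idempotency (a retried request double-counts).
public class BestEffortCounter {
    private final MemcachedClient client;

    public BestEffortCounter(MemcachedClient client) {
        this.client = client;
    }

    public long addAndGet(String namespace, String counter, long delta) {
        String key = namespace + ":" + counter; // illustrative key layout
        return delta >= 0
                ? client.incr(key, delta, delta) // create with value=delta if absent
                : client.decr(key, -delta, 0);   // Memcached decr floors at zero
    }

    public static void main(String[] args) throws Exception {
        MemcachedClient mc = new MemcachedClient(new InetSocketAddress("localhost", 11211));
        System.out.println(new BestEffortCounter(mc).addAndGet("my_dataset", "counter123", 2));
        mc.shutdown();
    }
}

And a sketch of the accurate read path: the returned value is the last rolled‑up count plus the sum of deltas logged after the roll‑up's watermark. The RollupStore and EventStore interfaces are hypothetical stand‑ins, not Netflix's actual components.

import java.time.Instant;
import java.util.List;

// Hypothetical store interfaces, for illustration only.
interface RollupStore {
    long rolledUpValue(String ns, String counter);      // last aggregated count
    Instant rollupWatermark(String ns, String counter); // point the roll-up covers
}

interface EventStore {
    List<Long> deltasSince(String ns, String counter, Instant watermark);
}

class AccurateCounterReader {
    private final RollupStore rollups;
    private final EventStore events;

    AccurateCounterReader(RollupStore rollups, EventStore events) {
        this.rollups = rollups;
        this.events = events;
    }

    long getCount(String ns, String counter) {
        long base = rollups.rolledUpValue(ns, counter);
        Instant watermark = rollups.rollupWatermark(ns, counter);
        // Real-time delta: fold in every event written after the last roll-up.
        long delta = events.deltasSince(ns, counter, watermark)
                           .stream().mapToLong(Long::longValue).sum();
        return base + delta;
    }
}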

Implementation Methods for Eventually Consistent Counters

The article explores four approaches and their trade‑offs:

One row per counter: Simple, but it lacks idempotency and suffers high write contention on hot counters.

Per‑instance aggregation: Buffers increments in memory before flushing (see the sketch after this list), but is vulnerable to data loss on instance failure and makes resets difficult.

Persistent queue (Kafka): Records each increment as an event in a durable queue, enabling reliable replay and windowed aggregation, yet introduces latency and rebalancing complexity.

Event‑log per increment: Stores every increment with timestamps and IDs, allowing precise reconstruction but causing read latency, duplicate work, wide partitions, and storage growth.
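A minimal sketch of the per‑instance aggregation approach, assuming an in‑memory buffer flushed on a timer; the class and method names are illustrative, and persist() stands in for the write to the shared store.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Per-instance aggregation, sketched: increments accumulate in local memory
// and are flushed on a timer. Anything buffered when the instance dies is
// lost, and "reset to zero" is hard because other instances hold deltas too.
class BufferedCounter {
    private final ConcurrentHashMap<String, LongAdder> buffer = new ConcurrentHashMap<>();
    private final ScheduledExecutorService flusher =
            Executors.newSingleThreadScheduledExecutor();

    BufferedCounter(long flushIntervalMs) {
        flusher.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs,
                TimeUnit.MILLISECONDS);
    }

    void add(String counter, long delta) {
        buffer.computeIfAbsent(counter, k -> new LongAdder()).add(delta);
    }

    private void flush() {
        for (Map.Entry<String, LongAdder> e : buffer.entrySet()) {
            long pending = e.getValue().sumThenReset();
            if (pending != 0) {
                persist(e.getKey(), pending); // one write per counter per interval
            }
        }
    }

    void persist(String counter, long delta) {
        // Placeholder for the write to the shared store.
        System.out.printf("flush %s += %d%n", counter, delta);
    }
}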

Netflix combines these ideas: each increment is written to a time‑series event store (Cassandra), a lightweight roll‑up event is sent to a roll‑up server, and background aggregation pipelines continuously converge counts.
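The combined write path might look like the following sketch; TimeSeriesStore and RollupQueue are stand‑in interfaces, not Netflix's actual components. The increment is durably logged first, then a cheap change notification feeds the roll‑up pipeline.

import java.time.Instant;
import java.util.UUID;

// Combined write path, sketched with stand-in interfaces: log the raw
// increment durably, then send a lightweight "counter changed" signal.
interface TimeSeriesStore {
    void append(String ns, String counter, long delta, String token, Instant at);
}

interface RollupQueue {
    void notifyChanged(String ns, String counter); // no delta, just "re-aggregate me"
}

class CounterWriter {
    private final TimeSeriesStore events;
    private final RollupQueue rollups;

    CounterWriter(TimeSeriesStore events, RollupQueue rollups) {
        this.events = events;
        this.rollups = rollups;
    }

    void addCount(String ns, String counter, long delta, String idempotencyToken) {
        String token = idempotencyToken != null ? idempotencyToken : UUID.randomUUID().toString();
        // Durable, idempotent record of the raw increment (Cassandra in the article).
        events.append(ns, counter, delta, token, Instant.now());
        // Cheap hint to the roll-up servers; duplicates are harmless because
        // aggregation recomputes from the event log, not from this signal.
        rollups.notifyChanged(ns, counter);
    }
}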

Roll‑up Pipeline

Roll‑up servers consume {"namespace":"my_dataset","counter":"counter123"} events, aggregate increments over immutable windows, and write the results to a persistent roll‑up table. The pipeline uses in‑memory queues, dynamic batch sizing, adaptive back‑pressure, and hash‑based coalescing (e.g., XXHash) to minimize duplicate work and maintain high throughput.
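A sketch of the queueing and coalescing idea, with assumptions flagged: the article names XXHash for stable queue assignment, but plain hashCode() stands in here, and the set‑per‑queue structure is one illustrative way to coalesce duplicate events for the same counter.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

// Hash-based queue assignment plus coalescing, sketched. A counter always
// lands in the same queue, and a set per queue collapses duplicate events,
// so one busy counter triggers one aggregation per drain, not thousands.
class RollupQueues {
    private final List<LinkedHashSet<String>> queues;

    RollupQueues(int numQueues) {
        queues = new ArrayList<>(numQueues);
        for (int i = 0; i < numQueues; i++) {
            queues.add(new LinkedHashSet<>());
        }
    }

    synchronized void enqueue(String namespace, String counter) {
        String key = namespace + "|" + counter;
        // The article names XXHash; hashCode() is a stand-in here.
        queues.get(Math.floorMod(key.hashCode(), queues.size())).add(key);
    }

    // Drain up to batchSize distinct counters for one aggregation pass;
    // dynamic batch sizing would tune batchSize from observed queue lag.
    synchronized List<String> drain(int queueIndex, int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        Iterator<String> it = queues.get(queueIndex).iterator();
        while (it.hasNext() && batch.size() < batchSize) {
            batch.add(it.next());
            it.remove();
        }
        return batch;
    }
}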

Control Plane Configuration

All abstractions are configured via a data‑gateway control plane. Example JSON shows namespace settings for cache (EVCache), roll‑up storage (Cassandra), and event storage (Cassandra) with parameters such as queue count, batch size, time‑partitioning, retention policies, and accept limits.
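The article does not reproduce the full schema, but an illustrative namespace configuration, with every field name guessed from the parameters listed above rather than taken from the control plane's actual API, might look like:

{
  "namespace": "my_dataset",
  "counter_type": "EVENTUALLY_CONSISTENT",
  "cache": {
    "type": "EVCACHE",
    "ttl_seconds": 3600
  },
  "rollup_store": {
    "type": "CASSANDRA",
    "queue_count": 8,
    "batch_size": 100
  },
  "event_store": {
    "type": "CASSANDRA",
    "time_partition": "PT1H",
    "retention_days": 30,
    "accept_limit_rps": 1000
  }
}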

Performance

At the time of writing, the service handled roughly 75K counter requests per second across global endpoints while delivering single‑digit‑millisecond read latency.

Future Work

Regional roll‑up tables to improve cross‑region replication.

Enhanced error detection and handling for stale counts, especially for low‑traffic counters.

Conclusion

Distributed counting remains a challenging problem. Netflix's solution balances low infrastructure cost, high availability, low latency, and idempotency guarantees by offering configurable counter modes and a robust roll‑up pipeline. The design illustrates the practical trade‑offs required to meet diverse counting needs at massive scale.
