Netflix — Netflix's Distributed Counter Abstraction
Summary
Netflix introduces the Distributed Counter Abstraction — a
counting service built on top of the
TimeSeries Abstraction
and deployed via the
Data Gateway Control Plane. It
serves ~75K count requests/second globally across endpoints at
single-digit millisecond latency. Counters are organized into
namespaces, each configurable for counter type (Best-Effort /
Eventually Consistent / experimental Accurate), TTL, and
cardinality. The API resembles Java's AtomicInteger:
AddCount / AddAndGetCount / GetCount / ClearCount, with
mutations carrying an IdempotencyToken (event_time + nonce) so
retries + hedges
are safe. Best-Effort counters are a thin wrapper over
EVCache's incr/decr/delete(ALL) — high
throughput, low latency, no cross-region replication, no
consistency, no native idempotency. Eventually Consistent
counters are the load-bearing design: Netflix walks through four
rejected approaches (single row + CAS, per-instance aggregation,
durable queue, raw event log) and lands on a combined event-log +
background sliding-window rollup architecture, with a bucketing
strategy to prevent wide
partitions. The TimeSeries Abstraction (Cassandra-backed)
stores each counter mutation as an event with
(event_time, event_id, event_item_key) forming a natural
idempotency key, ordered descending for efficient reset handling,
with retention policies for cost. A background rollup pipeline
runs per-namespace: every read + every write sends a light-weight
rollup event (namespace + counter, no delta) to in-memory
in-process queues on Counter-Rollup servers;
XXHash routes the same counter to the same
queue; Sets coalesce duplicate events in a rollup window; batches
process N counters in parallel with adaptive back-pressure between
batches; rollup only aggregates events in the immutable
aggregation window (lagged behind now by a safe margin governed
by TimeSeries's acceptLimit); the lastRollupTs checkpoint +
cached lastRollupCount live in the Rollup Store (one Cassandra
table per dataset) with an EVCache-backed rollup cache combining
lastRollupCount + lastRollupTs in a single value to prevent
mismatch. Writes are acknowledged when durably persisted to
TimeSeries; a fire-and-forget rollup trigger is then issued. Reads
return the last-rolled-up count (accepting a few seconds of
staleness) and also trigger a rollup to self-heal. The separate
last-write-timestamp on the Rollup Store is written using
Cassandra's USING TIMESTAMP for Last-Write-Wins semantics, and is
used to keep low-cardinality counters in constant rollup circulation
while draining high-cardinality ones once they've caught up.
Horizontal scaling of the Rollup tier allows over-writes within the
immutable window (safe by construction). The Experimental
Accurate counter adds a real-time delta currentRolledUpCount =
lastRollupCount + delta_since_lastRollupTs, effective when the
counter is actively accessed so the delta window stays narrow.
Named future work: regional rollup tables + global reconciliation
(to mitigate cross-region replication drift), plus durable rollup
queues + handoffs + better error detection for infrequently accessed
counters.
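The AtomicInteger-style API and the idempotency token can be sketched as a minimal in-memory model. This is an illustration only — class and method names (EventLogCounter, add_count) are invented here, not Netflix's client API — but it shows why a token of (event_time + nonce) makes retries and hedged writes safe:

```python
import time
import uuid

class IdempotencyToken:
    """event_time plus a random nonce; retries and hedged requests reuse it."""
    def __init__(self, event_time_ms=None, nonce=None):
        self.event_time_ms = event_time_ms or int(time.time() * 1000)
        self.nonce = nonce or uuid.uuid4().hex

    def key(self):
        return (self.event_time_ms, self.nonce)

class EventLogCounter:
    """Stores each mutation as an event row keyed by the token; duplicate
    tokens collapse onto the same row, so double-sends cannot double-count."""
    def __init__(self):
        self.events = {}  # (namespace, counter, event_time_ms, nonce) -> delta

    def add_count(self, namespace, counter, delta, token):
        # Writing the same token twice overwrites the same "row": safe to retry.
        self.events[(namespace, counter) + token.key()] = delta

    def get_count(self, namespace, counter):
        # Simplified: a full scan stands in for the background rollup.
        return sum(d for k, d in self.events.items() if k[:2] == (namespace, counter))

c = EventLogCounter()
t = IdempotencyToken()
c.add_count("views", "show_123", 1, t)
c.add_count("views", "show_123", 1, t)  # hedged retry with the same token
print(c.get_count("views", "show_123"))  # -> 1, not 2
```

In the real service the read path returns the rolled-up checkpoint rather than scanning events; the dedup-by-token behavior is the point here.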
Key takeaways
- Netflix's Counter Abstraction is the third mature service on the Data Gateway platform (after KV DAL and TimeSeries Abstraction). It reuses TimeSeries as its event store and EVCache as its rollup cache, demonstrating the platform's composition property: "we compose multiple abstraction layers using containers deployed on the same host, with each container fetching configuration specific to its scope." (Source: sources/2024-11-13-netflix-netflixs-distributed-counter-abstraction)
- Two-mode taxonomy for counters is load-bearing: Best-Effort vs. Eventually Consistent. Best-Effort is EVCache-only — accepts no idempotency, no durability, no cross-region replication in exchange for "extremely high throughput at low millisecond latency." Eventually Consistent is the default "accurate and durable" mode that most Netflix use cases need.
- Event log + background sliding-window rollup is Netflix's chosen primitive for distributed counting. Each mutation is persisted as an event keyed by (event_time, event_id, event_item_key). A background pipeline continuously aggregates events within an immutable window (lagged by a safe margin behind now), checkpoints the result as (lastRollupCount, lastRollupTs), and the next rollup continues from that checkpoint. Auditing and recounting are preserved because individual increments are retained in the event store until the retention TTL expires.
- Probabilistic data structures (HLL, Count-Min Sketch) are explicitly rejected — HLL is a distinct-element counter (wrong problem shape for increment/decrement of a given key), and CMS would have required extending Netflix's primitives to support per-key resets and TTLs. Netflix chose to build on data stores they already operate at scale.
- Idempotency is baked into the event's natural composite key: (event_time, event_id, event_item_key) deduplicates retries and hedged writes. The same AddCount call with the same token produces the same row — writes are safe to retry.
- Immutable aggregation window is the trick that makes rollup safe under concurrent writes. Rollup only aggregates events older than a safe margin — the acceptLimit parameter of TimeSeries rejects incoming events with timestamps beyond that limit, so the window is by construction no longer receiving events. Aggregation can therefore run in parallel, from multiple Rollup instances, without distributed locking: "although the concept of now() may differ between threads, causing rollup values to sometimes fluctuate, the counts will eventually converge to an accurate value within each immutable aggregation window."
- Light-weight rollup events decouple the write path from the aggregation path. The rollup event is just {namespace, counter} — it carries no delta. This means the rollup server knows which counters need aggregation without having to scan the entire event store; it doesn't have to know how much each counter changed. Delta computation happens inside the batch job reading from TimeSeries.
- Bucketed event-time partitioning prevents wide partitions in Cassandra under high-throughput counter events. The TimeSeries schema uses (time_bucket, event_bucket) columns to break up otherwise-hot per-counter partitions. The control plane tunes seconds_per_bucket + buckets_per_id per namespace based on counter cardinality.
- Cassandra USING TIMESTAMP gives predictable Last-Write-Wins semantics for the last-write-timestamp column on the Rollup Store. The last-write-timestamp is set equal to the event's event_time, and is the mechanism by which the Rollup server decides whether a counter should stay in rollup circulation (its pending writes haven't caught up) or drain out.
- Performance envelope: ~75K count requests/second globally; single-digit-millisecond latency across all endpoints. No per-endpoint QPS or latency breakdown disclosed.
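The light-weight rollup-event path can be sketched in a few lines. This is an illustrative model, not Netflix's code: zlib.crc32 stands in for XXHash, and queue/function names are invented. It shows why hashing the counter to a fixed queue plus set-based coalescing keeps the signal path cheap even under heavy write traffic:

```python
import zlib

NUM_QUEUES = 4
# Each in-memory, in-process queue is a set, so duplicate rollup events for
# the same counter within a window coalesce into a single entry.
queues = [set() for _ in range(NUM_QUEUES)]

def enqueue_rollup_event(namespace, counter):
    # The event carries no delta — just "this counter needs aggregation".
    key = f"{namespace}:{counter}".encode()
    q = zlib.crc32(key) % NUM_QUEUES  # same counter always routes to same queue
    queues[q].add((namespace, counter))

for _ in range(1000):                     # 1000 writes to one hot counter...
    enqueue_rollup_event("views", "show_123")
enqueue_rollup_event("views", "show_456")

print(sum(len(q) for q in queues))  # -> 2: duplicates coalesced per window
```

Delta computation happens later, when the batch job reads the counter's events from the TimeSeries store.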
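The immutable-window aggregation with checkpointing can also be modeled compactly. A hedged sketch, assuming events arrive as (event_time, delta) pairs; the 5 s acceptLimit is the example value from the post, and the function names are invented:

```python
ACCEPT_LIMIT_S = 5  # events with timestamps beyond now + this are rejected

def rollup(events, checkpoint, now_s):
    """Aggregate only the immutable window (lastRollupTs, now - acceptLimit],
    which by construction is no longer receiving events, then checkpoint
    (lastRollupCount, lastRollupTs) together."""
    last_count, last_ts = checkpoint
    window_end = now_s - ACCEPT_LIMIT_S  # newer events may still be in flight
    delta = sum(d for t, d in events if last_ts < t <= window_end)
    return (last_count + delta, window_end)

events = [(100, 1), (101, 2), (104, 5), (107, 1)]
cp = (0, 0)
cp = rollup(events, cp, now_s=106)  # aggregates t in (0, 101]: count = 3
cp = rollup(events, cp, now_s=113)  # aggregates t in (101, 108]: count = 9
print(cp)  # -> (9, 108)
```

Because each window is immutable, re-running a rollup over the same window from another instance produces the same result — which is why horizontal scaling with over-writes is safe without distributed locking.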
Systems extracted
- systems/netflix-distributed-counter — the new Counter abstraction service, the focus of the post.
- systems/netflix-timeseries-abstraction — event store for counter mutations; previously named only in the 2024-09-19 KV DAL post, canonicalised here as a first-class system.
- systems/netflix-data-gateway — control plane hosting the Counter service alongside KV and TimeSeries.
- systems/evcache — Best-Effort counter backing + Rollup Cache.
- systems/apache-cassandra — persistent store underneath the TimeSeries event log and the Rollup Store; USING TIMESTAMP gives Last-Write-Wins semantics.
- systems/netflix-kv-dal — sibling abstraction on the same Data Gateway platform (named in passing).
Concepts extracted
- concepts/best-effort-vs-eventually-consistent-counter — the two-mode taxonomy Netflix surfaces.
- concepts/immutable-aggregation-window — the concurrency-safety trick underneath the rollup pipeline.
- concepts/event-log-based-counter — counter as an event log aggregated in the background rather than an in-place counter.
- concepts/lightweight-rollup-event — signaling-only event (namespace + counter, no delta) that tells the rollup server a counter needs attention.
- concepts/idempotency-token (extended) — Counter's (event_time, event_id) composite token as a second canonical Netflix instance after KV DAL.
- concepts/last-write-wins (extended) — Cassandra's USING TIMESTAMP used operationally to enforce LWW on the last-write-timestamp column.
- concepts/tail-latency-at-scale (extended) — hedged-request framing carried over from the Google "Tail at Scale" paper as motivation for idempotent writes.
- concepts/wide-partition-problem (extended) — the Cassandra physics driving the time-bucket + event-bucket schema.
Patterns extracted
- patterns/sliding-window-rollup-aggregation — background aggregation of an event log within a rolling immutable window, with checkpointed state in a rollup store.
- patterns/bucketed-event-time-partitioning — schema-level decomposition of per-key time-partitions into (time_bucket, event_bucket) to prevent wide partitions.
- patterns/fire-and-forget-rollup-trigger — writes issue a best-effort asynchronous signal to the rollup tier rather than synchronously waiting for aggregation.
- patterns/data-abstraction-layer (extended) — Counter is the third mature DAL on the Data Gateway.
- patterns/namespace-backed-storage-routing (extended) — Counter extends the namespace primitive across the Counter Abstraction + TimeSeries + Rollup Store + EVCache compose.
Operational numbers
- ~75,000 count requests/second globally across all endpoints at the time of writing.
- Single-digit millisecond latency for all endpoints.
- Rollup-queue memory:
16,777,216 bytes(16 MiB) per queue in the Low-cardinality example config. - Coalesce window:
10,000 ms(10 s) for rollup-event dedup per queue. - Rollup batch size: 32 counters per parallelization batch in the example config.
- Time-partition shape (low-cardinality example):
4event buckets per counter,600 s(10 min) per bucket,86,400 s(1 day) per slice. - TimeSeries
acceptLimit:5 sin the example — the upper bound on event-time skew, beyond which events are rejected. This doubles as the lag the rollup pipeline respects before treating a window as immutable. - TimeSeries retention (example):
close_after: 518,400 s(6 days);delete_after: 604,800 s(7 days).
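The time-partition shape above can be made concrete with a little arithmetic. A hypothetical sketch using the low-cardinality example numbers (600 s per time bucket, 4 event buckets per counter); the real TimeSeries schema differs in detail, and hashing by event id is an assumption made here for illustration:

```python
import zlib

SECONDS_PER_BUCKET = 600  # seconds_per_bucket in the example config
BUCKETS_PER_ID = 4        # buckets_per_id in the example config

def partition_for(event_time_s, event_id):
    """Map an event onto (time_bucket, event_bucket) so that a hot counter's
    writes within one 10-minute window spread across 4 Cassandra partitions
    instead of piling onto a single wide partition."""
    time_bucket = event_time_s // SECONDS_PER_BUCKET
    event_bucket = zlib.crc32(event_id.encode()) % BUCKETS_PER_ID
    return (time_bucket, event_bucket)

# Two events in the same 10-minute window share a time_bucket but may land
# in different event_buckets.
print(partition_for(1_700_000_100, "evt-a"))
print(partition_for(1_700_000_100, "evt-b"))
```

The control plane's job is picking these two knobs per namespace from the declared counter cardinality.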
Caveats
- Cross-region replication can miss events. The post explicitly acknowledges "this approach does leave the door open for missed events due to cross-region replication issues." Named future work is a per-region rollup table + a global reconciliation table with a "key challenge" of communicating clearing across regions.
- Staleness of at most a few seconds for reads is intrinsic — reads return the last-rolled-up count, and the rollup lags by the safe margin. Use cases needing sub-second accuracy must use the experimental Accurate mode (which adds real-time delta computation on top of the rollup checkpoint).
- Rollup events are in-memory queues on the Rollup server. Instance crashes can lose events — "This comes with the trade-off of potentially missing rollup events in case of an instance crash." Future work names durable queues as the fix.
- Infrequently accessed counters can stay stale longer. If a rollup trigger is lost + no subsequent access happens, the counter's stored rollup stays out of date until the next read triggers a refresh. Future work names improved error detection + rollup handoffs.
- Low-cardinality vs. high-cardinality counters have opposite
rollup-circulation needs. Low-cardinality stays in constant
circulation to avoid scanning too many time-partitions on read;
high-cardinality must drain out once caught up to avoid
excessive rollup-queue memory pressure.
The last-write-timestamp is the discriminator.
- No per-endpoint latency breakdown, no region-by-region QPS, no per-namespace tuning case study. The post is an architectural-design overview, not a production retrospective.
- "Accurate" counter is experimental — "take the term 'Accurate' with a grain of salt". Not positioned as production-hardened.
- Probabilistic alternatives (HLL, Count-Min Sketch) named and rejected — HLL counts distinct elements (wrong problem shape); CMS would need additional primitives for per-key resets + TTLs; both would require adding a new data store to Netflix's platform.
- Best-Effort does NOT have idempotency. EVCache incr/decr are racy under retries. The post names this explicitly — Best-Effort trades idempotency for throughput and latency.
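The experimental Accurate read path from the caveats above reduces to one line of arithmetic: the rolled-up checkpoint plus a real-time delta over events newer than lastRollupTs. A minimal sketch (function and parameter names invented here, not Netflix's API):

```python
def accurate_count(last_rollup_count, last_rollup_ts, events, now_s):
    """events: (event_time_s, delta) pairs read from the event store.
    currentRolledUpCount = lastRollupCount + delta_since_lastRollupTs."""
    delta = sum(d for t, d in events if last_rollup_ts < t <= now_s)
    return last_rollup_count + delta

# Checkpoint says 9 as of t=108; two increments landed since then.
print(accurate_count(9, 108, [(110, 2), (111, 1)], now_s=112))  # -> 12
```

This is only cheap when the counter is actively accessed, since frequent rollups keep the (lastRollupTs, now] delta window narrow and the event scan small.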
Source
- Original: https://netflixtechblog.com/netflixs-distributed-counter-abstraction-8d0c45eb66b2
- Raw markdown:
raw/netflix/2024-11-13-netflixs-distributed-counter-abstraction-abd53a3f.md
Related
- companies/netflix
- systems/netflix-distributed-counter
- systems/netflix-timeseries-abstraction
- systems/netflix-data-gateway
- systems/netflix-kv-dal
- systems/evcache
- systems/apache-cassandra
- concepts/best-effort-vs-eventually-consistent-counter
- concepts/immutable-aggregation-window
- concepts/event-log-based-counter
- concepts/lightweight-rollup-event
- concepts/idempotency-token
- concepts/last-write-wins
- concepts/tail-latency-at-scale
- patterns/sliding-window-rollup-aggregation
- patterns/bucketed-event-time-partitioning
- patterns/fire-and-forget-rollup-trigger
- patterns/data-abstraction-layer
- patterns/namespace-backed-storage-routing