NETFLIX 2024-11-13

Netflix — Netflix's Distributed Counter Abstraction

Summary

Netflix introduces the Distributed Counter Abstraction, a counting service built on top of the TimeSeries Abstraction and deployed via the Data Gateway Control Plane. It serves ~75K count requests/second globally across endpoints at single-digit-millisecond latency. Counters are organized into namespaces, each configurable for counter type (Best-Effort / Eventually Consistent / experimental Accurate), TTL, and cardinality. The API resembles Java's AtomicInteger: AddCount / AddAndGetCount / GetCount / ClearCount, with mutations carrying an IdempotencyToken (event_time + nonce) so retries and hedged requests are safe.

Best-Effort counters are a thin wrapper over EVCache's incr/decr/delete(ALL): high throughput and low latency, but no cross-region replication, no consistency, and no native idempotency. Eventually Consistent counters are the load-bearing design: Netflix walks through four rejected approaches (single row + CAS, per-instance aggregation, durable queue, raw event log) and lands on a combined event-log + background sliding-window rollup architecture, with a bucketing strategy to prevent wide partitions. The TimeSeries Abstraction (Cassandra-backed) stores each counter mutation as an event with (event_time, event_id, event_item_key) forming a natural idempotency key, ordered descending for efficient reset handling, with retention policies for cost.
A background rollup pipeline runs per namespace: every read and every write sends a lightweight rollup event (namespace + counter, no delta) to in-memory, in-process queues on Counter-Rollup servers. XXHash routes the same counter to the same queue; sets coalesce duplicate events within a rollup window; batches process N counters in parallel with adaptive back-pressure between batches. Rollup only aggregates events in the immutable aggregation window, which lags behind now by a safe margin governed by TimeSeries's acceptLimit. The lastRollupTs checkpoint and cached lastRollupCount live in the Rollup Store (one Cassandra table per dataset), with an EVCache-backed rollup cache combining lastRollupCount and lastRollupTs in a single value to prevent mismatch.

Writes are acknowledged once durably persisted to TimeSeries; a fire-and-forget rollup trigger is then issued. Reads return the last rolled-up count (accepting a few seconds of staleness) and also trigger a rollup to self-heal. A separate last-write-timestamp on the Rollup Store is written using Cassandra's USING TIMESTAMP for Last-Write-Wins semantics; it keeps low-cardinality counters in constant rollup circulation while draining high-cardinality ones once they have caught up. Horizontal scaling of the Rollup tier allows over-writes within the immutable window (safe by construction).

The experimental Accurate counter adds a real-time delta: currentRolledUpCount = lastRollupCount + delta_since_lastRollupTs, effective when the counter is actively accessed so the delta window stays narrow. Named future work: regional rollup tables plus global reconciliation (to mitigate cross-region replication drift), and durable rollup queues, handoffs, and better error detection for infrequently accessed counters.
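The rollup-event path described above (hash routing to per-server queues, set-based coalescing, batched draining) can be sketched as follows. This is a minimal illustration, not Netflix's implementation: the queue count and function names are assumptions, and crc32 stands in for the XXHash the post describes.

```python
import zlib

NUM_QUEUES = 8  # assumption; per-server queue count is configurable

# Each queue is a set: duplicate rollup events for the same counter within a
# coalesce window collapse into one entry. The event carries no delta.
queues = [set() for _ in range(NUM_QUEUES)]

def route(namespace: str, counter: str) -> int:
    """Stable hash routing: the same counter always lands on the same queue,
    so its duplicate events can coalesce (crc32 stands in for XXHash)."""
    key = f"{namespace}:{counter}".encode()
    return zlib.crc32(key) % NUM_QUEUES

def enqueue_rollup_event(namespace: str, counter: str) -> None:
    """Fire-and-forget trigger issued after every durable write and on reads."""
    queues[route(namespace, counter)].add((namespace, counter))

def drain_batch(queue_idx: int, batch_size: int = 32):
    """At the end of a coalesce window, take up to batch_size distinct
    counters for parallel aggregation."""
    q = queues[queue_idx]
    return [q.pop() for _ in range(min(batch_size, len(q)))]
```

The key property is that N triggers for the same counter in one window cost one aggregation pass, because the set deduplicates them before the batch job reads deltas from TimeSeries.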

Key takeaways

  1. Netflix's Counter Abstraction is the third mature service on the Data Gateway platform (after KV DAL and TimeSeries Abstraction). It reuses TimeSeries as its event store and EVCache as its rollup cache, demonstrating the platform's composition property: "we compose multiple abstraction layers using containers deployed on the same host, with each container fetching configuration specific to its scope." (Source: sources/2024-11-13-netflix-netflixs-distributed-counter-abstraction)
  2. Two-mode taxonomy for counters is load-bearing: Best-Effort vs. Eventually Consistent. Best-Effort is EVCache-only — accepts no idempotency, no durability, no cross-region replication in exchange for "extremely high throughput at low millisecond latency." Eventually Consistent is the default "accurate and durable" mode that most Netflix use cases need.
  3. Event log + background sliding-window rollup is Netflix's chosen primitive for distributed counting. Each mutation is persisted as an event keyed by (event_time, event_id, event_item_key). A background pipeline continuously aggregates events within an immutable window (lagged by a safe margin behind now), checkpoints the result as (lastRollupCount, lastRollupTs), and the next rollup continues from that checkpoint. Auditing and recounting are preserved because individual increments are retained in the event store until the retention TTL expires.
  4. Probabilistic data structures (HLL, Count-Min Sketch) are explicitly rejected — HLL is a distinct-element counter (wrong problem shape for increment/decrement of a given key), and CMS would have required extending Netflix's primitives to support resets and TTLs per-key. Netflix chose to build on data stores they already operate at scale.
  5. Idempotency is baked into the event's natural composite key: (event_time, event_id, event_item_key) deduplicates retries and hedged writes. The same AddCount call with the same token produces the same row — writes are safe to retry.
  6. Immutable aggregation window is the trick that makes rollup safe under concurrent writes. Rollup only aggregates events older than a safe margin — the acceptLimit parameter of TimeSeries rejects incoming events with timestamps beyond that limit, so the window is by construction no longer receiving events. Aggregation can therefore run in parallel, from multiple Rollup instances, without distributed locking: "although the concept of now() may differ between threads, causing rollup values to sometimes fluctuate, the counts will eventually converge to an accurate value within each immutable aggregation window."
  7. Lightweight rollup events decouple the write path from the aggregation path. The rollup event is just {namespace, counter} — it carries no delta. This means the rollup server knows which counters need aggregation without having to scan the entire event store; it doesn't have to know how much each counter changed. Delta computation happens inside the batch job reading from TimeSeries.
  8. Bucketed event-time partitioning prevents wide partitions in Cassandra under high-throughput counter events. The TimeSeries schema uses (time_bucket, event_bucket) columns to break up otherwise-hot per-counter partitions. Control plane tunes seconds_per_bucket + buckets_per_id per namespace based on counter cardinality.
  9. Cassandra USING TIMESTAMP gives predictable Last-Write-Wins semantics for the last-write-timestamp column on the Rollup Store. The last-write-timestamp is set equal to the event's event_time, and is the mechanism by which the Rollup server decides whether a counter should stay in rollup circulation (its pending writes haven't caught up) or drain out.
  10. Performance envelope: ~75K count requests/second globally; single-digit-millisecond latency across all endpoints. No per-endpoint QPS or latency breakdown disclosed.

Operational numbers

  • ~75,000 count requests/second globally across all endpoints at the time of writing.
  • Single-digit millisecond latency for all endpoints.
  • Rollup-queue memory: 16,777,216 bytes (16 MiB) per queue in the Low-cardinality example config.
  • Coalesce window: 10,000 ms (10 s) for rollup-event dedup per queue.
  • Rollup batch size: 32 counters per parallelization batch in the example config.
  • Time-partition shape (low-cardinality example): 4 event buckets per counter, 600 s (10 min) per bucket, 86,400 s (1 day) per slice.
  • TimeSeries acceptLimit: 5 s in the example — the upper bound on event-time skew, beyond which events are rejected. This doubles as the lag the rollup pipeline respects before treating a window as immutable.
  • TimeSeries retention (example): close_after: 518,400 s (6 days); delete_after: 604,800 s (7 days).
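Gathered into one place, the example numbers above describe a single hypothetical low-cardinality namespace. The field names below are illustrative, not the control plane's actual configuration schema:

```python
# Hypothetical namespace config assembling the example values from the post.
low_cardinality_namespace = {
    "counter_type": "EventuallyConsistent",
    "rollup": {
        "queue_memory_bytes": 16 * 1024 * 1024,  # 16 MiB per in-memory queue
        "coalesce_window_ms": 10_000,            # dedup window per queue
        "batch_size": 32,                        # counters aggregated in parallel
    },
    "timeseries": {
        "accept_limit_seconds": 5,        # event-time skew bound = rollup lag
        "buckets_per_id": 4,              # event buckets per counter
        "seconds_per_bucket": 600,        # 10-minute time buckets
        "seconds_per_slice": 86_400,      # 1-day time slice
        "close_after_seconds": 518_400,   # 6 days
        "delete_after_seconds": 604_800,  # 7 days retention
    },
}
```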

Caveats

  • Cross-region replication can miss events. The post explicitly acknowledges "this approach does leave the door open for missed events due to cross-region replication issues." Named future work is a per-region rollup table + a global reconciliation table with a "key challenge" of communicating clearing across regions.
  • Staleness of at most a few seconds for reads is intrinsic — reads return the last-rolled-up count, and the rollup lags by the safe margin. Use cases needing sub-second accuracy must use the experimental Accurate mode (which adds real-time delta computation on top of the rollup checkpoint).
  • Rollup events are in-memory queues on the Rollup server. Instance crashes can lose events — "This comes with the trade-off of potentially missing rollup events in case of an instance crash." Future work names durable queues as the fix.
  • Infrequently accessed counters can stay stale longer. If a rollup trigger is lost and no subsequent access happens, the counter's stored rollup stays out of date until the next read triggers a refresh. Future work names improved error detection and rollup handoffs.
  • Low-cardinality vs. high-cardinality counters have opposite rollup-circulation needs. Low-cardinality stays in constant circulation to avoid scanning too many time-partitions on read; high-cardinality must drain out once caught up to avoid excessive rollup-queue memory pressure. last-write-timestamp is the discriminator.
  • No per-endpoint latency breakdown, no region-by-region QPS, no per-namespace tuning case study. The post is an architectural-design overview, not a production retrospective.
  • The "Accurate" counter is experimental; the post itself says to "take the term 'Accurate' with a grain of salt". It is not positioned as production-hardened.
  • Probabilistic alternatives (HLL, Count-Min Sketch) named and rejected — HLL counts distinct elements (wrong problem shape); CMS would need additional primitives for per-key resets + TTLs; both would require adding a new data store to Netflix's platform.
  • Best-Effort does NOT have idempotency. EVCache incr/decr are racy under retries. The post names this explicitly — Best-Effort trades idempotency for throughput and latency.
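The last-write-timestamp discriminator from the low- vs. high-cardinality caveat reduces to a small decision. A hypothetical helper (the post describes only the decision, not code, and the cardinality flag here is an assumed per-namespace setting):

```python
def should_requeue(last_write_ts: int, last_rollup_ts: int,
                   low_cardinality: bool) -> bool:
    """Decide whether a counter stays in rollup circulation.

    Low-cardinality namespaces stay in constant circulation so reads never
    have to scan many time partitions; high-cardinality counters drain out
    once caught up, relieving rollup-queue memory pressure.
    """
    if low_cardinality:
        return True  # always re-enqueue
    # Pending writes newer than the checkpoint -> not caught up yet.
    return last_write_ts > last_rollup_ts
```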
