
Netflix Distributed Counter Abstraction

Netflix Distributed Counter Abstraction is the counting service built on top of the TimeSeries Abstraction and deployed via the Data Gateway Control Plane. It is the third mature abstraction service on the Data Gateway platform, after KV DAL and TimeSeries. At the time of the 2024-11-13 post, it was processing ~75,000 count requests/second globally at single-digit millisecond latency. (Source: sources/2024-11-13-netflix-netflixs-distributed-counter-abstraction)

API

Modeled on Java's AtomicInteger.

  • AddCount / AddAndGetCount — adjust by signed delta; carry IdempotencyToken (event_time, nonce) so retries + hedges are safe on supported counter types.
  • GetCount — return current value.
  • ClearCount — reset to 0; idempotent via same token mechanism.

All operations are scoped to a namespace + counter_name. Namespaces are the unit of configuration — counter type, TTL, cardinality, rollup + store + cache bindings — and are tuned via the Data Gateway control plane.
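A minimal sketch of this API surface, with an in-memory dict standing in for the service. All names here (`CounterClient`, `IdempotencyToken` fields) are hypothetical illustrations of the post's description, not Netflix's actual client types:

```python
import time
import uuid
from dataclasses import dataclass, field

@dataclass(frozen=True)
class IdempotencyToken:
    """Client-generated (event_time, nonce) pair; resending the same
    token makes retries and hedged requests safe to deduplicate."""
    event_time_ms: int = field(default_factory=lambda: int(time.time() * 1000))
    nonce: str = field(default_factory=lambda: uuid.uuid4().hex)

class CounterClient:
    """Hypothetical client; every call is scoped to namespace + counter_name."""

    def __init__(self, namespace: str):
        self.namespace = namespace
        self._store: dict[str, int] = {}   # stand-in for the backing service
        self._seen: set[tuple] = set()     # dedup on the idempotency token

    def add_count(self, counter: str, delta: int, token: IdempotencyToken) -> None:
        key = (counter, token.event_time_ms, token.nonce)
        if key in self._seen:              # retry with the same token: no-op
            return
        self._seen.add(key)
        self._store[counter] = self._store.get(counter, 0) + delta

    def get_count(self, counter: str) -> int:
        return self._store.get(counter, 0)

    def clear_count(self, counter: str) -> None:
        self._store[counter] = 0
```

Reusing the token turns a retried AddCount into a no-op, which is the property that makes hedged requests safe.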

Counter types

Best-Effort Regional

Thin wrapper over EVCache incr/decr with an optional TTL; ClearCount maps to cache.delete(key, ReplicaPolicy.ALL). High throughput + low-millisecond latency within a single region. No cross-region replication, no consistency guarantees, no native idempotency — retries are unsafe. Suited for short-lived A/B experiments where approximate counts are sufficient.

Eventually Consistent Global

Event-log + background rollup architecture (see below). Accurate, durable, globally available, idempotent. Default for use cases that need real counts.

Experimental Accurate Global

Same as Eventually Consistent, but GetCount computes a real-time delta on top of the rollup checkpoint:

currentAccurateCount = lastRollupCount + delta(lastRollupTs, now())

Effective when the counter is accessed frequently enough that the delta window stays narrow. Flagged as experimental in the post.
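The formula above can be sketched as a read-path function, assuming the checkpoint comes from the Rollup Store and the live delta is summed from TimeSeries events newer than `lastRollupTs` (all names hypothetical):

```python
def accurate_get_count(checkpoint: dict,
                       events: list[tuple[int, int]],
                       now_ms: int) -> int:
    """checkpoint: {'last_rollup_count': int, 'last_rollup_ts': int}
    events: (event_time_ms, delta) pairs from the TimeSeries event store.
    Returns lastRollupCount + delta(lastRollupTs, now)."""
    base = checkpoint["last_rollup_count"]
    since = checkpoint["last_rollup_ts"]
    live_delta = sum(d for ts, d in events if since < ts <= now_ms)
    return base + live_delta
```

The cost of the read scales with the number of events in the delta window, which is why this mode only pays off when reads are frequent enough to keep the window narrow.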

Eventually-Consistent architecture

Write path (AddCount / ClearCount):

  1. Durably persist the event to TimeSeries with the client-supplied IdempotencyToken. Retries with the same token deduplicate on the (event_time, event_id, event_item_key) composite key.
  2. Write last-write-timestamp = event_time onto the Rollup Store row using Cassandra's USING TIMESTAMP (LWW) for predictable ordering.
  3. Fire a light-weight rollup event ({namespace, counter}, no delta) to the rollup tier. See patterns/fire-and-forget-rollup-trigger.
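The three write-path steps can be sketched as follows, with dicts standing in for TimeSeries and the Cassandra Rollup Store; in the real system step 3 is fire-and-forget over the network:

```python
def add_count(event_store: dict, rollup_store: dict, rollup_queue: list,
              namespace: str, counter: str, delta: int,
              event_time: int, event_id: str, event_item_key: str) -> None:
    # 1. Durable, idempotent event write: retries with the same
    #    (event_time, event_id, event_item_key) composite key deduplicate.
    key = (namespace, counter, event_time, event_id, event_item_key)
    if key not in event_store:
        event_store[key] = delta

    # 2. Last-write-wins timestamp on the rollup-store row (the role
    #    Cassandra's USING TIMESTAMP plays): only advance if newer.
    row = (namespace, counter)
    if event_time > rollup_store.get(row, {}).get("last_write_ts", -1):
        rollup_store.setdefault(row, {})["last_write_ts"] = event_time

    # 3. Fire a lightweight rollup trigger carrying no delta.
    rollup_queue.append(row)
```

Note that the rollup trigger is intentionally delta-free: it only says "this counter changed", so losing or duplicating it never corrupts the count.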

Read path (GetCount):

  1. Return cached lastRollupCount from the EVCache Rollup Cache as a point-read — accepts a few seconds of staleness.
  2. Trigger a rollup event so the checkpoint advances for the next read, and so a previously-missed rollup self-heals.

Background rollup pipeline (Counter-Rollup servers):

  • Rollup events land in in-memory queues per instance. XXHash routes the same counter to the same queue so dedup works locally.
  • Queues drain into Sets to coalesce duplicate events in a rollup window — a counter is aggregated at most once per window.
  • Each rollup consumer pulls a batch of counters and queries TimeSeries in parallel for events within the immutable window (older than now - acceptLimit).
  • Delta is computed by summing event deltas; the new lastRollupCount and lastRollupTs are written to the Rollup Store and cached in EVCache as a single combined value (avoids count / timestamp mismatch).
  • Adaptive back-pressure between batches prevents the rollup tier from overwhelming the underlying TimeSeries store — each consumer waits for its current batch before issuing the next one and tunes the wait based on prior-batch performance.
  • Low-cardinality counters stay in constant rollup circulation so they don't fall behind and force scans of many time partitions on their next read. High-cardinality counters drain out once last-write-timestamp stops advancing, preventing rollup-queue memory blowup. The last-write-timestamp column on the Rollup Store is the discriminator.
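The queue routing and coalescing above can be sketched like this. Python's stdlib has no XXHash, so `hashlib.md5` stands in as an illustrative hash; the real service uses XXHash:

```python
import hashlib

def queue_index(namespace: str, counter: str, num_queues: int = 8) -> int:
    """Route the same counter to the same queue so dedup can happen
    locally on that instance (md5 here is only a stand-in for XXHash)."""
    h = hashlib.md5(f"{namespace}/{counter}".encode()).digest()
    return int.from_bytes(h[:4], "big") % num_queues

def drain_into_window(queue_events: list[tuple[str, str]]) -> set[tuple[str, str]]:
    """Coalesce duplicate rollup triggers into a Set: each counter is
    aggregated at most once per rollup window, however many events arrived."""
    return set(queue_events)
```

Stable routing is what makes the Set-based dedup sufficient: all triggers for a given counter land in one queue, so local coalescing sees every duplicate.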

Concurrency safety: aggregation happens inside the immutable window so multiple Rollup server instances can overwrite each other freely — the window by construction is no longer receiving events, so all observers converge to the same value. Netflix does not use distributed locking; it tolerates fluctuating intermediate values in exchange for availability.
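The convergence argument can be checked with a tiny sketch: because the window older than `now - acceptLimit` no longer accepts events, any two rollup instances that aggregate it independently compute the same checkpoint, so their overwrites are harmless.

```python
def rollup(events: list[tuple[int, int]], last_ts: int,
           accept_limit_ms: int, now_ms: int) -> tuple[int, int]:
    """Aggregate only the immutable window (last_ts, now - acceptLimit].
    Returns (delta for the window, new lastRollupTs)."""
    cutoff = now_ms - accept_limit_ms
    delta = sum(d for ts, d in events if last_ts < ts <= cutoff)
    return delta, cutoff

# Two instances racing on the same frozen window converge on the same value;
# the event at t=9999 is still inside acceptLimit and is excluded by both.
frozen = [(100, 1), (200, 2), (9999, 5)]
a = rollup(frozen, last_ts=0, accept_limit_ms=5000, now_ms=10000)
b = rollup(frozen, last_ts=0, accept_limit_ms=5000, now_ms=10000)
```

This is why no distributed lock is needed: last-writer-wins over identical values is a no-op.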

Control plane config

Per-namespace config names all three layers (rollup / event store / cache) with a shared scope + physical-cluster naming:

persistence_configuration:
  - id: CACHE
    scope: dal=counter
    physical_storage: { type: EVCACHE, cluster: evcache_dgw_counter_tier1 }
  - id: COUNTER_ROLLUP
    scope: dal=counter
    physical_storage: { type: CASSANDRA, cluster: cass_dgw_counter_uc1, dataset: my_dataset_1 }
    counter_cardinality: LOW
    config:
      counter_type: EVENTUAL
      eventual_counter_config:
        internal_config:
          queue_config: { num_queues: 8, coalesce_ms: 10000, capacity_bytes: 16777216 }
          rollup_batch_count: 32
  - id: EVENT_STORAGE
    scope: dal=ts
    physical_storage: { type: CASSANDRA, cluster: cass_dgw_counter_uc1, dataset: my_dataset_1 }
    config:
      time_partition: { buckets_per_id: 4, seconds_per_bucket: 600, seconds_per_slice: 86400 }
      accept_limit: 5s
    lifecycleConfigs:
      lifecycleConfig:
        - type: retention
          config: { close_after: 518400s, delete_after: 604800s }

Queues can be rebalanced by changing num_queues and redeploying. Graceful shutdown drains existing events; the immutable-window property means a brief overlap of old + new servers is safe.

Performance

  • ~75K count requests/second globally across all endpoints at the time of writing.
  • Single-digit millisecond latency across endpoints.
  • No per-endpoint QPS or latency breakdown disclosed.

Future work (disclosed)

  • Regional rollup tables + a global reconciliation table to mitigate cross-region replication drift. Key unresolved challenge named by Netflix: communicating counter clears across regions.
  • Durable rollup queues + rollup handoffs + improved error detection for infrequently-accessed counters where a lost rollup event isn't self-healed by a subsequent access.

Rejected alternatives (worth knowing)

Netflix walks through and rejects:

  • Single row per counter + CAS / locks — heavy contention under high write concurrency; no native idempotency; secondary-key bucketing complicates hot-key management.
  • Per-instance in-memory aggregation with periodic flush — vulnerable to data loss on crash/restart; can't reliably reset counts; no native idempotency without leader election.
  • Durable queue (Kafka) + stream-processor windowed aggregation — potential consumer-partition backup; requires partition rebalancing as cardinality scales; pre-aggregation makes audit + recounting infeasible.
  • Event log of individual increments (simplest form) — each read scans all increments (slow); duplicate aggregation work across readers; wide Cassandra partitions; large data footprint.

  • Probabilistic data structures (HyperLogLog, Count-Min Sketch) are named and rejected: HLL solves distinct-count, not increment/decrement per key; CMS would require primitives for per-key reset + TTL that their chosen data stores already provide natively.

Netflix's combined design keeps the event-log auditing property while amortising aggregation cost through the background rollup pipeline.
