
PATTERN Cited by 1 source

Lazy aggregate from monotonic local state

Pattern

When you need a clusterwide value that depends on the state of every shard (typically a watermark like "safe to delete up to here" or "all shards have processed events through here"), have each shard publish its monotonic local watermark M(p) via an existing periodic metadata-dissemination substrate. Each consumer then computes the global aggregate (typically min or max) over whatever observations it currently has. Because the local watermarks are monotonic and the aggregate function is order-independent, stale observations always produce a conservative-safe result: the aggregate may lag the true cluster state, but it is never wrong.

This eliminates the need for a coordination protocol, leader-elected aggregator, or strongly-consistent global state for the aggregate. The aggregate is lazy in two senses: (a) it is recomputed by each consumer from local observations rather than pushed from a central authority, and (b) it tolerates arbitrary dissemination latency without sacrificing correctness.

Canonicalised by Redpanda Cloud Topics for L0 GC's clusterwide safe-to-delete epoch:

"Now that every Cloud Topic partition p tracks an inactive epoch in its own Raft log, all that's left is to combine these into a single global M. Turns out we can piggyback this information on an existing periodic metadata-dissemination service internal to Redpanda. […] If a node is temporarily operating on stale metadata, that's fine. A nice side effect of epoch monotonicity is that once we prove some M is safe, it never becomes unsafe." (Source: sources/2026-05-19-redpanda-cloud-topics-level-zero-garbage-collection)

Mechanics

Per-shard (publishers):

   Shard p:
     M(p) = local_monotonic_watermark()    ← see e.g.
                                              concepts/sliding-window-epoch-tracking
     publish(M(p))                          ← via existing periodic
                                              metadata-dissemination

Per-consumer (any node that wants to act on the aggregate):

   M = min over p of last_observed(M(p))    ← lazy: use whatever's
                                              been disseminated so far
   act(M)                                    ← use the aggregate

Property: if a consumer's view of any M(p) is stale, the resulting M
is less than or equal to (more conservative than) the true M, never larger.
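The publisher and consumer roles above can be sketched in Python. This is a minimal, hypothetical sketch. `ShardWatermark`, `LazyMinAggregate`, `observe` and `current` are illustrative names, not Redpanda APIs, and a direct method call stands in for the dissemination substrate. It also assumes the consumer knows the full set of shards and reports no bound until each shard has been observed at least once. Silently leaving an unseen shard out of a min would not be conservative.

```python
from typing import Dict, Iterable, Optional, Tuple


class ShardWatermark:
    """Hypothetical publisher side: one shard's monotonic local watermark M(p)."""

    def __init__(self, shard_id: str) -> None:
        self.shard_id = shard_id
        self._value = 0

    def advance(self, candidate: int) -> None:
        # Enforce monotonicity locally: a smaller candidate never moves M(p) back.
        self._value = max(self._value, candidate)

    def publish(self) -> Tuple[str, int]:
        # Stand-in for attaching M(p) to an existing periodic metadata message.
        return self.shard_id, self._value


class LazyMinAggregate:
    """Hypothetical consumer side: min over the last observed M(p) of each shard."""

    def __init__(self, shard_ids: Iterable[str]) -> None:
        self._shards = frozenset(shard_ids)
        self._last_observed: Dict[str, int] = {}

    def observe(self, shard_id: str, value: int) -> None:
        if shard_id not in self._shards:
            raise KeyError(f"unknown shard {shard_id!r}")
        # Delayed or reordered deliveries may carry older values; keep the largest seen.
        previous = self._last_observed.get(shard_id, value)
        self._last_observed[shard_id] = max(previous, value)

    def current(self) -> Optional[int]:
        # Assumption: an unobserved shard blocks the aggregate rather than being skipped.
        if self._last_observed.keys() != self._shards:
            return None
        return min(self._last_observed.values())


if __name__ == "__main__":
    shards = {sid: ShardWatermark(sid) for sid in ("p0", "p1", "p2")}
    consumer = LazyMinAggregate(shards)
    for sid, value in (("p0", 7), ("p1", 5), ("p2", 9)):
        shards[sid].advance(value)
        consumer.observe(*shards[sid].publish())
    print(consumer.current())  # 5

    shards["p1"].advance(8)    # not yet disseminated: the consumer's view of p1 is stale
    print(consumer.current())  # 5, below the true min of 7, so still safe

    consumer.observe(*shards["p1"].publish())
    print(consumer.current())  # 7
```

Keeping the per-shard maximum on the consumer side is a design choice for substrates that may deliver duplicate or out-of-order updates. It preserves the property that the consumer's view of each M(p) only moves forward.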

The aggregate function — typically min for safe-to-GC watermarks, max for "earliest activity" — must satisfy:

  1. Order-independence: applying the aggregate over {a, b, c} in any order produces the same result.
  2. Monotonicity-friendliness: if all inputs only ever increase, the aggregate also only ever increases.

min and max over monotonically non-decreasing inputs satisfy both. (min of monotonically non-decreasing inputs is itself monotonically non-decreasing.)
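The parenthetical claim follows in one step. Write x_p(t) for shard p's watermark at time t (notation introduced here), assume each x_p is non-decreasing in t, and take t ≤ t'. The max case is symmetric, with r being a shard that attains the maximum at time t:

```latex
\min_{p} x_p(t) \;\le\; x_q(t) \;\le\; x_q(t') \quad \text{for every shard } q
\;\;\Longrightarrow\;\;
\min_{p} x_p(t) \;\le\; \min_{q} x_q(t')

\max_{p} x_p(t) \;=\; x_r(t) \;\le\; x_r(t') \;\le\; \max_{q} x_q(t')
```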

Why staleness is safe

The pattern relies on a specific structural property: a stale observation of a monotonic local watermark is a conservative under-estimate of the true value. It may equal the true value, but it is never greater.

For a min-aggregate over watermarks where "larger means more permissive" (e.g. "safe to delete more"):

  • True value: M_true = min(M(p)_true) over all shards p.
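  • Observed value: M_obs = min(M(p)_obs), where every M(p)_obs ≤ M(p)_true (equal when the observation is fresh, smaller when dissemination lags).
  • Therefore M_obs ≤ M_true, because min never increases when any of its inputs decreases.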

A consumer acting on M_obs does at most what it could do under perfect information, never more. For GC specifically:

  • Stale observation → the consumer may delete fewer objects than it could (some still-eligible objects survive longer).
  • Never the other way around → the consumer never deletes an ineligible object.
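The GC consequence can be checked with a small Python sketch. It assumes, as in the Cloud Topics design, that each object carries the epoch it was written in and that every epoch strictly below M is deletable. The object names and epochs are made up. The eligible set only grows as the bound grows, so a stale M_obs ≤ M_true selects a subset of what fresh information would allow.

```python
from typing import Dict, Set


def gc_eligible(object_epochs: Dict[str, int], safe_epoch: int) -> Set[str]:
    """Objects whose stamped epoch is strictly below the safe-to-delete epoch M."""
    return {name for name, epoch in object_epochs.items() if epoch < safe_epoch}


if __name__ == "__main__":
    objects = {"obj-a": 3, "obj-b": 5, "obj-c": 6, "obj-d": 9}  # hypothetical epochs
    m_true, m_obs = 7, 5  # m_obs lags because some M(p) update has not arrived yet

    deleted_with_stale_view = gc_eligible(objects, m_obs)
    deletable_in_truth = gc_eligible(objects, m_true)
    print(sorted(deleted_with_stale_view))  # ['obj-a']
    print(sorted(deletable_in_truth))       # ['obj-a', 'obj-b', 'obj-c']
    # The stale view deletes a subset: obj-b and obj-c merely survive until a later pass.
    assert deleted_with_stale_view <= deletable_in_truth
```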

The Cloud Topics post's framing: "once we prove some M is safe, it never becomes unsafe. Every epoch < M is gone forever. Or until int64 rollover."

Why piggybacking on existing dissemination matters

The pattern's operational pay-off comes from not adding a new coordination mechanism:

  • No new gossip protocol — reuse the existing one.
  • No new control-plane service — the aggregate is computed at the edge.
  • No new RPC topology — watermarks ride existing metadata channels.
  • No new failure modes — the dissemination substrate's existing failure modes are the ones the aggregate inherits.

The Cloud Topics instance illustrates: the per-partition M(p) piggybacks on Redpanda's existing internal metadata-dissemination service. The GC system adds zero new operational components.
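One way to picture the piggybacking is a per-node metadata message that gains one extra field mapping partitions to their latest M(p). The message type and field names below are hypothetical, not Redpanda's internal format. The consumer merges with a per-partition max, so duplicated or reordered deliveries (common on gossip-style substrates) leave the same view regardless of arrival order.

```python
from dataclasses import dataclass, field
from itertools import permutations
from typing import Dict, List


@dataclass
class NodeMetadataUpdate:
    """Hypothetical periodic message that a node already broadcasts."""
    node_id: str
    # ... existing fields (health, leadership, etc.) would sit here ...
    # Piggybacked field: partition id -> that partition's latest M(p).
    partition_watermarks: Dict[str, int] = field(default_factory=dict)


def merge(view: Dict[str, int], update: NodeMetadataUpdate) -> None:
    """Fold one update into a consumer's view; each entry only ever moves forward."""
    for partition, watermark in update.partition_watermarks.items():
        view[partition] = max(view.get(partition, watermark), watermark)


if __name__ == "__main__":
    updates: List[NodeMetadataUpdate] = [
        NodeMetadataUpdate("node-1", {"p0": 4, "p1": 6}),
        NodeMetadataUpdate("node-1", {"p0": 7, "p1": 6}),  # newer round from node-1
        NodeMetadataUpdate("node-2", {"p2": 5}),
    ]
    views = []
    for order in permutations(updates):
        view: Dict[str, int] = {}
        for update in order:
            merge(view, update)
        views.append(view)
    print(views[0])  # {'p0': 7, 'p1': 6, 'p2': 5}
    # Every delivery order yields the same view.
    assert all(v == views[0] for v in views)
```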

Comparison with strongly-consistent aggregation

| Property | Lazy aggregate from monotonic state | Strongly-consistent global state (e.g. consensus group, ZK) |
|---|---|---|
| Correctness under stale data | Safe (conservative under-estimate) | Wrong if not refreshed |
| Latency to act | Immediate | Wait for consensus / read |
| Coordination cost | None (computed at the edge) | Per update + per read |
| Failure modes | Inherited from the existing dissemination substrate | New consensus failure modes |
| Granularity | Whatever dissemination publishes | Per update |
| Suitable for | Watermarks, GC, "safe-to-X" checks | Mutual exclusion, leader election |

The pattern is only correct when the aggregate is over monotonic state and stale observations are conservative-safe. It is wrong for non-monotonic or non-conservative aggregations (e.g. "current load on each shard" — a stale load reading is not conservatively safe).
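A short Python illustration shows why non-monotonic state breaks the argument, using made-up per-shard load numbers. When values can move in either direction, a stale view can put the min either above or below the true value, so there is no conservative direction to rely on.

```python
from typing import Dict


def aggregate_min(view: Dict[str, int]) -> int:
    return min(view.values())


if __name__ == "__main__":
    true_load = {"s0": 40, "s1": 25}  # hypothetical current load per shard
    # s1's load has since fallen from 30 to 25; this view still holds 30.
    stale_a = {"s0": 40, "s1": 30}
    # s0's load has since risen from 20 to 40; this view still holds 20.
    stale_b = {"s0": 20, "s1": 25}

    true_min = aggregate_min(true_load)
    print(aggregate_min(stale_a) - true_min)  # 5: over-estimate
    print(aggregate_min(stale_b) - true_min)  # -5: under-estimate
```

With a monotonic watermark only the second kind of error is possible, and that is exactly what makes the stale answer safe.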

Composition with sibling patterns

In the Cloud Topics canonical instance, this pattern is the third of three composed primitives in epoch-based distributed GC:

  1. patterns/epoch-stamp-on-object-id-for-gc — stamp the epoch on each object at creation.
  2. patterns/per-partition-rsm-for-gc-tracking — track per-shard M(p) in a Raft-replicated state machine.
  3. This pattern — disseminate the per-shard watermarks and compute global M lazily at each consumer.

Each primitive is independently usable; together they deliver "no central index, no shared state, and no coordinated updates."

When it does and doesn't apply

Applies when:

  1. Aggregate is over monotonic local state. Watermarks, high-water-marks, low-water-marks, "processed-everything-below" markers, GC safety bounds.
  2. Stale → conservative. Under stale data the acting consumer does no more than it would under fresh data, so any difference errs in the correctness-preserving direction.
  3. Existing dissemination substrate. Gossip, periodic broadcast, controller-driven push — anything that delivers updates eventually.
  4. Lag tolerance acceptable. The system tolerates the dissemination latency as a delay-to-effect, not a correctness issue.

Does not apply when:

  • Aggregate is over non-monotonic state. "Current QPS per shard" — stale readings are not conservative-safe.
  • Strong consistency is required. "Acquire a global lock" cannot be lazy-aggregated.
  • Fresh-by-construction is required. "Latest version of this resource" needs a different mechanism (e.g. a resource-version check at read time).
  • Aggregate function is not order-independent. Median, p99, ranked operations can be lazy-disseminated, but care is needed.

Anti-patterns

  • Synchronous global aggregation (poll all shards on every use). Defeats the laziness; couples consumer latency to all shards' availability.
  • Cache the aggregate in a central authority. Reintroduces the central index and coordinated updates the pattern is designed to avoid.
  • Treat stale data as an error. Stale data is the normal case here; the pattern's correctness comes from this being fine.
  • Apply to non-monotonic state. Breaks the conservative-safe property; consumers can act on incorrect aggregate values.

Failure-mode behaviour

  • Dissemination loses an update. Consumer's M_obs is older than it should be. Acting consumer does less work than optimum. Eventual recovery: next dissemination cycle catches up. Correctness is unharmed.
  • Shard fails to publish for an extended period. Its contribution to the min is whatever was last seen. If the shard is down, the aggregate is stuck behind its last watermark. This is operationally noticeable but correctness-safe — the cluster reclaims less aggressively while the shard is down.
  • Network partition splits cluster. Each side computes its own aggregate from the watermarks visible to it, using last-seen values for shards on the other side. Both sides' aggregates are therefore correct (under-estimates of the true value), and they converge once the partition heals.
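The partition-and-heal case can be simulated with made-up watermarks. This sketch assumes each side keeps the last values it saw from the other side's shards before the split, and that healing amounts to merging views with a per-shard max. Both sides stay at or below the true min, and the merged view recovers it.

```python
from typing import Dict


def merge_views(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    """Per-shard max: the newest value either side has seen for each shard."""
    merged = dict(a)
    for shard, value in b.items():
        merged[shard] = max(merged.get(shard, value), value)
    return merged


if __name__ == "__main__":
    true_now = {"p0": 12, "p1": 15, "p2": 11, "p3": 14}
    before_split = {"p0": 8, "p1": 9, "p2": 7, "p3": 10}
    # Side A keeps hearing only from p0 and p1; side B only from p2 and p3.
    side_a = {**before_split, "p0": 12, "p1": 15}
    side_b = {**before_split, "p2": 11, "p3": 14}

    true_min = min(true_now.values())
    print(min(side_a.values()), min(side_b.values()), true_min)  # 7 8 11

    healed = merge_views(side_a, side_b)
    print(min(healed.values()))  # 11
```

If p2 never comes back, side A stays stuck at 7. This is the "shard fails to publish" case above: it is visible operationally but does no harm to correctness.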

Generalisations beyond GC

While the canonical instance is L0 GC, the pattern generalises to any "global watermark from per-shard state" shape:

  • Earliest active transaction across replicas: min over p of the oldest active transaction. Used for MVCC garbage collection, Postgres vacuum.
  • Stream-processing low-water-mark: min over all source partitions' event-time progress. Used for window closing.
  • Distributed checkpointing: min over all participants' "committed up to" offsets. Used for recovery.
  • Compaction safe-point: min over all readers' "earliest snapshot in use". Used for table compaction.
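The stream-processing case has the same shape. In this minimal Python sketch the per-partition progress values and window bounds are invented. It assumes that a partition's progress t means it has delivered every event with event time below t, so a window [start, end) can close once the low watermark reaches end.

```python
from typing import Dict, List, Tuple


def low_watermark(progress: Dict[str, int]) -> int:
    """Event time below which every source partition has delivered all events."""
    return min(progress.values())


def closable(windows: List[Tuple[int, int]], progress: Dict[str, int]) -> List[Tuple[int, int]]:
    """Windows [start, end) that no future event can still fall into."""
    wm = low_watermark(progress)
    return [(start, end) for start, end in windows if end <= wm]


if __name__ == "__main__":
    progress = {"src-0": 1000, "src-1": 1250}  # src-0 lags, holding the watermark back
    windows = [(900, 1000), (1000, 1100), (1100, 1200)]
    print(closable(windows, progress))  # [(900, 1000)]
```

A stale progress value for either source only delays a window's closing. It can never close one early.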

All share the structural shape: per-shard monotonic watermarks + order-independent aggregate + lazy dissemination + conservative-safe staleness.

Seen in
