
PATTERN Cited by 1 source

Local-global aggregation split

Intent

Execute a cross-shard aggregation query by splitting every aggregate function into a local step (pushed down to each shard and run in parallel by the shard-local query engine) and a global step (run at the coordinator / proxy on the merged per-shard results). Ship compact partial aggregates across the network rather than raw rows.

Context

A horizontally-sharded database receives an aggregation query whose predicate doesn't include the shard key (or spans multiple shards). Naively, the coordinator could fetch every row and run the aggregation locally — but that means shipping O(rows) of data across the network, which scales linearly with table size and defeats the purpose of sharding.

The observation that makes this tractable: most SQL aggregate functions are algebraically decomposable — their final result can be computed from per-shard partial results without the coordinator ever seeing the raw rows. COUNT, SUM, MIN, MAX decompose directly; AVG decomposes into two tracked values (sum and count).
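The AVG decomposition described above can be sketched in a few lines of Python (illustrative shard data, not from any real engine):

```python
# Hypothetical sketch: decomposing AVG into (sum, count) partials.
# Each shard ships one compact partial; the coordinator never sees raw rows.
shards = [
    [3.0, 5.0],        # rows on shard 0
    [10.0],            # rows on shard 1
    [2.0, 4.0, 6.0],   # rows on shard 2
]

# Local step: each shard computes (SUM(x), COUNT(x)) over its own rows.
partials = [(sum(rows), len(rows)) for rows in shards]

# Global step: AVG = SUM of partial sums / SUM of partial counts.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
avg = total_sum / total_count  # identical to AVG over all raw rows
```

Note that averaging the per-shard averages would be wrong whenever shards hold different row counts; carrying the (sum, count) pair is what makes the decomposition exact.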

Solution

The query planner rewrites every cross-shard aggregation into a two-tier plan:

Local step (per shard, pushed down to MySQL / Postgres / storage engine):

  • Apply shard-local WHERE, GROUP BY, and aggregate functions.
  • Return one row per distinct group-key value per shard, with the partial aggregate values.

Global step (at coordinator / VTGate / router):

  • Merge per-shard partial results.
  • Re-group by the original GROUP BY keys (same-key rows from different shards now coexist).
  • Apply the global form of each aggregate:
      • COUNT(*) → global SUM of per-shard counts.
      • SUM(x) → global SUM.
      • MIN(x) / MAX(x) → global MIN / MAX.
      • AVG(x) → global SUM(SUM) / SUM(COUNT) (carrying both values).
      • COUNT(DISTINCT x) → either exact (ship distinct-value sets) or approximate (merge HyperLogLog sketches).

The plan shape in Vitess (Source: sources/2026-04-21-planetscale-grouping-and-aggregations-on-vitess):

"if the user asked for SELECT count(*) FROM user, Vitess will send down a count(*) to each shard, and then do a SUM on all incoming answers."
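A minimal Python sketch of the global step, assuming each shard has already returned one (group key, count, sum, min, max) partial row per group (the shard data and query are hypothetical):

```python
# Hypothetical per-shard partial rows for
#   SELECT region, COUNT(*), SUM(amount), MIN(amount), MAX(amount)
#   FROM orders GROUP BY region
# Each tuple: (group_key, count, sum, min, max) -- one row per group per shard.
shard_partials = [
    [("eu", 2, 30.0, 10.0, 20.0), ("us", 1, 5.0, 5.0, 5.0)],   # shard 0
    [("eu", 1, 50.0, 50.0, 50.0), ("us", 3, 9.0, 1.0, 6.0)],   # shard 1
]

# Global step: re-group by key and apply each aggregate's global form.
merged = {}
for partial in shard_partials:
    for key, cnt, s, mn, mx in partial:
        if key not in merged:
            merged[key] = [cnt, s, mn, mx]
        else:
            acc = merged[key]
            acc[0] += cnt             # COUNT(*) -> SUM of per-shard counts
            acc[1] += s               # SUM(x)   -> SUM of per-shard sums
            acc[2] = min(acc[2], mn)  # MIN(x)   -> MIN of per-shard mins
            acc[3] = max(acc[3], mx)  # MAX(x)   -> MAX of per-shard maxes
```

The merge touches only groups × shards rows, regardless of how many raw rows each shard scanned locally.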

Forces

  • Data volume reduction: shard-local partials are typically tiny compared to raw data. 1B raw rows grouped by a 100-distinct-value key ship at most 100 partial rows per shard, not each shard's share of the 1B rows.
  • Parallelism: local steps run concurrently on every shard, so aggregation latency is ~max per-shard latency rather than sum.
  • MySQL is faster than the coordinator: "we want to delegate as much work as possible to MySQL — it is much faster than Vitess at doing all the database operations" (Source: sources/2026-04-21-planetscale-grouping-and-aggregations-on-vitess). The coordinator typically doesn't have the shard's data locality, indexes, or storage-engine optimisations.
  • Aggregation correctness requires algebraic decomposition: not every aggregate works. Holistic aggregates (median, percentile) don't decompose into bounded-size partials; they need approximation sketches or shipping raw values.

Canonical instances

  • Vitess / VTGate — canonicalised in the Source. Every MySQL-pushdown of an aggregate + VTGate-side merge.
  • Apache Spark — reduceByKey / combineByKey / aggregateByKey — the RDD abstraction of this pattern.
  • Presto / Trino — partial aggregation + final aggregation stages in the plan.
  • BigQuery / Dremel — leaf-aggregator + root-aggregator (actually a tree of merge tiers for tail-latency management).
  • ClickHouse Distributed tables — GROUP BY with distributed_aggregation_memory_efficient streams partials.
  • Apache Druid — segment-local aggregation + broker merge.
  • MapReduce — the historical archetype: local aggregation = Combiner; global aggregation = Reducer.
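The Combiner/Reducer split named in the last entry can be illustrated with a hypothetical word-count in Python, where `combiner` stands in for the local step and `reducer` for the global merge:

```python
from collections import Counter

def combiner(rows):
    # Local step: aggregate within one split/shard; output is O(groups), not O(rows).
    return Counter(rows)

def reducer(partials):
    # Global step: merge per-split partial counts into final totals.
    total = Counter()
    for p in partials:
        total.update(p)
    return total

splits = [["a", "b", "a"], ["b", "b", "c"]]
result = reducer(combiner(s) for s in splits)
# result == Counter({"b": 3, "a": 2, "c": 1})
```

Counting is the degenerate case, but the same shape holds for any aggregate with an associative, commutative combine step.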

Consequences

  • + Horizontally scalable aggregation: adding shards doesn't increase per-query network cost in proportion to data size; the total partial-result volume is bounded by groups × shards, not by row count.
  • + Reuses engine-native aggregation: MySQL's COUNT(*) / GROUP BY runs exactly as it would on an unsharded database, so all MySQL optimisations apply (index-only scans, covering indexes).
  • − Requires planner support: the rewrite must be implemented. Engines without this planner infrastructure fall back to fetch-everything-and-aggregate-locally.
  • − Correctness depends on decomposability: the planner has to know which aggregates decompose and how. Custom user-defined aggregates need explicit combine operators.
  • − Doesn't solve cross-shard joins: aggregations that sit above a non-colocated join require the more complex aggregation-pushdown-under-join extension.

Extensions

  • Aggregation pushdown under join — when the aggregation is above a cross-shard join, push the local aggregation under each side of the join (adding a multiplier column) so the join sees fewer rows.
  • Hierarchical aggregation — for very large shard counts, do partial → intermediate → final on a tree (like Dremel / BigQuery) rather than partial → final on a star topology, to avoid coordinator becoming a bottleneck.
  • Streaming partial merge — coordinator consumes partials as they arrive rather than buffering; trades memory for incremental progress.
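The streaming-merge extension can be sketched as a fold over partials in arrival order (hypothetical `streaming_merge` helper, counting as the example aggregate):

```python
def streaming_merge(partial_stream):
    # Fold partial (key, count) rows into per-group accumulators as they
    # arrive, instead of buffering every shard's full result set first.
    acc = {}
    for key, cnt in partial_stream:
        acc[key] = acc.get(key, 0) + cnt
    return acc

# Simulated arrival order: partials from different shards interleaved.
arrivals = [("eu", 2), ("us", 1), ("eu", 1), ("us", 3)]
totals = streaming_merge(iter(arrivals))
# totals == {"eu": 3, "us": 4}
```

If every shard additionally streams its partials sorted by group key, a k-way merge (e.g. Python's heapq.merge) lets the coordinator emit each group as soon as its key has passed on all inputs, holding only one group's accumulator in memory at a time.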

Seen in
