
ZALANDO 2026-03-03


Zalando — Why We Ditched Flink Table API Joins: Cutting State by 75% with DataStream Unions

Summary

Zalando's Search & Browse team publishes a production-engineering retrospective on the Product Offer Enrichment pipeline — the Apache Flink 1.20 job on AWS Managed Flink that joins four streams (partner pricing/stock offers, Boost sorting metadata, sponsored-products metadata, product events) into a unified enriched-offer view feeding the website catalog. The original implementation used the declarative Table API & SQL, chaining four stateful joins. Because each Flink join operator is an independent unit that must keep its own RocksDB copy of both sides to handle late or updated arrivals, state compounded multiplicatively across the chain and grew to 235–245 GB per application. At that size, the hourly savepoint cronjob pinned the cluster at 100 % CPU for ~12 minutes, starved processing with backpressure, triggered a crash-restart loop, and frequently missed snapshots outright; and because AWS Managed Flink provisions all resources proportionally as KPUs (1 vCPU + 4 GB RAM + 50 GB storage, no storage-only scaling), the overprovisioning needed to absorb these events showed up as very real money.

The team rewrote the job as a stream union + KeyedProcessFunction: unify all four streams into one DataStream[BaseEvent], key by SKU, and use a single ValueState[EnrichmentState] that stores one copy of the per-SKU enriched record and is updated field by field according to the incoming event type. Event-time and content filtering drops redundant updates before they touch state. Result: state 235 GB → 56 GB (−76 %), snapshot duration 11 min → 2.5 min (−77 %), CPU stabilised at ~30 % (previously spiking to 100 %), restart time 12–20 min → 4–5 min, AWS cost −13 %.

The closing observation is that Flink 2.1 introduced a MultiJoin operator (FLIP-516, 2025-05) that expresses the same idea natively (one keyed operator for N streams, no intermediate state) — but since 1.20 is the newest version AWS Managed Flink offers as of Feb 2026, "we're covered by our home-baked solution".

Key takeaways

  • Chained Table-API joins are state amplifiers. Each join operator in Flink 1.20 "is a strictly independent unit. Because Flink must account for late-arrival data and potential updates, it must maintain data integrity by keeping every record in its internal state (RocksDB). When you chain four joins together, you aren't just adding state; you are multiplying it. Each join operator in the chain maintains its own copy of the keys and values it needs." The three-step "state math" is explicit: JOIN₁(offer+boost) stores both inputs; JOIN₂(prev+sponsored) stores the full previous result again; JOIN₃(prev+product) clones it once more. Canonicalised as concepts/flink-stateful-join-state-amplification. (Source: sources/2026-03-03-zalando-why-we-ditched-flink-table-api-joins-cutting-state-by-75-with-datastream-unions.)
  • At ~235 GB per application, snapshotting becomes the workload. "When your state reaches 235GB of data, your application stops being a data pipeline and starts being an unstable nightmare." Every hour, the savepoint cronjob iterated RocksDB, serialized, and moved to S3 — "this would keep the cluster's CPU at 100% for nearly 12 minutes." Because the job ran close to CPU limit, the savepoint starved record processing, lag grew, and Flink "would simply give up and restart", reloading state from S3 and "sometimes fall behind our 1-hour SLA". Lengthening snapshot intervals to several hours traded one SLO risk for another (restore time on failure). Many snapshots just didn't complete: "we were vulnerable because of unreliable data backups." Canonicalised in concepts/flink-snapshot-savepoint as the operational pathology when state outpaces snapshot throughput.
  • On AWS Managed Flink, state size is a cost driver through KPUs. The disclaimer block is explicit: "Managed Service for Apache Flink provisions capacity as KPUs. A single KPU provides you with 1 vCPU and 4GB of memory. For every KPU allocated, 50GB of running application storage is also provided. This means that the application resources are always configured in terms of KPUs, there's no way to allocate more storage without also allocating more CPU and memory, or more memory without also allocating more CPU and storage." At 235 GB of state, storage alone therefore forces at least ⌈235 / 50⌉ = 5 KPUs, each dragging 1 vCPU and 4 GB of RAM along with it. Every stop (including scale-in/out, which forces a restart) creates a snapshot, so 11–20 minute snapshot windows became 11–20 minute scaling windows. The team kept parallelism "10–20 % higher than normally required" as headroom — a direct tax on state bloat. Canonicalised as concepts/kpu-aws-managed-flink (KPU as the compute+memory+storage bundle) and the reason the cost reduction ended up at ~13 % rather than proportional to the state drop.
  • Stream union + KeyedProcessFunction replaces N chained joins with one keyed operator. Instead of four chained joins, all incoming streams are unified into a single DataStream[BaseEvent] and routed into a custom MultiStreamJoinProcessor, a KeyedProcessFunction[String, BaseEvent, EnrichedOffer] (sketched after this list). Key = SKU (the natural shared key across all four streams). Per-SKU state is a single ValueState[EnrichmentState] holding only the fields needed in the final enriched output. The pattern match in processElement updates the right field(s) in place for each event type and emits the enriched record. Canonicalised as patterns/stream-union-plus-keyed-process-function and patterns/single-valuestate-over-chained-joins — the imperative shape of FLIP-516's MultiJoin, re-implemented at the DataStream API altitude.
  • Event-time / content filtering lives in the operator, not an upstream stage. Three different filters appear directly in the processor (all three are visible in the sketch after this list): (a) drop offer events whose price and stock are unchanged from the stored state (content dedup on semantic fields); (b) drop product events whose productState is unchanged; (c) drop boost events whose timestamp is older than what's already stored (if (b.timestamp <= current.boostTimestamp) return). All three return before state.update(current), so unchanged events never hit RocksDB. Canonicalised as patterns/event-time-filter-for-state-write-reduction — a manual substitute for watermark-driven retraction semantics that the Table API would have applied implicitly (and much more expensively). The team's framing: "In some cases, it would also be an option to compare the incoming events with already kept, using a relevant subset of fields to prevent updates from events that don't contain relevant changes."
  • Results are state-shaped, not CPU-shaped. The disclosed before/after numbers: state size 235 GB → 56 GB (−76 %); snapshot duration 11 min → 2.5 min (−77 %); CPU usage 100 % (spikes) → ~30 % (stable); AWS costs −13 % vs baseline; restart time 12–20 min → 4–5 min (≈−62 %). The savings discrepancy (state −76 % but cost −13 %) is explicitly addressed: "the AWS costs are not so much about the state size, but more related to the CPU and memory resources. We did save some CPU capacity, but the memory usage didn't change much, because we still needed to keep the same amount of data in memory for processing. The CPU usage was more stable, but it wasn't reduced by 75 % because the processing logic still had to do the same amount of work on the same throughput. It just didn't have to deal with the overhead of managing multiple states." The cost savings came from eliminating the overscale margin, not from a proportional KPU reduction.
  • Flink 2.1's MultiJoin operator is the native form of the same idea. "The Flink community was very much aware of the issue, and there was an improvement proposal dated May 19, 2025, called Multi-Way Join Operator. This was then [introduced in Flink 2.1 as an experimental feature]." Flink docs disclaim "This is currently in an experimental state … We currently support only streaming INNER/LEFT joins. Support for RIGHT joins will be added soon." The published benchmark shows "2x to over 100x+ increase in processed records; 3x to over 1000x+ smaller state" — same structural idea as Zalando's hand-rolled operator (keyed state, one operator per join, no intermediate state). Canonicalised as concepts/multi-way-join-operator-flink with a blocked-on-managed-service caveat: "this feature alone will be worth the wait when we get there, but until then, we're covered by our home-baked solution."
  • Flink SQL is right for 90 % of cases; the 10 % is where the abstraction leaks. The closing framing: "Flink SQL is perfect for 90% of use cases — it's fast, elegant, and maintainable. But a software engineer's value is in recognizing the remaining 10%: the use cases where the abstraction starts costing too much." The signal that triggered the rewrite wasn't correctness or developer complaint — it was operational pain (snapshot storms, crash loops, oversized parallelism, AWS bills) on a genuinely large per-SKU cardinality. The enabling condition for the DataStream rewrite was an obvious shared key (SKU) across all streams; without one, stream union + KeyedProcessFunction would have needed synthetic keying and lost most of its simplicity.
  • The manual code turned out less verbose than the Table API code it replaced. "Funnily enough, the 'more manual' approach turned out to be even less verbose than the SQL version, because our SQL was quite complex, with aggregations for calculating the maximal timestamps between several parts of the join and with ranking functions for making sure the last record from the same part of the join always wins." Concretely, the correctness logic (pick latest boost by timestamp, dedupe offers on price+stock) was already being expressed in SQL via MAX() over windows + ranking functions; moving it into the imperative operator collapsed it into plain if guards. Counterintuitive but not unique — a recurring pattern whenever the declarative engine is forced to express per-key temporal logic that the application naturally knows.
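
A minimal sketch of that processor, assuming illustrative event subtypes and EnrichmentState fields. The post names only BaseEvent, EnrichmentState, EnrichedOffer and the SKU key; the single ValueState and the three guard conditions are from the post, everything else is reconstruction (Flink 1.20, Scala):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Illustrative domain types (assumptions, not Zalando's code).
sealed trait BaseEvent { def sku: String }
case class OfferEvent(sku: String, price: BigDecimal, stock: Int) extends BaseEvent
case class BoostEvent(sku: String, boost: Double, timestamp: Long) extends BaseEvent
case class SponsoredEvent(sku: String, campaign: String) extends BaseEvent
case class ProductEvent(sku: String, productState: String) extends BaseEvent

case class EnrichmentState(price: BigDecimal, stock: Int, boost: Double,
                           boostTimestamp: Long, campaign: String, productState: String)
object EnrichmentState {
  val empty: EnrichmentState = EnrichmentState(null, -1, 0.0, Long.MinValue, null, null)
}
case class EnrichedOffer(sku: String, state: EnrichmentState)

class MultiStreamJoinProcessor
    extends KeyedProcessFunction[String, BaseEvent, EnrichedOffer] {

  // One state copy per SKU: no left/right sides, no intermediate join results.
  @transient private var state: ValueState[EnrichmentState] = _

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(
      new ValueStateDescriptor("enrichment", classOf[EnrichmentState]))

  override def processElement(
      event: BaseEvent,
      ctx: KeyedProcessFunction[String, BaseEvent, EnrichedOffer]#Context,
      out: Collector[EnrichedOffer]): Unit = {
    val current = Option(state.value()).getOrElse(EnrichmentState.empty)

    val updated = event match {
      case o: OfferEvent =>    // filter (a): unchanged price+stock never touches RocksDB
        if (o.price == current.price && o.stock == current.stock) return
        current.copy(price = o.price, stock = o.stock)
      case b: BoostEvent =>    // filter (c): stale boost metadata is dropped
        if (b.timestamp <= current.boostTimestamp) return
        current.copy(boost = b.boost, boostTimestamp = b.timestamp)
      case s: SponsoredEvent =>
        current.copy(campaign = s.campaign)
      case p: ProductEvent =>  // filter (b): unchanged productState is dropped
        if (p.productState == current.productState) return
        current.copy(productState = p.productState)
    }

    state.update(updated)
    out.collect(EnrichedOffer(ctx.getCurrentKey, updated))
  }
}

The wiring that feeds this processor appears under "Architecture sketch" below.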

Architecture sketch

Before — Table API chained joins (state-amplifying):

offer  ─┐
boost  ─┼─► JOIN₁ ─┐
                   ├─► JOIN₂ ─┐
sponsored ─────────┘          ├─► JOIN₃ ──► enriched
product ──────────────────────┘

Each JOIN keeps its own RocksDB copy of both sides
Σ state ≈ 235 GB (4-way chain)
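
A hedged reconstruction of this shape via the Table API (not Zalando's actual SQL; table and column names are illustrative, and per the takeaway above the real query also carried MAX()-over-window timestamp aggregation and "last record wins" ranking logic):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env      = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// (registration of the four source streams as views "offers", "boosts",
//  "sponsored", "products" elided)

// Each JOIN below is planned as an independent stateful operator that keeps
// BOTH of its inputs in RocksDB for late/updated arrivals, so every
// intermediate result is materialised again one step downstream.
val enriched = tableEnv.sqlQuery(
  """SELECT o.sku, o.price, o.stock, b.boost, s.campaign, p.product_state
    |FROM offers o
    |JOIN boosts    b ON o.sku = b.sku   -- JOIN₁ state: offers + boosts
    |JOIN sponsored s ON o.sku = s.sku   -- JOIN₂ state: JOIN₁ output + sponsored
    |JOIN products  p ON o.sku = p.sku   -- JOIN₃ state: JOIN₂ output + products
    |""".stripMargin)

Nothing in the query asks for three copies of the data; the amplification is an artifact of the planner decomposing the chain into pairwise stateful joins.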

After — DataStream API union + keyed processor (single state):

offer     ─┐
boost     ─┼─► UNION ──► keyBy(SKU) ──► MultiStreamJoinProcessor ──► enriched
sponsored ─┤                                        │
product   ─┘                                        ▼
                                       ValueState[EnrichmentState]
                                       (one copy per SKU)

Σ state ≈ 56 GB (single keyed state)
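
A wiring sketch of this shape, reusing the illustrative event types from the processor sketch under "Key takeaways" (Flink 1.20 Scala API; source construction elided):

import org.apache.flink.streaming.api.scala._

def enrich(offers: DataStream[OfferEvent],
           boosts: DataStream[BoostEvent],
           sponsored: DataStream[SponsoredEvent],
           products: DataStream[ProductEvent]): DataStream[EnrichedOffer] = {

  // One heterogeneous stream instead of four join inputs.
  val events: DataStream[BaseEvent] =
    offers.map(e => e: BaseEvent)
      .union(boosts.map(e => e: BaseEvent),
             sponsored.map(e => e: BaseEvent),
             products.map(e => e: BaseEvent))

  events
    .keyBy(_.sku)                           // SKU: the shared key across all four streams
    .process(new MultiStreamJoinProcessor)  // single ValueState[EnrichmentState] per SKU
}

Without an obvious shared key like SKU this shape would need synthetic keying, which is why the post frames the shared key as the enabling condition for the rewrite.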

Systems referenced

System Role
systems/apache-flink JVM stream-processing engine; 1.20 is the newest version available on AWS Managed Flink as of Feb 2026
systems/flink-table-api Declarative SQL-like API; the abstraction that produced the N-join state-amplification pathology
systems/flink-datastream-api Imperative operator API; the rewrite target, KeyedProcessFunction + ValueState
systems/aws-managed-flink AWS's managed Flink runtime provisioned in KPU units (1 vCPU + 4 GB + 50 GB each)
systems/rocksdb Flink's default state backend; per-operator state grows on disk here
systems/flink-multijoin-operator Flink 2.1's experimental native multi-way join (FLIP-516); same idea as Zalando's hand-rolled processor
systems/zalando-product-offer-enrichment The pipeline in question — joins offer/boost/sponsored/product streams for the Zalando website catalog

Concepts canonicalised

Concept Canonical framing
concepts/flink-stateful-join-state-amplification Chaining N joins multiplies state; each operator stores its own RocksDB copy for late/updated-arrival correctness
concepts/flink-snapshot-savepoint Hourly savepoints become the dominant workload once state reaches hundreds of GB; 100 % CPU + restart loops + missed snapshots
concepts/flink-keyed-stream-union Unify N heterogeneous streams into one DataStream[BaseEvent], key by shared business ID, process in one operator
concepts/kpu-aws-managed-flink AWS Managed Flink bundles vCPU + RAM + local storage proportionally; you can't scale storage independently
concepts/multi-way-join-operator-flink FLIP-516 (2025-05) → Flink 2.1 native operator for N-way streaming joins with single keyed state
concepts/declarative-vs-imperative-stream-api Declarative API is right for 90 % of cases; the 10 % where per-key temporal logic leaks through makes imperative cheaper

Patterns canonicalised

Pattern Canonical framing
patterns/stream-union-plus-keyed-process-function Replace chained joins with union(...).keyBy(sku).process(MultiStreamJoinProcessor) — one state copy per key
patterns/single-valuestate-over-chained-joins Store one POJO per key with all enrichment fields; event type dispatches to field update; no left/right sides
patterns/event-time-filter-for-state-write-reduction Drop events where the semantic content or timestamp makes the update a no-op before touching state

Operational numbers

Dimension Before After Δ
State size per application 235–245 GB 56 GB −76 %
Snapshot duration 11 min (up to 12) 2.5 min −77 %
CPU during snapshot 100 % (spike) ~30 % (stable) stabilised
Restart time 12–20 min 4–5 min ≈−62 %
Overscale margin carried 10–20 % (removed) −10 to −20 % of KPU
AWS cost baseline −13 % −13 %
Number of join operators 4 chained 1 keyed processor −3 operators
Flink version 1.20 (AWS-constrained) 1.20 (same)

Caveats / what the post doesn't disclose

  • No absolute KPU count before vs after, total TPS of the pipeline, or partner/SKU cardinality in production (only "peanuts" framing for traffic, and the 5M-SKU number is from a different Zalando system, not confirmed here).
  • No savepoint-to-S3 bandwidth numbers or RocksDB compaction / write-amplification data.
  • No per-stream event rates or per-join selectivity; the state amplification claim is explained structurally, not measured.
  • No disclosure of whether the post-rewrite application still keeps the 10–20 % parallelism margin, only that it "was" kept before.
  • No discussion of whether FLIP-516 / Flink 2.1 MultiJoin would change the build-vs-wait calculus if AWS Managed Flink shipped 2.x (the post assumes not imminent).
  • The 13 % cost saving is disclosed as an aggregate; no breakdown between KPU-count drop vs per-KPU utilisation change.
  • No mention of whether semantic-field deduplication (price == current.price && stock == current.stock) has false-negative risk from partial updates or schema evolution.

Source

sources/2026-03-03-zalando-why-we-ditched-flink-table-api-joins-cutting-state-by-75-with-datastream-unions