# Zalando — Why We Ditched Flink Table API Joins: Cutting State by 75% with DataStream Unions

## Summary
Zalando's Search & Browse team publishes a production-engineering
retrospective on the Product Offer Enrichment pipeline — the
Apache Flink 1.20 job on
AWS Managed Flink that joins four
streams (partner pricing/stock offers, Boost sorting metadata,
sponsored-products metadata, product events) into a unified
enriched-offer view feeding the website catalog. The original
implementation used the declarative Table API & SQL, chaining
four stateful joins. Because each Flink join operator is an
independent unit that must keep its own RocksDB copy of both sides
for late/updated arrivals, state compounded multiplicatively
across the chain and grew to 235–245 GB per application. At
that size, the hourly
savepoint cronjob pinned the
cluster at 100 % CPU for ~12 minutes, starved processing with
backpressure, triggered a crash-restart loop, and frequently missed
snapshots outright; because AWS Managed Flink provisions all
resources proportionally as KPUs (1 vCPU + 4 GB RAM + 50 GB
storage, no storage-only scaling), the overprovisioning to absorb
these events showed up as very real money. The team rewrote the job
as a
stream-union +
KeyedProcessFunction: unify all four streams into one
DataStream[BaseEvent], key by SKU, and use a single
ValueState[EnrichmentState] that stores one copy of the
per-SKU enriched record and is updated field-by-field by the
incoming event type. Event-time / content filtering drops
redundant updates before touching state. Result: state 235 GB
→ 56 GB (−76 %), snapshot duration 11 min → 2.5 min (−77 %),
CPU stabilised at ~30 % (down from 100 % spikes), restart time
12–20 min → 4–5 min, AWS cost −13 %. The closing observation is that
Flink 2.1 introduced a MultiJoin operator (FLIP-516, 2025-05)
that expresses the same idea natively (one keyed operator for N
streams, no intermediate state) — but since AWS Managed Flink only
ships 1.20 as of Feb 2026, "we're covered by our home-baked
solution".
## Key takeaways
- Flink's declarative Table-API joins chain state-amplifiers. Each join operator in Flink 1.20 "is a strictly independent unit. Because Flink must account for late-arrival data and potential updates, it must maintain data integrity by keeping every record in its internal state (RocksDB). When you chain four joins together, you aren't just adding state; you are multiplying it. Each join operator in the chain maintains its own copy of the keys and values it needs." The three-step "state math" is explicit: join(offer+boost) stores both; join(prev+sponsored) stores the full previous result again; join(prev+product) clones once more. Canonicalised as concepts/flink-stateful-join-state-amplification. (Source: sources/2026-03-03-zalando-why-we-ditched-flink-table-api-joins-cutting-state-by-75-with-datastream-unions.)
- At ~235 GB per application, snapshotting becomes the workload. "When your state reaches 235GB of data, your application stops being a data pipeline and starts being an unstable nightmare." Every hour, the savepoint cronjob iterated RocksDB, serialized, and moved to S3 — "this would keep the cluster's CPU at 100% for nearly 12 minutes." Because the job ran close to CPU limit, the savepoint starved record processing, lag grew, and Flink "would simply give up and restart", reloading state from S3 and "sometimes fall behind our 1-hour SLA". Lengthening snapshot intervals to several hours traded one SLO risk for another (restore time on failure). Many snapshots just didn't complete: "we were vulnerable because of unreliable data backups." Canonicalised in concepts/flink-snapshot-savepoint as the operational pathology when state outpaces snapshot throughput.
- On AWS Managed Flink, state size is a cost driver through KPUs. The disclaimer block is explicit: "Managed Service for Apache Flink provisions capacity as KPUs. A single KPU provides you with 1 vCPU and 4GB of memory. For every KPU allocated, 50GB of running application storage is also provided. This means that the application resources are always configured in terms of KPUs, there's no way to allocate more storage without also allocating more CPU and memory, or more memory without also allocating more CPU and storage." Every stop (including scale-in/out, which forces a restart) creates a snapshot, so 11–20 minute snapshot windows became 11–20 minute scaling windows. The team kept parallelism "10–20 % higher than normally required" as headroom — a direct tax on state bloat. Canonicalised as concepts/kpu-aws-managed-flink (KPU as the compute+memory+storage bundle) and the reason the cost reduction ended up at ~13 % rather than proportional to the state drop.
- Stream union + `KeyedProcessFunction` replaces N chained joins with one keyed operator. Instead of four joins chained, all incoming streams are unified into a single `DataStream[BaseEvent]` and routed into a custom `MultiStreamJoinProcessor extends KeyedProcessFunction[String, BaseEvent, EnrichedOffer]`. Key = SKU (the natural shared key across all four streams). Per-SKU state is a single `ValueState[EnrichmentState]` holding only the fields needed in the final enriched output. The pattern match in `processElement` updates the right field(s) in place for each event type and emits the enriched record. Canonicalised as patterns/stream-union-plus-keyed-process-function and patterns/single-valuestate-over-chained-joins — the imperative shape of FLIP-516's MultiJoin, re-implemented at the DataStream API altitude.
- Event-time / content filtering lives in the operator, not an upstream stage. Three different filters appear directly in the processor: (a) drop offer events where `price` and `stock` are unchanged from the stored state (content dedup on semantic fields); (b) drop product events where `productState` is unchanged; (c) drop boost events whose timestamp is older than what's already stored (`if (b.timestamp <= current.boostTimestamp) return`). All three return before `state.update(current)`, so unchanged events never hit RocksDB. Canonicalised as patterns/event-time-filter-for-state-write-reduction — a manual substitute for the watermark-driven retraction semantics the Table API would have applied implicitly (and much more expensively). The team's framing: "In some cases, it would also be an option to compare the incoming events with already kept, using a relevant subset of fields to prevent updates from events that don't contain relevant changes."
- Results are state-shaped, not CPU-shaped. The disclosed before/after table:

  | Metric | Table API (SQL) | DataStream API | Improvement |
  |---|---|---|---|
  | State Size | 235 GB | 56 GB | −76 % |
  | Snapshot Duration | 11 min | 2.5 min | −77 % |
  | CPU Usage | 100 % (spikes) | ~30 % (stable) | Stability |
  | AWS Costs | Baseline | −13 % | Savings |
  | Restart time | 12–20 min | 4–5 min | −62 % (approx) |

  The savings discrepancy (state −76 % but cost −13 %) is explicitly addressed: "the AWS costs are not so much about the state size, but more related to the CPU and memory resources. We did save some CPU capacity, but the memory usage didn't change much, because we still needed to keep the same amount of data in memory for processing. The CPU usage was more stable, but it wasn't reduced by 75 % because the processing logic still had to do the same amount of work on the same throughput. It just didn't have to deal with the overhead of managing multiple states." The cost savings came from eliminating the overscale margin, not from a proportional KPU reduction.
- Flink 2.1's `MultiJoin` operator is the native form of the same idea. "The Flink community was very much aware of the issue, and there was an improvement proposal dated May 19, 2025, called Multi-Way Join Operator. This was then [introduced in Flink 2.1 as an experimental feature]." Flink docs disclaim "This is currently in an experimental state … We currently support only streaming INNER/LEFT joins. Support for RIGHT joins will be added soon." The published benchmark shows "2x to over 100x+ increase in processed records; 3x to over 1000x+ smaller state" — same structural idea as Zalando's hand-rolled operator (keyed state, one operator per join, no intermediate state). Canonicalised as concepts/multi-way-join-operator-flink with a blocked-on-managed-service caveat: "this feature alone will be worth the wait when we get there, but until then, we're covered by our home-baked solution."
- Flink SQL is right for 90 % of cases; the 10 % is where the
abstraction leaks. The closing framing: "Flink SQL is
perfect for 90% of use cases — it's fast, elegant, and
maintainable. But a software engineer's value is in
recognizing the remaining 10%: the use cases where the
abstraction starts costing too much." The signal that
triggered the rewrite wasn't correctness or developer
complaint — it was operational pain (snapshot storms, crash
loops, oversized parallelism, AWS bills) on a genuinely large
per-SKU cardinality. The enabling condition for the DataStream
rewrite was an obvious shared key (SKU) across all
streams; without one, stream union +
`KeyedProcessFunction` would have needed synthetic keying and lost most of its simplicity.
- The manual code turned out less verbose than the Table API
code it replaced. "Funnily enough, the 'more manual'
approach turned out to be even less verbose than the SQL
version, because our SQL was quite complex, with aggregations
for calculating the maximal timestamps between several parts
of the join and with ranking functions for making sure the
last record from the same part of the join always wins."
Concretely, the correctness logic (pick latest boost by
timestamp, dedupe offers on price+stock) was already being
expressed in SQL via
MAX()over windows + ranking functions; moving it into the imperative operator collapsed it into plainifguards. Counterintuitive but non-unique — a recurring pattern whenever the declarative engine is forced to express per-key temporal logic that the application naturally knows.
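The "state math" behind the amplification claim can be sketched with toy numbers. The per-stream sizes below are illustrative placeholders, not Zalando's figures; the model assumes 1:1 key matches (each join's output is roughly the size of its combined inputs) and that each downstream join fully re-buffers what it receives:

```python
# Toy model: every two-way join operator buffers BOTH inputs in RocksDB
# (for late/updated arrivals), and its output is re-buffered by the next
# join in the chain, so intermediate results are stored again and again.

def chained_join_state(stream_sizes):
    """Total state (GB) across a left-deep chain of two-way joins,
    assuming 1:1 key matches (output ~= combined inputs)."""
    total, acc = 0, stream_sizes[0]
    for s in stream_sizes[1:]:
        total += acc + s   # this join stores both of its sides
        acc += s           # its output feeds (and is stored by) the next join
    return total

def union_state(stream_sizes):
    """Union + single keyed operator: each record stored once, at most."""
    return sum(stream_sizes)

sizes = [40, 10, 5, 25]  # GB — hypothetical offer/boost/sponsored/product
print(chained_join_state(sizes))  # 185: three joins, each re-storing upstream state
print(union_state(sizes))         # 80: one copy; less still if only needed fields are kept
```

The gap widens with every join added to the chain, which is why a four-stream pipeline hurts far more than a two-stream one.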
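The KPU coupling reduces to similar arithmetic. The bundle sizes (1 vCPU + 4 GB RAM + 50 GB storage per KPU) are from the post's quoted disclaimer; the compute need and headroom figures below are hypothetical, chosen only to show why the cost saving tracks compute rather than state:

```python
import math

# On AWS Managed Flink, capacity comes only in KPU bundles, so the KPU
# count is the max of what storage and compute each independently demand.
KPU_STORAGE_GB = 50  # storage bundled per KPU, per the post

def kpus_needed(state_gb, vcpus, headroom=0.0):
    """KPUs forced by state size vs compute need (+ parallelism headroom)."""
    by_storage = math.ceil(state_gb / KPU_STORAGE_GB)
    by_compute = math.ceil(vcpus * (1 + headroom))
    return max(by_storage, by_compute)

# Hypothetical: 8 vCPUs of real work, 20 % overscale margin before the rewrite.
before = kpus_needed(235, 8, headroom=0.2)  # compute-bound, margin included
after = kpus_needed(56, 8)                  # still compute-bound, margin dropped
print(before, after)  # 10 8 — state fell 76 %, KPU count only 20 %
```

Under these assumptions the bill shrinks only by the dropped headroom, mirroring the post's point that memory and throughput needs were unchanged.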
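A minimal, Flink-free sketch of the union + keyed-processor shape, with plain Python standing in for the Scala `KeyedProcessFunction` (the dict plays the role of `keyBy(sku)` routing to per-key `ValueState`; the field names follow the post, the exact state layout is an assumption):

```python
# Simulation of MultiStreamJoinProcessor: events from all four unified
# streams share one SKU key, and a single per-SKU state record is
# updated field-by-field according to the event type.
from dataclasses import dataclass
from typing import Optional

@dataclass
class EnrichmentState:              # analogue of ValueState[EnrichmentState]
    price: Optional[float] = None
    stock: Optional[int] = None
    boost: Optional[float] = None
    boost_ts: int = 0
    sponsored: bool = False
    product_state: Optional[str] = None

class MultiStreamJoinProcessor:
    def __init__(self):
        self.state = {}             # keyBy(sku): one EnrichmentState per key

    def process_element(self, event):
        cur = self.state.setdefault(event["sku"], EnrichmentState())
        kind = event["type"]        # dispatch on the unified BaseEvent type
        if kind == "offer":
            cur.price, cur.stock = event["price"], event["stock"]
        elif kind == "boost":
            cur.boost, cur.boost_ts = event["boost"], event["ts"]
        elif kind == "sponsored":
            cur.sponsored = event["sponsored"]
        elif kind == "product":
            cur.product_state = event["productState"]
        return cur                  # emit the enriched record downstream

p = MultiStreamJoinProcessor()
p.process_element({"type": "offer", "sku": "A1", "price": 9.99, "stock": 3})
out = p.process_element({"type": "boost", "sku": "A1", "boost": 1.5, "ts": 7})
print(out.price, out.boost)  # 9.99 1.5 — one state copy per SKU, no join chain
```

There are no left/right sides to buffer: whichever stream arrives last simply overwrites its own fields in the one shared record.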
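The write-reduction filters can be sketched the same way: guards that return before the state write, so no-op events never reach the state backend. Field names mirror the post; the skip counter is ours for illustration:

```python
# Events whose content or timestamp would make the state update a no-op
# are dropped BEFORE state.update(...) — they never touch RocksDB.

def should_write(event, current):
    """Return False when the event carries no new information."""
    kind = event["type"]
    if kind == "offer":      # content dedup on the semantic fields
        return (event["price"], event["stock"]) != \
               (current.get("price"), current.get("stock"))
    if kind == "product":
        return event["productState"] != current.get("productState")
    if kind == "boost":      # drop boosts older than what's already stored
        return event["ts"] > current.get("boostTimestamp", -1)
    return True

state = {"price": 9.99, "stock": 3, "boostTimestamp": 100}
events = [
    {"type": "offer", "price": 9.99, "stock": 3},  # unchanged -> skip
    {"type": "boost", "ts": 90},                   # stale -> skip
    {"type": "offer", "price": 8.49, "stock": 3},  # real change -> write
]
skipped = sum(1 for e in events if not should_write(e, state))
print(skipped)  # 2 of 3 events never reach the state backend
```

This is the manual stand-in for the dedup/ranking the Table API expressed with windows and retractions, paid for in code instead of state.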
## Architecture sketch
Before — Table API chained joins (state-amplifying):

```
offer ─┐
boost ─┼─► JOIN₁ ─┐
                  ├─► JOIN₂ ─┐
sponsored ────────┘          ├─► JOIN₃ ──► enriched
                             │
product ─────────────────────┘

Each JOIN keeps its own RocksDB copy of both sides
Σ state ≈ 235 GB (4-way chain)
```

After — DataStream API union + keyed processor (single state):

```
offer ─────┐
boost ─────┼─► UNION ──► keyBy(SKU) ──► MultiStreamJoinProcessor ──► enriched
sponsored ─┤                                        │
product ───┘                                        │
                                      ValueState[EnrichmentState]
                                         (one copy per SKU)

Σ state ≈ 56 GB
```
## Systems referenced
| System | Role |
|---|---|
| systems/apache-flink | JVM stream-processing engine; version 1.20 is the only one on AWS Managed Flink as of Feb 2026 |
| systems/flink-table-api | Declarative SQL-like API; the abstraction that produced the N-join state-amplification pathology |
| systems/flink-datastream-api | Imperative operator API; the rewrite target, KeyedProcessFunction + ValueState |
| systems/aws-managed-flink | AWS's managed Flink runtime provisioned in KPU units (1 vCPU + 4 GB + 50 GB each) |
| systems/rocksdb | Flink's default state backend; per-operator state grows on disk here |
| systems/flink-multijoin-operator | Flink 2.1's experimental native multi-way join (FLIP-516); same idea as Zalando's hand-rolled processor |
| systems/zalando-product-offer-enrichment | The pipeline in question — joins offer/boost/sponsored/product streams for the Zalando website catalog |
## Concepts canonicalised
| Concept | Canonical framing |
|---|---|
| concepts/flink-stateful-join-state-amplification | Chaining N joins multiplies state; each operator stores its own RocksDB copy for late/updated-arrival correctness |
| concepts/flink-snapshot-savepoint | Hourly savepoint becomes the dominant workload once per-application state reaches hundreds of GB; 100 % CPU + restart loops + missed snapshots |
| concepts/flink-keyed-stream-union | Unify N heterogeneous streams into one DataStream[BaseEvent], key by shared business ID, process in one operator |
| concepts/kpu-aws-managed-flink | AWS Managed Flink bundles vCPU + RAM + local storage proportionally; you can't scale storage independently |
| concepts/multi-way-join-operator-flink | FLIP-516 (2025-05) → Flink 2.1 native operator for N-way streaming joins with single keyed state |
| concepts/declarative-vs-imperative-stream-api | Declarative API is right for 90 % of cases; the 10 % where per-key temporal logic leaks through makes imperative cheaper |
## Patterns canonicalised
| Pattern | Canonical framing |
|---|---|
| patterns/stream-union-plus-keyed-process-function | Replace chained joins with union(...).keyBy(sku).process(MultiStreamJoinProcessor) — one state copy per key |
| patterns/single-valuestate-over-chained-joins | Store one POJO per key with all enrichment fields; event type dispatches to field update; no left/right sides |
| patterns/event-time-filter-for-state-write-reduction | Drop events where the semantic content or timestamp makes the update a no-op before touching state |
## Operational numbers
| Dimension | Before | After | Δ |
|---|---|---|---|
| State size per application | 235–245 GB | 56 GB | −76 % |
| Snapshot duration | 11 min (up to 12) | 2.5 min | −77 % |
| CPU during snapshot | 100 % (spike) | ~30 % (stable) | stabilised |
| Restart time | 12–20 min | 4–5 min | ~−65 % |
| Overscale margin carried | 10–20 % | (removed) | −10 to −20 % of KPU |
| AWS cost | baseline | −13 % | −13 % |
| Number of join operators | 4 chained | 1 keyed processor | −3 operators |
| Flink version | 1.20 (AWS-constrained) | 1.20 (same) | — |
## Caveats / what the post doesn't disclose
- No absolute KPU count before vs after, total TPS of the pipeline, or partner/SKU cardinality in production (only "peanuts" framing for traffic, and the 5M-SKU number is from a different Zalando system, not confirmed here).
- No savepoint-to-S3 bandwidth numbers or RocksDB compaction / write-amplification data.
- No per-stream event rates or per-join selectivity; the state amplification claim is explained structurally, not measured.
- No disclosure of whether the post-rewrite application still keeps the 10–20 % parallelism margin, only that it "was" kept before.
- No discussion of whether FLIP-516 / Flink 2.1 `MultiJoin` would change the build-vs-wait calculus if AWS Managed Flink shipped 2.x (the post assumes 2.x is not imminent).
- The 13 % cost saving is disclosed as an aggregate; no breakdown between KPU-count drop vs per-KPU utilisation change.
- No mention of whether semantic-field deduplication (`price == current.price && stock == current.stock`) has false-negative risk from partial updates or schema evolution.
## Source
- Original: https://engineering.zalando.com/posts/2026/03/why-we-ditched-flink-table-api-joins-cutting-state.html
- Raw markdown:
raw/zalando/2026-03-03-why-we-ditched-flink-table-api-joins-cutting-state-by-75-wit-ef7ad583.md
## Related
- companies/zalando — axis-20 anchor: Flink state discipline on AWS Managed Flink at the Search & Browse altitude.
- systems/apache-flink · systems/flink-table-api · systems/flink-datastream-api · systems/aws-managed-flink · systems/rocksdb · systems/flink-multijoin-operator · systems/zalando-product-offer-enrichment
- concepts/flink-stateful-join-state-amplification · concepts/flink-snapshot-savepoint · concepts/flink-keyed-stream-union · concepts/kpu-aws-managed-flink · concepts/multi-way-join-operator-flink · concepts/declarative-vs-imperative-stream-api
- patterns/stream-union-plus-keyed-process-function · patterns/single-valuestate-over-chained-joins · patterns/event-time-filter-for-state-write-reduction