
PATTERN Cited by 1 source

Stream Union + KeyedProcessFunction

Stream Union + KeyedProcessFunction is the Flink DataStream-API pattern of replacing N chained stateful joins with a single keyed operator:

union(s₁, s₂, …, sₙ) → keyBy(shared_key) → KeyedProcessFunction
                                            one state per key

It is the imperative equivalent of Flink 2.1's native MultiJoin operator, and the escape hatch when the state produced by planner-emitted chained joins becomes unsustainable on a Flink version that lacks that operator (e.g., Flink 1.20 on AWS Managed Flink).

Structure

  1. All input streams share, or are mapped to, a common supertype, so that each input is e.g. a DataStream[BaseEvent].
  2. union(...) merges them into one typed stream.
  3. keyBy(shared_key) partitions the stream by the natural shared identifier across all inputs.
  4. A custom KeyedProcessFunction[K, BaseEvent, Output] owns one ValueState[State] per key; processElement pattern-matches on the event subtype and updates the right fields in place.
  5. Optional: skip state.update(current) for semantically redundant events, i.e. apply event-time or content filtering before writing state.
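
A minimal sketch of steps 4 and 5, assuming the Flink 1.x KeyedProcessFunction used from Scala. The event types (OfferEvent, BoostEvent), the JoinState fields, and the names JoinLogic and MergeProcessor are hypothetical and chosen for illustration only; this is not the implementation from the cited source. The merge step is kept as a pure function so the filtering rules can be read (and unit-tested) apart from the Flink wiring.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event hierarchy: every input stream is mapped to BaseEvent (step 1).
sealed trait BaseEvent { def sku: String; def ts: Long }
final case class OfferEvent(sku: String, ts: Long, priceCents: Long) extends BaseEvent
final case class BoostEvent(sku: String, ts: Long, factor: Double) extends BaseEvent

// Hypothetical per-key state: one instance per SKU, one slot per input stream.
final case class JoinState(
  priceCents: Option[Long] = None, priceTs: Long = Long.MinValue,
  boost: Option[Double] = None,    boostTs: Long = Long.MinValue)

object JoinLogic {
  // Returns the updated state, or None when the event is stale or changes nothing.
  def merge(s: JoinState, e: BaseEvent): Option[JoinState] = e match {
    case OfferEvent(_, ts, p) if ts >= s.priceTs && !s.priceCents.contains(p) =>
      Some(s.copy(priceCents = Some(p), priceTs = ts))
    case BoostEvent(_, ts, f) if ts >= s.boostTs && !s.boost.contains(f) =>
      Some(s.copy(boost = Some(f), boostTs = ts))
    case _ => None // redundant event: skip the state write (step 5)
  }
}

// For brevity the operator emits the state itself as its output record.
class MergeProcessor extends KeyedProcessFunction[String, BaseEvent, JoinState] {

  // classOf keeps the sketch short; Flink may fall back to generic
  // serialization for this class, so real jobs would likely pass TypeInformation.
  private lazy val state: ValueState[JoinState] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[JoinState]("join-state", classOf[JoinState]))

  override def processElement(
      event: BaseEvent,
      ctx: KeyedProcessFunction[String, BaseEvent, JoinState]#Context,
      out: Collector[JoinState]): Unit = {
    // value() returns null the first time a key is seen.
    val current = Option(state.value()).getOrElse(JoinState())
    JoinLogic.merge(current, event).foreach { next =>
      state.update(next) // write only when something actually changed
      out.collect(next)
    }
  }
}
```

Emitting only when the state changes is a design choice of this sketch; a pipeline that must forward every input would call out.collect outside the foreach.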

When it fits

  • There is a natural shared key across all N streams (SKU, user ID, order ID).
  • Outputs need a merged enriched record per key, not a cross-product of unrelated keys.
  • Chained Table-API joins on the same data suffer from state amplification, and snapshot cost is already the operational bottleneck.
  • You are on a Flink version without the native MultiJoin operator, or on a managed runtime that pins an older Flink version.

When it doesn't fit

  • No shared key. If the join dimensions vary between input pairs, you cannot route all events into one keyed operator cleanly.
  • The 90 % case. If state is small (GB-scale), snapshot cost is invisible, and SQL expresses the logic naturally, staying on Table API is correct — keep the declarative form.
  • Heavy windowed aggregations across inputs. KeyedProcessFunction can implement windows via timers, but the planner-optimised Table-API window operators are hard to beat when they apply.

Canonical instance

Zalando's Product Offer Enrichment pipeline (sources/2026-03-03-zalando-why-we-ditched-flink-table-api-joins-cutting-state-by-75-with-datastream-unions) used exactly this pattern to collapse 4 chained Table-API joins on SKU (offer + boost + sponsored + product events) into one MultiStreamJoinProcessor, cutting state from 235 GB to 56 GB (−76 %) and snapshot duration from 11 min to 2.5 min.

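// Assumes the Flink 1.x Scala API and that every input is already a DataStream[BaseEvent].
import org.apache.flink.streaming.api.scala._

val enriched: DataStream[EnrichedOffer] =
  offers.union(boosts, sponsored, productEvents) // merge all inputs into one stream
    .keyBy(_.sku)                                // route every event for a SKU to one task
    .process(new MultiStreamJoinProcessor())     // single stateful operator per key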

See the full KeyedProcessFunction skeleton on the systems/flink-datastream-api page.
