# Flink DataStream API
Flink DataStream API is Apache Flink's imperative streaming API. Users build the operator graph explicitly (sources, transformations, process functions, sinks) and manage state, keying, and timers themselves. It is the escape hatch from the Table API & SQL in two cases. The first is when the declarative planner's per-operator state model compounds badly, for example a chain of joins in which every join operator keeps its own state for both of its inputs. The second is when per-key temporal logic is needed.
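A toy count in plain Scala (no Flink) shows how per-operator state "compounds" across a chain of joins. It rests on three simplifying assumptions:

- every regular binary join keeps one live row per key from each of its two inputs;
- the joins form a left-deep chain;
- the left input of each join is the already-widened output of the previous join.

Widths are in arbitrary field units. The object and method names are hypothetical.

```scala
// Toy model, not Flink's planner: counts field copies held in state per key.
object JoinStateModel {
  /** Left-deep chain of binary joins over inputs with the given row widths.
    * Join j stores its left row (inputs 1..j combined) and its right row
    * (input j+1), which together are as wide as inputs 1..j+1. */
  def chainedJoinFields(widths: Seq[Int]): Int = {
    require(widths.size >= 2, "a join chain needs at least two inputs")
    val prefixWidths = widths.scanLeft(0)(_ + _) // 0, w1, w1+w2, ..., w1+...+wn
    prefixWidths.drop(2).sum // sum over joins j = 1..n-1 of width(inputs 1..j+1)
  }

  /** One ValueState per key holding the merged record: each field stored once. */
  def singleStateFields(widths: Seq[Int]): Int = widths.sum
}
```

Take four equally wide inputs (offer, boost, sponsored, product). The chain holds 9 units per key, while a single merged record holds 4. For n equal inputs the chain grows quadratically in n and the single record grows linearly. This is a toy count, not a prediction of real state sizes.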
## Key primitives
- `DataStream[T]`: a typed stream.
- `keyBy(f)`: partitions the stream by a key function. It is required before any keyed stateful operator.
- `KeyedProcessFunction[K, I, O]`: the general-purpose per-key operator. `processElement(event, ctx, out)` runs once per event and has access to keyed `ValueState`/`ListState`/`MapState` plus timers.
- `ValueState[T]`: one `T` per key. Depending on the configured state backend, it lives in RocksDB (`EmbeddedRocksDBStateBackend`) or on the JVM heap (`HashMapStateBackend`). It is one of several keyed-state primitives, and the one Zalando used for per-SKU enriched records.
- `DataStream.union(others...)`: merges N streams of the same type into one, without deduplication. Heterogeneous events must therefore share a common supertype (Zalando's `BaseEvent`). Combined with `keyBy`, union lets a single `KeyedProcessFunction` process all event kinds for one key.
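The union → keyBy → keyed-state flow can be modeled in plain Scala to show one per-key function consuming heterogeneous events. This is a hypothetical simulation, not Flink code. The mapping is as follows:

- Concatenating the input sequences stands in for `DataStream.union`. A real union interleaves its inputs with no cross-stream ordering guarantee.
- Each event's `key` field stands in for the `keyBy` selector.
- A `Map` stands in for one `ValueState` per key.

The event and state types are invented for illustration.

```scala
import scala.collection.mutable

// Hypothetical events from two different source streams, sharing one supertype
// so they can be unioned into a single stream.
sealed trait Event { def key: String }
final case class Price(key: String, amount: Double) extends Event
final case class Stock(key: String, units: Int) extends Event

// Hypothetical per-key state: one slot per source stream.
final case class Merged(price: Option[Double] = None, stock: Option[Int] = None)

object KeyedUnionModel {
  def run(streams: Seq[Seq[Event]]): Map[String, Merged] = {
    val state = mutable.Map.empty[String, Merged] // stand-in for ValueState, scoped by key
    val unioned = streams.flatten                 // stand-in for union (no dedup)
    for (e <- unioned) {
      // Like Option(state.value()).getOrElse(...) for the current key.
      val current = state.getOrElse(e.key, Merged())
      // Each event type updates only the fields it owns.
      val next = e match {
        case Price(_, a) => current.copy(price = Some(a))
        case Stock(_, u) => current.copy(stock = Some(u))
      }
      state(e.key) = next // like state.update(next)
    }
    state.toMap
  }
}
```

Feeding `Seq(Seq(Price("sku-1", 9.99), Price("sku-2", 5.0)), Seq(Stock("sku-1", 3)))` yields one merged record per SKU. For `sku-1` both slots are filled; `sku-2` has only a price.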
## Seen in
- sources/2026-03-03-zalando-why-we-ditched-flink-table-api-joins-cutting-state-by-75-with-datastream-unions: Zalando rewrote Product Offer Enrichment as `DataStream.union(offer, boost, sponsored, product)` → `keyBy(SKU)` → `MultiStreamJoinProcessor extends KeyedProcessFunction[String, BaseEvent, EnrichedOffer]`, with a single `ValueState[EnrichmentState]` per SKU. The processor's `processElement` pattern-matches on the event type and updates only the fields that belong to that type. It returns early when the incoming event is redundant: same price and stock, a boost timestamp no newer than the stored one, or an unchanged product state. Result: state shrank from 235 GB to 56 GB (−76 %). This is the canonical instance of patterns/stream-union-plus-keyed-process-function and patterns/single-valuestate-over-chained-joins at scale, running on AWS Managed Flink 1.20.
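The three early-return checks can be exercised without a cluster. The sketch below is a plain-Scala model with hypothetical event and state types. It folds a sequence of events for one SKU and counts how many would reach `state.update`. A `None` from `step` corresponds to the early `return` in `processElement`.

```scala
// Hypothetical event shapes mirroring the three redundancy checks described above.
sealed trait SkuEvent
final case class Offer(price: Double, stock: Double) extends SkuEvent
final case class Boost(score: Double, timestamp: Long) extends SkuEvent
final case class ProductUpdate(state: String) extends SkuEvent

// Hypothetical per-SKU state; Option marks "not seen yet".
final case class SkuState(
    price: Option[Double] = None,
    stock: Option[Double] = None,
    score: Option[Double] = None,
    boostTs: Long = Long.MinValue,
    productState: Option[String] = None)

object RedundancyFilter {
  /** Some(next) if the event changes state (a write), None if it is redundant. */
  def step(s: SkuState, e: SkuEvent): Option[SkuState] = e match {
    case Offer(p, st) =>
      if (s.price.contains(p) && s.stock.contains(st)) None
      else Some(s.copy(price = Some(p), stock = Some(st)))
    case Boost(sc, ts) =>
      if (ts <= s.boostTs) None // older or same-timestamp boost: skip
      else Some(s.copy(score = Some(sc), boostTs = ts))
    case ProductUpdate(ps) =>
      if (s.productState.contains(ps)) None
      else Some(s.copy(productState = Some(ps)))
  }

  /** Folds one key's events; returns (final state, number of state writes). */
  def run(events: Seq[SkuEvent]): (SkuState, Int) =
    events.foldLeft((SkuState(), 0)) { case ((s, writes), e) =>
      step(s, e) match {
        case Some(next) => (next, writes + 1)
        case None       => (s, writes)
      }
    }
}
```

Consider seven events: a duplicate offer, an out-of-order boost, a repeated product state, and one genuine price change. Only 4 of them would produce a write. The `Option` fields are a deliberate choice. With the `0.0` defaults used in the skeleton, a legitimate first offer with price `0.0` and stock `0.0` would be mistaken for a redundant one; `Option` avoids that.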
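## Skeleton (adapted from the Zalando post, Scala)

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Event and output types are not shown in the post; these shapes are assumptions.
sealed trait BaseEvent { def sku: String }
case class OfferEvent(sku: String, price: Double, stock: Double) extends BaseEvent
case class ProductEvent(sku: String, productState: String) extends BaseEvent
case class BoostEvent(sku: String, sortingScore: Double, timestamp: Long) extends BaseEvent

// Everything known about one SKU, held in a single ValueState.
case class EnrichmentState(
  var price: Double = 0.0,
  var stock: Double = 0.0,
  var sortingScore: Double = 0.0,
  var boostTimestamp: Long = Long.MinValue, // last applied boost; MinValue lets the first boost through
  var productState: String = null
)
case class EnrichedOffer(sku: String, state: EnrichmentState)

class MultiStreamJoinProcessor
    extends KeyedProcessFunction[String, BaseEvent, EnrichedOffer] {
  private var state: ValueState[EnrichmentState] = _

  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getState(
      new ValueStateDescriptor("enriched-state", classOf[EnrichmentState]))
  }

  override def processElement(
      event: BaseEvent,
      ctx: KeyedProcessFunction[String, BaseEvent, EnrichedOffer]#Context,
      out: Collector[EnrichedOffer]): Unit = {
    // value() returns null the first time a SKU is seen.
    val current = Option(state.value()).getOrElse(EnrichmentState())
    event match {
      case o: OfferEvent =>
        // Skip offers that change neither price nor stock.
        if (o.price == current.price && o.stock == current.stock) return
        current.price = o.price; current.stock = o.stock
      case p: ProductEvent =>
        if (p.productState == current.productState) return
        current.productState = p.productState
      case b: BoostEvent =>
        // Skip boosts that are not newer than the last one applied.
        if (b.timestamp <= current.boostTimestamp) return
        current.sortingScore = b.sortingScore
        current.boostTimestamp = b.timestamp
    }
    state.update(current)
    // With the heap backend, value() returns the stored object itself; emitting a
    // copy avoids aliasing it downstream if object reuse is enabled.
    out.collect(EnrichedOffer(ctx.getCurrentKey, current.copy()))
  }
}
```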
## Related
- systems/apache-flink — parent engine.
- systems/flink-table-api — declarative counterpart.
- systems/rocksdb — state backend for large keyed state.
- concepts/flink-keyed-stream-union — the operator-graph shape this API makes natural.
- concepts/declarative-vs-imperative-stream-api — the tradeoff axis between the two APIs.
- patterns/stream-union-plus-keyed-process-function · patterns/single-valuestate-over-chained-joins · patterns/event-time-filter-for-state-write-reduction.