PATTERN
Event-Time / Content Filter for State Write Reduction¶
Event-time / content filter for state write reduction is the
pattern of dropping an incoming stream event inside a
KeyedProcessFunction when it would produce a no-op update,
returning before state.update(...) is ever called, so RocksDB
never sees the write.
Two filter forms show up consistently:
- Content filter. If the event's semantic fields equal the stored fields, return early.
- Event-time filter. If the event's timestamp is not newer than the stored timestamp, return early.
Both forms must appear before state.update(current) to have
any effect — the goal is to avoid touching the state backend, not
to post-filter the output.
Canonical code (from the Zalando post)¶
```scala
override def processElement(
    event: BaseEvent, ctx: Context, out: Collector[EnrichedOffer]): Unit = {
  val current = Option(state.value()).getOrElse(EnrichmentState())
  event match {
    case o: OfferEvent =>
      // Content filter: skip if price + stock unchanged
      if (o.price == current.price && o.stock == current.stock) return
      current.price = o.price; current.stock = o.stock
    case p: ProductEvent =>
      // Content filter: skip if product state unchanged
      if (p.productState == current.productState) return
      current.productState = p.productState
    case b: BoostEvent =>
      // Event-time filter: skip if boost is not newer than stored
      if (b.timestamp <= current.boostTimestamp) return
      current.sortingScore = b.sortingScore
  }
  state.update(current) // only reaches here on change
  out.collect(EnrichedOffer(ctx.getCurrentKey, current))
}
```
Why this matters on large state¶
On a RocksDB-backed state backend, each state.update(...)
writes to the memtable and is eventually flushed into SSTables.
At the scale where stateful-join state amplification or per-SKU
ValueState is already under pressure, halving the write rate
directly reduces RocksDB compaction CPU, snapshot size growth,
and backpressure on ingestion.
Relationship to Table API retraction¶
A Table API join applies equivalent semantics via
watermark-driven retraction / upsert handling, but the planner has
no semantic handle on "same content" or "stale timestamp": it
must emit an update event and let downstream operators deduplicate
or rank. Moving the filter into the operator repays the verbosity
of the imperative rewrite with a substantial write reduction.
Design rules¶
- Filter by the semantics the application actually cares about, not structural equality. Zalando checks price == current.price && stock == current.stock, not full record equality.
- Compare against stored state, not against the previous event. Out-of-order ingestion means the filter must be idempotent under replay and robust to gaps.
- Return before state.update. The point is to avoid the write.
- Be explicit about output semantics. If downstream expects one enriched record per event arrival, skipping state updates still changes the emit pattern (Zalando emits on every processed event after a successful update; skipped events emit nothing).
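The second rule, comparing against stored state, is what makes the filter safe under replay. A hedged sketch in plain Scala (no Flink; Cell and Boost are illustrative names, and writes models how often state.update(...) would reach the backend):

```scala
// Hedged sketch: replay idempotence of the event-time filter.
// Cell stands in for a per-key ValueState; writes counts backend writes.
case class Boost(ts: Long, score: Double)

final class Cell {
  var boostTs: Long = Long.MinValue
  var score: Double = 0.0
  var writes: Int = 0

  def process(b: Boost): Unit = {
    if (b.ts <= boostTs) return // event-time filter: stale, duplicate, or replayed
    boostTs = b.ts
    score = b.score
    writes += 1                 // models a state.update(...) reaching RocksDB
  }
}

@main def replayDemo(): Unit = {
  val cell = new Cell
  // includes an out-of-order event (ts = 5) and a replayed duplicate (second ts = 10)
  Seq(Boost(10L, 1.0), Boost(5L, 0.5), Boost(10L, 1.0), Boost(12L, 2.0))
    .foreach(cell.process)
  assert(cell.writes == 2)  // only ts = 10 and ts = 12 touched the "backend"
  assert(cell.score == 2.0)
  println(cell.writes)      // prints 2
}
```

Because the comparison is against stored state rather than the previous event, replaying the same sequence twice produces no additional writes.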
Canonical instance¶
Zalando's Product Offer Enrichment processor is the canonical
instance, using all three filter shapes above: content dedup on
(price, stock), content dedup on productState, and an event-time
filter on the boost timestamp.
Related¶
- systems/flink-datastream-api · systems/apache-flink · systems/rocksdb.
- concepts/flink-stateful-join-state-amplification — the adjacent pathology this pattern helps contain.
- patterns/stream-union-plus-keyed-process-function — the outer operator shape.
- patterns/single-valuestate-over-chained-joins — the state shape this filter protects.