

Event-Time / Content Filter for State Write Reduction

Event-time / content filter for state write reduction is the pattern of dropping incoming stream events inside a KeyedProcessFunction when they would result in a no-op update — before calling state.update(...) — so RocksDB never sees the write.

Two filter forms show up consistently:

  1. Content filter. If the event's semantic fields equal the stored fields, return early.
  2. Event-time filter. If the event's timestamp is not newer than the stored timestamp, return early.

Both forms must appear before state.update(current) to have any effect — the goal is to avoid touching the state backend, not to post-filter the output.
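The ordering rule can be sketched without Flink. Treat each filter form as a "skip" predicate over (stored, incoming), and call the write only when that predicate is false. This is a minimal sketch under stated assumptions:

- `guardedUpdate`, `sameContent`, `notNewer` and the four small case classes are hypothetical names.
- The `write` callback stands in for `state.update(...)`.

```scala
// Hypothetical state/event shapes, one pair per filter form.
final case class OfferState(price: BigDecimal, stock: Int)
final case class OfferEvent(price: BigDecimal, stock: Int)
final case class BoostState(sortingScore: Double, boostTimestamp: Long)
final case class BoostEvent(sortingScore: Double, timestamp: Long)

// Content filter: the semantic fields equal what is already stored.
def sameContent(s: OfferState, e: OfferEvent): Boolean =
  s.price == e.price && s.stock == e.stock

// Event-time filter: the event is not newer than the stored timestamp.
def notNewer(s: BoostState, e: BoostEvent): Boolean =
  e.timestamp <= s.boostTimestamp

// Hypothetical guard: `write` (standing in for state.update) is called only when
// `skip` is false, so a no-op event never reaches the state backend.
// Returns the state that is current after the event.
def guardedUpdate[S, E](stored: S, event: E)(skip: (S, E) => Boolean)(
    merge: (S, E) => S)(write: S => Unit): S =
  if (skip(stored, event)) stored
  else {
    val next = merge(stored, event)
    write(next)
    next
  }
```

For example, `guardedUpdate(state, event)(sameContent)((_, e) => OfferState(e.price, e.stock))(s => store(s))` calls `store` only when price or stock actually changed.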

Canonical code (from the Zalando post)

From sources/2026-03-03-zalando-why-we-ditched-flink-table-api-joins-cutting-state-by-75-with-datastream-unions:

```scala
override def processElement(
    event: BaseEvent, ctx: Context, out: Collector[EnrichedOffer]): Unit = {
  // Start from the stored per-key state, or an empty state for a new key
  val current = Option(state.value()).getOrElse(EnrichmentState())

  event match {
    case o: OfferEvent =>
      // Content filter: skip if price + stock unchanged
      if (o.price == current.price && o.stock == current.stock) return
      current.price = o.price; current.stock = o.stock

    case p: ProductEvent =>
      // Content filter: skip if product state unchanged
      if (p.productState == current.productState) return
      current.productState = p.productState

    case b: BoostEvent =>
      // Event-time filter: skip if boost is not newer than stored
      if (b.timestamp <= current.boostTimestamp) return
      current.sortingScore = b.sortingScore
      // Advance the stored event time; otherwise every later boost would be
      // compared against the initial timestamp and the filter would never tighten
      current.boostTimestamp = b.timestamp
  }

  state.update(current)                    // only reaches here on change
  out.collect(EnrichedOffer(ctx.getCurrentKey, current))
}
```
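To exercise the filter logic outside a Flink job, the operator above can be modeled as a plain function over an in-memory keyed store. This is a sketch, not Zalando's code:

- `KeyedStore` is a hypothetical stand-in for keyed `ValueState`, and `KeyedStore.writes` counts the calls that would reach `state.update`.
- The event and state fields are assumed from the snippet.
- An immutable `copy` replaces the in-place mutation.
- Optional fields sidestep a question the snippet leaves open: what `EnrichmentState()` defaults to for a key with no stored value yet.

```scala
import scala.collection.mutable

// Hypothetical event types mirroring the fields used in processElement.
sealed trait BaseEvent { def key: String }
final case class OfferEvent(key: String, price: BigDecimal, stock: Int) extends BaseEvent
final case class ProductEvent(key: String, productState: String) extends BaseEvent
final case class BoostEvent(key: String, sortingScore: Double, timestamp: Long) extends BaseEvent

// Hypothetical per-key state; None means "never set" rather than a default value.
final case class EnrichmentState(
    price: Option[BigDecimal] = None,
    stock: Option[Int] = None,
    productState: Option[String] = None,
    sortingScore: Option[Double] = None,
    boostTimestamp: Long = Long.MinValue)

final case class EnrichedOffer(key: String, state: EnrichmentState)

// In-memory stand-in for keyed ValueState; `writes` counts update calls.
final class KeyedStore {
  private val values = mutable.Map.empty[String, EnrichmentState]
  var writes = 0
  def value(key: String): Option[EnrichmentState] = values.get(key)
  def update(key: String, s: EnrichmentState): Unit = {
    values(key) = s
    writes += 1
  }
}

// Same filters as processElement; returns the emitted record, or None if skipped.
def process(store: KeyedStore, event: BaseEvent): Option[EnrichedOffer] = {
  val current = store.value(event.key).getOrElse(EnrichmentState())
  val next: Option[EnrichmentState] = event match {
    case o: OfferEvent =>
      // Content filter on (price, stock)
      if (current.price.contains(o.price) && current.stock.contains(o.stock)) None
      else Some(current.copy(price = Some(o.price), stock = Some(o.stock)))
    case p: ProductEvent =>
      // Content filter on productState
      if (current.productState.contains(p.productState)) None
      else Some(current.copy(productState = Some(p.productState)))
    case b: BoostEvent =>
      // Event-time filter on the boost timestamp
      if (b.timestamp <= current.boostTimestamp) None
      else Some(current.copy(sortingScore = Some(b.sortingScore), boostTimestamp = b.timestamp))
  }
  next.map { s =>
    store.update(event.key, s) // only reached on a real change
    EnrichedOffer(event.key, s)
  }
}
```

Feeding the same offer twice yields one write and one emitted record. A boost older than the stored one is dropped without touching the store.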

Why this matters on large state

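On a RocksDB-backed state backend, each state.update(...) serializes the value and writes it to the memtable. The memtable is later flushed to SST files and compacted. A write that stores an unchanged value still adds another entry for the key, which flush and compaction must later discard. At the scale where stateful join state amplification or per-SKU ValueState is already under pressure, cutting the write rate (halving it, for example) directly reduces RocksDB compaction CPU, snapshot size growth, and backpressure on ingestion.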
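A toy count makes the write-rate argument concrete. Replay one sample stream with and without a content filter, and count the writes a keyed store would receive. Both the stream and the `countWrites` helper are invented for illustration. The reduction they show is a property of this sample, not a figure from the Zalando post.

```scala
import scala.collection.mutable

// Hypothetical helper: replays (key, value) upserts against an in-memory keyed
// store and returns how many writes reach it. With contentFilter = true, an
// upsert whose value equals the stored value for its key is skipped.
def countWrites[K, V](events: Seq[(K, V)], contentFilter: Boolean): Int = {
  val stored = mutable.Map.empty[K, V]
  var writes = 0
  events.foreach { case (k, v) =>
    val unchanged = stored.get(k).contains(v)
    if (!(contentFilter && unchanged)) {
      stored(k) = v
      writes += 1
    }
  }
  writes
}

// Invented sample: three of these six upserts repeat the stored value for their key.
val sample = Seq("a" -> 1, "a" -> 1, "b" -> 2, "a" -> 1, "b" -> 2, "a" -> 3)
```

On this sample the filter halves the writes, from 6 to 3. On a real stream, the saving depends entirely on how often upstream re-sends unchanged values.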

Relationship to Table API retraction

A Table API JOIN would express the same enrichment through retraction / upsert (changelog) handling. However, the planner has no semantic handle on "same content" or "stale timestamp": it must emit an update event and let downstream operators deduplicate or rank. Moving the filter into the operator makes the imperative rewrite more verbose, and the substantial write reduction pays that cost back.
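The point about the planner can be illustrated with a toy changelog model. This is not Flink's planner or join operator. The `Change` records are hypothetical, loosely named after Flink's insert / update-before / update-after row kinds, and so are `toChangelog` and the row strings. The model shows only one thing:

- A step with no notion of "same content" turns every upsert for a known key into an update-before / update-after pair.
- The same step with a content filter emits nothing for an unchanged row.

```scala
import scala.collection.mutable

// Toy changelog records (hypothetical; not Flink's RowKind API).
sealed trait Change
final case class Insert(key: String, row: String) extends Change
final case class UpdateBefore(key: String, row: String) extends Change
final case class UpdateAfter(key: String, row: String) extends Change

// Turns keyed upserts into a changelog. Without contentFilter, an upsert for a
// known key always yields an UpdateBefore/UpdateAfter pair, even if unchanged.
def toChangelog(upserts: Seq[(String, String)], contentFilter: Boolean): Vector[Change] = {
  val last = mutable.Map.empty[String, String]
  upserts.foldLeft(Vector.empty[Change]) { case (out, (key, row)) =>
    last.get(key) match {
      case None =>
        last(key) = row
        out :+ Insert(key, row)
      case Some(prev) if contentFilter && prev == row =>
        out // unchanged row: no state write, no changelog records
      case Some(prev) =>
        last(key) = row
        out :+ UpdateBefore(key, prev) :+ UpdateAfter(key, row)
    }
  }
}
```

With three upserts for one key, where the second repeats the first, the unfiltered model emits five records. The filtered model emits three.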

Design rules

  1. Filter by the semantics the application actually cares about, not structural equality. Zalando checks price == current.price && stock == current.stock, not full-record equality.
  2. Compare against stored state, not against the previous event. Events can arrive out of order, be replayed after a restore, or be missing entirely; comparing against stored state keeps the filter idempotent under replay and robust to gaps.
  3. Return before state.update. The point is to avoid the write.
  4. Be explicit about output semantics. Skipping the state update also skips the emit: Zalando emits one enriched record after each successful update, and skipped events emit nothing. A downstream consumer that expects one enriched record per event arrival will see fewer records.
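Rules 1, 2 and 4 can be checked together in a small sketch. `Offer`, its `ingestedAt` field and `OfferFilter` are hypothetical:

- `ingestedAt` stands for any metadata field that changes on every delivery. Full-record equality would therefore never match, while a projection onto (price, stock) does.
- The filter compares against the stored projection per key. A redelivered event is skipped even when another key's event arrived in between.
- It returns `None` for skipped events, which makes the emit-on-change behavior of rule 4 explicit.

```scala
import scala.collection.mutable

// Hypothetical offer record. `ingestedAt` is transport metadata that differs on
// every delivery, so full-record equality would never detect a no-op (rule 1).
final case class Offer(sku: String, price: BigDecimal, stock: Int, ingestedAt: Long)

// Rule 1: project onto the fields the application actually cares about.
def semantic(o: Offer): (BigDecimal, Int) = (o.price, o.stock)

final class OfferFilter {
  // Rule 2: the baseline is the stored per-key projection, not the last event seen.
  private val stored = mutable.Map.empty[String, (BigDecimal, Int)]

  // Rule 4: Some(offer) is what would be emitted downstream; None means the
  // event was skipped and nothing is written or emitted.
  def process(o: Offer): Option[Offer] = {
    val projected = semantic(o)
    if (stored.get(o.sku).contains(projected)) None
    else {
      stored(o.sku) = projected // rule 3: the write happens only after the check
      Some(o)
    }
  }
}
```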

Canonical instance

Zalando's Product Offer Enrichment processor uses both filter forms in three places: content dedup on (price, stock), content dedup on productState, and an event-time filter on the boost timestamp.
