Flink DataStream API

The Flink DataStream API is Apache Flink's imperative streaming API. Users construct the operator graph explicitly (sources, transformations, process functions, sinks) and manage state, keying, and timers by hand. It is the escape hatch from Table API & SQL in two cases. The first is when the declarative planner's per-operator state model compounds badly, for example in a chain of regular joins where every join operator keeps its own copy of its inputs in state. The second is when per-key temporal logic is needed.

Key primitives

  • DataStream[T] — a typed stream.
  • keyBy(f) — partitions the stream by a key-selector function and returns a KeyedStream; required before any keyed stateful operator.
  • KeyedProcessFunction[K, I, O] — the general-purpose per-key operator. processElement(event, ctx, out) runs once per event and can read keyed ValueState / ListState / MapState. It can also register timers via ctx.timerService(), which later fire onTimer(...).
  • ValueState[T] — one T per key, held by the configured state backend (RocksDB or the JVM heap). One of several keyed-state primitives, and the one Zalando used for per-SKU enriched records.
  • DataStream.union(others...) — merges N streams of the same type into one. Combined with keyBy, lets a single KeyedProcessFunction process heterogeneous events for one key.
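Outside Flink, the union + keyBy + ValueState combination can be modeled in plain Scala, as in the sketch below. Three streams of a common supertype are merged. Every event then updates the state slot for its own key, whichever stream it arrived on. This models the semantics only and is not Flink API. The event types, the `sku` key, and the `Map` standing in for keyed state are all hypothetical.

```scala
// Plain-Scala model (no Flink) of DataStream.union + keyBy + a keyed operator.
// Hypothetical: event types, the `sku` key, and the Map used as "keyed state".
sealed trait Event { def sku: String }
final case class OfferUpdate(sku: String, price: Double) extends Event
final case class ProductUpdate(sku: String, productState: String) extends Event
final case class BoostUpdate(sku: String, sortingScore: Double) extends Event

// Stand-in for the per-key ValueState: the latest known view of one SKU.
final case class SkuView(
  price: Option[Double] = None,
  productState: Option[String] = None,
  sortingScore: Option[Double] = None
)

object UnionKeyByModel {
  // union: streams of a common supertype merged into one. Concatenation is only
  // one possible interleaving; Flink makes no ordering promise across inputs.
  def union(streams: Seq[Event]*): Seq[Event] = streams.flatten

  // keyBy + keyed operator: each event, whatever stream it came from, updates
  // the state slot for its own key. The Map plays the role of keyed state.
  def process(events: Seq[Event]): Map[String, SkuView] =
    events.foldLeft(Map.empty[String, SkuView]) { (states, e) =>
      val current = states.getOrElse(e.sku, SkuView())
      val updated = e match {
        case OfferUpdate(_, p)   => current.copy(price = Some(p))
        case ProductUpdate(_, s) => current.copy(productState = Some(s))
        case BoostUpdate(_, sc)  => current.copy(sortingScore = Some(sc))
      }
      states.updated(e.sku, updated)
    }
}
```

In real Flink, keyed state is partitioned across parallel subtasks by key hash and held by the state backend. The `Map` captures only the per-key isolation.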

Seen in

Skeleton (from the Zalando post, Scala)

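import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// BaseEvent, OfferEvent, ProductEvent, BoostEvent and EnrichedOffer come from
// the post's codebase and are not shown here.

// Per-SKU view assembled from all three event streams. Fields are mutable so
// the view can be updated in place and written back with state.update(...).
case class EnrichmentState(
  var price: Double = 0.0,
  var stock: Double = 0.0,
  var sortingScore: Double = 0.0,
  var productState: String = null,
  var boostTimestamp: Long = Long.MinValue // timestamp of the last applied BoostEvent
)

class MultiStreamJoinProcessor
    extends KeyedProcessFunction[String, BaseEvent, EnrichedOffer] {
  // One EnrichmentState per key (SKU), held by the configured state backend.
  private var state: ValueState[EnrichmentState] = _

  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[EnrichmentState]("enriched-state", classOf[EnrichmentState]))
  }

  override def processElement(
      event: BaseEvent, ctx: Context, out: Collector[EnrichedOffer]): Unit = {
    // First event for this key: state.value() is null, so start from defaults.
    val current = Option(state.value()).getOrElse(EnrichmentState())
    event match {
      case o: OfferEvent =>
        // Skip offer updates that change neither price nor stock.
        if (o.price == current.price && o.stock == current.stock) return
        current.price = o.price; current.stock = o.stock
      case p: ProductEvent =>
        if (p.productState == current.productState) return
        current.productState = p.productState
      case b: BoostEvent =>
        // Ignore boosts that are not newer than the last one applied.
        if (b.timestamp <= current.boostTimestamp) return
        current.sortingScore = b.sortingScore
        current.boostTimestamp = b.timestamp
    }
    state.update(current)
    // Emit a copy so that, if object reuse is enabled, downstream operators
    // never share the mutable object held in heap state.
    out.collect(EnrichedOffer(ctx.getCurrentKey, current.copy()))
  }
}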
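The skeleton emits only when an event changes the stored per-SKU view. A boost is applied only if its timestamp is newer than the stored boost timestamp. The plain-Scala replay below exercises those rules on a single key without a Flink runtime. It is a sketch: the type names are hypothetical, `Option` replaces the `null` product state, the fold accumulator stands in for `ValueState`, and the emitted `Vector` stands in for the `Collector`.

```scala
// Plain-Scala replay (no Flink) of the skeleton's change-detection rules for a
// single key. Hypothetical types; the fold accumulator stands in for
// ValueState and the emitted Vector stands in for the Collector.
sealed trait SkuEvent
final case class OfferEv(price: Double, stock: Double) extends SkuEvent
final case class ProductEv(productState: String) extends SkuEvent
final case class BoostEv(timestamp: Long, sortingScore: Double) extends SkuEvent

final case class Enriched(
  price: Double = 0.0,
  stock: Double = 0.0,
  sortingScore: Double = 0.0,
  productState: Option[String] = None,
  boostTimestamp: Long = Long.MinValue
)

object ChangeDetectionReplay {
  /** Returns every view that would be emitted downstream, in order. */
  def replay(events: Seq[SkuEvent]): Vector[Enriched] =
    events.foldLeft((Enriched(), Vector.empty[Enriched])) {
      case ((state, emitted), event) =>
        val next: Option[Enriched] = event match {
          case OfferEv(p, s) =>
            // Same price and stock: nothing changed, so emit nothing.
            if (p == state.price && s == state.stock) None
            else Some(state.copy(price = p, stock = s))
          case ProductEv(ps) =>
            if (state.productState.contains(ps)) None
            else Some(state.copy(productState = Some(ps)))
          case BoostEv(ts, score) =>
            // Stale or duplicate boost (timestamp not newer): drop it.
            if (ts <= state.boostTimestamp) None
            else Some(state.copy(sortingScore = score, boostTimestamp = ts))
        }
        next match {
          case Some(n) => (n, emitted :+ n)
          case None    => (state, emitted)
        }
    }._2
}
```

As an example, replay these events in order: `OfferEv(10.0, 3.0)` twice, then `ProductEv("ACTIVE")`, then boosts with timestamps 100, 90, and 100. The replay emits three views. The duplicate offer and both non-newer boosts are suppressed.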