
CONCEPT

Spark ETL Feature DAG

Definition

Spark ETL Feature DAG is an architectural model for structuring Spark-based ETL pipelines as a directed acyclic graph of features — self-contained, web-API-shaped units of ETL work with declared inputs (sources dict), a transformation function, and an output schema. The DAG runtime topologically sorts features by their declared sources and executes them in dependency order.

Canonicalised by Yelp's 2025-02-19 Revenue Data Pipeline post as the model behind Yelp's internal spark-etl package. The post's verbatim framing: "The building blocks of a Spark-ETL program are Spark features, which define the input, transformation logic, and output. These features resemble web APIs with their request-response schemas."

Why it's useful

Raw Spark code tends to sprawl — a single run_pipeline.py with 500 lines of df = spark.read(...) calls, successive .filter() / .join() / .withColumn() transformations, and an ad-hoc mental model of which DataFrame depends on which.

The Feature DAG abstraction forces:

  • Composition over monolith — each feature is a class; its sources are a dict; composing pipelines means declaring features, not writing procedural wiring.
  • Reusability — a source-snapshot feature is loaded once and consumed by every downstream feature that declares it, without re-reading from S3 per consumer.
  • Testability — each feature's transform() is a pure function of its inputs; testable in isolation.
  • Observability — feature names become units of telemetry; the runtime can emit per-feature timing, row counts, errors.
  • Reproducibility — re-running a specific feature for a specific time window is a CLI flag, not a code edit.
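The testability point is easy to demonstrate: because transform() only touches what arrives via its kwargs, no SparkSession or cluster is needed to exercise it. A minimal sketch, where the feature and the stub are hypothetical (not from the post):

```python
class PassThroughFeature:
    """Hypothetical snapshot-style feature: returns its single source unchanged."""
    alias = "table_x_snapshot"

    def transform(self, spark, start_date, end_date, **kwargs):
        return kwargs["table_x"]


# Any stub can stand in for the DataFrame: transform is a pure
# function of its declared inputs, so it never reaches for S3.
stub_df = object()
result = PassThroughFeature().transform(
    spark=None, start_date="2025-01-01", end_date="2025-01-31", table_x=stub_df
)
assert result is stub_df
```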

The two feature sub-types (Yelp canon)

Source data snapshot features

Read a database-table snapshot from S3 and pass it through unchanged. Serve as the roots of the DAG — no sources beyond the underlying S3PublishedSource. Example shape from the post:

class TableXSnapshotFeature(SparkFeature):
    alias = "table_x_snapshot"

    def __init__(self):
        self.sources = {
            "table_x": S3PublishedSource(
                base_s3_path=get_s3_location_for_table("table_x"),
                source_schema_id=get_schema_ids_for_data_snapshot("table_x"),
                date_col="_dt",
                select_cols=TABLE_SCHEMA,
            )
        }

    def transform(self, spark, start_date, end_date, **kwargs):
        return kwargs["table_x"]

Transformation features

Consume other features as DataFrame inputs via the sources dict. Do real transformation work in transform(). Example shape:

from pyspark.sql.functions import lit  # used in transform() below

class AggregatedFeature(SparkFeature):
    def __init__(self):
        self.sources = {
            "feature_x": ConfiguredSparkFeature(),
            "feature_y": ConfiguredSparkFeature(),
        }

    def transform(self, spark, start_date, end_date, **kwargs):
        fx = kwargs["feature_x"].withColumn("is_flag", lit(False))
        fy = kwargs["feature_y"].withColumn("time_changed", lit(None))
        return fx.unionByName(fy).drop("alignment").filter(...)

The sources dict is the edge declaration of the DAG: each key is a named dependency on another feature whose output becomes the corresponding kwarg in transform().
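A minimal sketch of the contract both sub-types share. The real base class is internal to Yelp's spark-etl package, so everything here beyond the alias/sources/transform trio is an assumption:

```python
from abc import ABC, abstractmethod


class SparkFeature(ABC):
    """Hypothetical reconstruction of the feature contract."""

    alias: str = ""      # DAG node name; also the unit of telemetry
    sources: dict = {}   # edge declarations: kwarg name -> upstream source/feature

    @abstractmethod
    def transform(self, spark, start_date, end_date, **kwargs):
        """Each key in self.sources arrives as a kwarg holding that
        upstream's output; returns this feature's own output."""


class DoubleFeature(SparkFeature):
    """Toy subclass showing the shape; not a real pipeline feature."""
    alias = "double"

    def __init__(self):
        self.sources = {"nums": "some_upstream_feature"}  # placeholder edge

    def transform(self, spark, start_date, end_date, **kwargs):
        return [n * 2 for n in kwargs["nums"]]


feat = DoubleFeature()
assert set(feat.sources) == {"nums"}                            # declared edge
assert feat.transform(None, None, None, nums=[1, 2]) == [2, 4]  # kwarg wiring
```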

DAG execution mechanics

The runtime:

  1. Reads a YAML config listing only the nodes (feature aliases + class paths), not the edges.
  2. Inspects each feature's self.sources to discover its dependencies.
  3. Topologically sorts the feature set.
  4. Executes in order, passing each feature's output DataFrame as a kwarg to its downstream consumers' transform() calls.

Canonicalised as patterns/yaml-declared-feature-dag-topology-inferred. The payoff: YAML stays DRY — adding a new feature is a one-line change (one new entry in the features list). No graph editing, no edge-list maintenance.
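The four steps can be sketched end-to-end in plain Python. The runtime below is an illustration, not spark-etl's actual code: the features are stubs whose transform() returns lists instead of DataFrames, the spark/start_date/end_date parameters are dropped for brevity, and the stdlib graphlib does the topological sort:

```python
from graphlib import TopologicalSorter


# Stub features standing in for SparkFeature subclasses (step 1's YAML
# would list just these class paths -- the nodes, never the edges).
class TableXSnapshot:
    sources = {}  # DAG root: would read its snapshot from S3

    def transform(self, **kwargs):
        return ["row1", "row2"]


class Aggregated:
    sources = {"table_x": TableXSnapshot}  # edge, discovered at runtime

    def transform(self, **kwargs):
        return [row.upper() for row in kwargs["table_x"]]


def run(feature_classes):
    instances = {cls: cls() for cls in feature_classes}
    # Step 2: infer edges by inspecting each feature's sources dict.
    graph = {cls: set(inst.sources.values()) for cls, inst in instances.items()}
    outputs = {}
    # Step 3: topological sort guarantees dependencies run first.
    for cls in TopologicalSorter(graph).static_order():
        inst = instances[cls]
        # Step 4: wire each upstream output in as the matching kwarg.
        kwargs = {name: outputs[dep] for name, dep in inst.sources.items()}
        outputs[cls] = inst.transform(**kwargs)
    return outputs


results = run([Aggregated, TableXSnapshot])  # declaration order is irrelevant
```

Adding a feature here means adding one class and one list entry; the edge set falls out of the sources dicts automatically, which is exactly the DRY property the YAML config buys.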

Canonical DAG shape (Yelp post's example)

The post shows a multi-root DAG: many source-snapshot features form the roots, converging into aggregation and transformation features, and finally publishing one or more output features to S3 in REVREC-template shape.

[ source_feature_A ]──┐
[ source_feature_B ]──┼──► [ transformation_feature_1 ]──┐
[ source_feature_C ]──┘                                  │
                                                         ├──► [ publish_feature ] → S3
[ source_feature_D ]──┐                                  │
[ source_feature_E ]──┼──► [ transformation_feature_2 ]──┘
[ source_feature_F ]──┘

Caveats and ceilings

  • Feature-count scaling — Yelp's Revenue Data Pipeline currently runs 50+ features in a two-stage Spark job, and the team names this as a maintenance concern: adding a new product requires changes and testing across the whole job. The feature-DAG abstraction doesn't eliminate the cost of maintaining many features; it makes that cost easier to see.
  • Single-engine — Spark-only. Non-Spark steps (REST API calls, non-Spark ML inference) don't fit the abstraction.
  • Cross-feature optimisation — because each feature is a class, Catalyst can't reorder or fuse operators across feature boundaries. This is usually fine (feature boundaries are meaningful unit-of-work boundaries), but can leave performance on the table for trivial composition patterns.
