Yelp spark-etl

Definition

spark-etl is Yelp's internal Python package that wraps Apache Spark (PySpark) into a feature-based DAG orchestration model for ETL jobs. Not open-source; disclosed in the 2025-02-19 Revenue Automation blog post.

Core abstraction: Spark Features

A Spark Feature is a unit of ETL work (the post likens features to "web APIs with request-response schemas") with three parts, sketched as a minimal class contract after this list:

  • An input schema (the sources dict — declares which other features this feature consumes).
  • A transform function (the transform() method — runs Spark operations on the input DataFrames).
  • An output schema (what the transform returns).
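
spark-etl itself is not published, so the contract below is only a sketch inferred from the post's examples, not Yelp's code; every name, default, and type hint here is an assumption:

from abc import ABC, abstractmethod
from datetime import date
from typing import Any, Dict

from pyspark.sql import DataFrame, SparkSession

class SparkFeature(ABC):
    """One DAG node: declared inputs, a transform, a single output DataFrame."""

    # Name that the YAML config and other features' sources dicts refer to.
    alias: str = ""

    # Input schema: upstream features and/or raw sources, keyed by the kwarg
    # name their DataFrames arrive under in transform().
    sources: Dict[str, Any]

    @abstractmethod
    def transform(self, spark: SparkSession, start_date: date, end_date: date,
                  **kwargs: DataFrame) -> DataFrame:
        """Receive each source's DataFrame as a keyword argument; return the output."""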

Two canonical sub-types, from the 2025-02-19 post:

Source data snapshot features

Read a database-table snapshot from S3 and pass it downstream unchanged. Example shape:

class ARandomDatabaseTableSnapshotFeature(SparkFeature):
    alias = f"{TABLE_NAME}_snapshot"

    def __init__(self) -> None:
        # Single input: the table's published snapshot on S3, selected by the _dt date column.
        self.sources = {
            TABLE_NAME: S3PublishedSource(
                base_s3_path=get_s3_location_for_table(TABLE_NAME),
                source_schema_id=get_schema_ids_for_data_snapshot(TABLE_NAME),
                date_col="_dt",
                select_cols=TABLE_SCHEMA,
            )
        }

    def transform(self, spark, start_date, end_date, **kwargs) -> DataFrame:
        # Pass the snapshot DataFrame through unchanged for downstream features.
        return kwargs[TABLE_NAME]

The transform is trivial — it just returns the read DataFrame. The value is that the snapshot is loaded once and reused across all downstream features that declare it as a source.

Transformation features

Take other features as pyspark.sql.DataFrame inputs via the sources dict; run real transformation logic in transform():

# pyspark imports needed by the transform; SparkFeature and ConfiguredSparkFeature
# are spark-etl internals, shown here as in the post.
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
from pyspark.sql.types import BooleanType, IntegerType

class ARandomTransformationFeature(SparkFeature):
    def __init__(self) -> None:
        # Upstream features whose output DataFrames arrive in transform() as kwargs.
        self.sources = {
            "feature_x": ConfiguredSparkFeature(),
            "feature_y": ConfiguredSparkFeature(),
        }

    def transform(self, spark, start_date, end_date, **kwargs) -> DataFrame:
        feature_x = kwargs["feature_x"]
        feature_y = kwargs["feature_y"]
        # Add typed placeholder columns to each input, then union by column name
        # and drop what downstream features don't need.
        feature_x = feature_x.withColumn("is_flag", lit(False).cast(BooleanType()))
        feature_y = feature_y.withColumn("time_changed", lit(None).cast(IntegerType()))
        aggregated_feature = feature_x.unionByName(feature_y).drop("alignment")
        return aggregated_feature.filter(...)

PySpark's standard select / filter / join / withColumn calls are the default building blocks; UDFs (concepts/pyspark-udf-for-complex-business-logic) are used when business logic cannot be expressed cleanly in SQL-style operations.
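
The post does not show a UDF, so the example below is only an illustration of that escape hatch; the proration rule, column names, and function are hypothetical:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

def prorate_revenue(amount, days_active, days_in_period):
    # Hypothetical business rule: recognise revenue in proportion to active days.
    if not days_in_period:
        return 0.0
    return float(amount) * min(days_active, days_in_period) / days_in_period

prorate_revenue_udf = udf(prorate_revenue, DoubleType())

contracts = spark.createDataFrame(
    [(100.0, 10, 30), (250.0, 30, 30)],
    ["amount", "days_active", "days_in_period"],
)
contracts.withColumn(
    "recognised_amount",
    prorate_revenue_udf(col("amount"), col("days_active"), col("days_in_period")),
).show()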

YAML dependency declaration

Dependencies between features are not enumerated as edges in the YAML config. The YAML lists only the nodes:

features:
    <feature1_alias>:
        class: <path.to.my.Feature1Class>
    <feature2_alias>:
        class: <path.to.my.Feature2Class>
    <feature3_alias>:
        class: <path.to.my.Feature3Class>
    <feature4_alias>:
        class: <path.to.my.Feature4Class>

publish:
    s3:
        - <feature4_alias>:
            path: s3a://bucket/path/to/desired/location
            overwrite: True

At runtime, spark-etl inspects each feature's SparkFeature.sources field and topologically sorts the features to produce the execution order. Verbatim from the post: "There is no need to draw a complex diagram of dependency relationships in the yaml file. At runtime, spark-etl figures out the execution sequence according to topology." The edge-free YAML stays DRY, and adding a new feature is a one-line change.
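
The scheduler is not public; a minimal sketch of the inference step, assuming each instantiated feature exposes a sources dict whose values may be other feature instances or raw S3 sources, could look like this:

from graphlib import TopologicalSorter  # stdlib topological sort (Python 3.9+)

def execution_order(features):
    """features: dict of alias -> SparkFeature instance, one entry per YAML feature."""
    alias_by_class = {type(feature): alias for alias, feature in features.items()}
    graph = {}
    for alias, feature in features.items():
        deps = set()
        for source in feature.sources.values():
            # Only sources that are themselves declared features become DAG edges;
            # raw inputs such as S3 snapshot sources are not graph nodes.
            upstream_alias = alias_by_class.get(type(source))
            if upstream_alias is not None:
                deps.add(upstream_alias)
        graph[alias] = deps
    # static_order() yields every feature after all of its dependencies.
    return list(TopologicalSorter(graph).static_order())

In a runner built this way, executing features in that order while caching each output DataFrame by alias would be what lets a snapshot feature be computed once and reused by every downstream consumer.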

Canonicalised as patterns/yaml-declared-feature-dag-topology-inferred.

Checkpointing for debuggability

Spark's distributed execution and lazy evaluation make interactive breakpoint debugging impractical, especially on DataFrames with millions of rows. spark-etl handles this by checkpointing intermediate feature outputs to an S3 scratch path:

spark-submit \
    /path/to/spark_etl_runner.py \
    --team-name my_team \
    --notify-email my_email@example.com \
    --feature-config /path/to/feature_config.yaml \
    --publish-path s3a://my-bucket/publish/ \
    --scratch-path s3a://my-bucket/scratch/ \
    --start-date 2024-02-29 \
    --end-date 2024-02-29 \
    --checkpoint feature1, feature2, feature3

Named features are materialised to the scratch path; follow-up analysis happens in systems/jupyterhub against the written Parquet, making debugging "more straightforward and shareable among the team." Checkpointing also enables faster pipeline resumption when computationally expensive features don't need re-execution.
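
A sketch of that follow-up step, assuming the scratch layout is one Parquet directory per checkpointed feature alias (the path and column name here are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reload the checkpointed feature exactly as the job materialised it.
feature1 = spark.read.parquet("s3a://my-bucket/scratch/feature1/")

feature1.printSchema()
# Typical interactive debugging query in the notebook.
feature1.filter(feature1["business_id"].isNull()).show(20)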

Canonicalised as concepts/checkpoint-intermediate-dataframe-debugging.

CLI shape

Known flags from the 2025-02-19 post's command example (a parsing sketch follows the list):

  • --team-name — attribution / routing for alerts
  • --notify-email — alert destination on job failure
  • --feature-config — path to YAML DAG declaration
  • --publish-path — S3 prefix for final published outputs
  • --scratch-path — S3 prefix for intermediate checkpoints
  • --start-date / --end-date — time-window bounds passed to each feature's transform(spark, start_date, end_date, **kwargs)
  • --checkpoint — comma-separated list of features to materialise to scratch
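
The runner itself is not published; the argparse sketch below only mirrors the flags in the post's command, and its parsing behaviour (including tolerating spaces after the commas in --checkpoint) is an assumption:

import argparse

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="spark-etl style runner (sketch)")
    parser.add_argument("--team-name", required=True, help="attribution / alert routing")
    parser.add_argument("--notify-email", required=True, help="alert destination on failure")
    parser.add_argument("--feature-config", required=True, help="path to the YAML DAG declaration")
    parser.add_argument("--publish-path", required=True, help="S3 prefix for published outputs")
    parser.add_argument("--scratch-path", required=True, help="S3 prefix for checkpoints")
    parser.add_argument("--start-date", required=True)
    parser.add_argument("--end-date", required=True)
    # nargs='+' collects "feature1, feature2, feature3" even when the shell splits it
    # into several tokens; trailing commas are stripped afterwards.
    parser.add_argument("--checkpoint", nargs="+", default=[],
                        help="features to materialise to the scratch path")
    args = parser.parse_args(argv)
    args.checkpoint = [name.strip(",") for name in args.checkpoint if name.strip(",")]
    return args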

Comparison to alternatives

  • Apache Airflow / Dagster / Prefect — general-purpose workflow orchestrators. spark-etl is narrower: Spark-native DataFrame orchestration only. The trade-off is a simpler mental model for a single-engine pipeline versus the richer abstractions of multi-engine orchestrators.
  • dbt — SQL-first transformation orchestrator over warehouses. spark-etl occupies the analogous position as a PySpark-first transformation orchestrator over a data lake. Yelp's 2025-02-19 post explicitly rejects dbt for revenue-recognition complexity ("difficult to represent complex logic in SQL").
  • Raw PySpark + hand-rolled DAG — what spark-etl replaces at Yelp. The feature abstraction encodes the DAG shape in Python class definitions (via sources) rather than in imperative pipeline wiring.

Caveats

  • Not open-source as of the 2025-02-19 disclosure. The post documents the API shape and conceptual model without publishing code. Consumers can rebuild the pattern from the described interfaces but cannot adopt spark-etl directly.
  • Known scale ceiling: Yelp's Revenue Data Pipeline runs 50+ features in a two-stage job, which the team identifies as a maintenance risk. The architecture is still evolving: the stated future direction is feature-team-owned, standardised data interfaces that shrink the feature count any single team has to maintain.
  • Single-engine — Spark only. Non-Spark processing stages (e.g. REST API calls, ML inference via non-Spark runtimes) are outside the framework.
