CONCEPT
Spark ETL Feature DAG¶
Definition¶
Spark ETL Feature DAG is an architectural model for
structuring Spark-based ETL pipelines as a directed acyclic
graph of features — self-contained, web-API-shaped units
of ETL work with declared inputs (sources dict), a
transformation function, and an output schema. The DAG runtime
topologically sorts features by their declared sources and
executes them in dependency order.
Canonicalised by Yelp's 2025-02-19 Revenue Data Pipeline post
as the model behind Yelp's internal
spark-etl package. The post's
verbatim framing: "The building blocks of a Spark-ETL program
are Spark features, which define the input, transformation
logic, and output. These features resemble web APIs with their
request-response schemas."
Why it's useful¶
Raw Spark code tends to sprawl — a single run_pipeline.py
with 500 lines of df = spark.read(...) calls, successive
.filter() / .join() / .withColumn() transformations, and
an ad-hoc mental model of which DataFrame depends on which.
The Feature DAG abstraction forces:
- Composition over monolith — each feature is a class; its sources are a dict; composing pipelines means declaring features, not writing procedural wiring.
- Reusability — a source-snapshot feature is loaded once and consumed by every downstream feature that declares it, without re-reading from S3 per consumer.
- Testability — each feature's transform() is a pure function of its inputs; testable in isolation.
- Observability — feature names become units of telemetry; the runtime can emit per-feature timing, row counts, errors.
- Reproducibility — re-running a specific feature for a specific time window is a CLI flag, not a code edit.
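The contract behind these properties can be sketched as a tiny base class. This is a hypothetical reconstruction, not spark-etl's actual internals: only the names SparkFeature, alias, sources, and transform() come from the Yelp post; everything else here is illustrative.

```python
# Hypothetical sketch of the feature contract; SparkFeature, alias,
# sources, and transform() are the only names taken from the Yelp post.
class SparkFeature:
    alias: str = ""      # node name in the DAG; also the unit of telemetry
    sources: dict = {}   # name -> upstream feature (declares the DAG edges)

    def transform(self, spark, start_date, end_date, **kwargs):
        """Pure function of its declared inputs: the runtime passes each
        source's output under the kwarg named by its sources key."""
        raise NotImplementedError


class DoubleFeature(SparkFeature):
    alias = "double"

    def __init__(self):
        self.sources = {"numbers": "numbers_snapshot"}  # one upstream dependency

    def transform(self, spark, start_date, end_date, **kwargs):
        # Operates only on what the runtime injects; plain lists stand in
        # for DataFrames so the test needs no Spark session.
        return [2 * x for x in kwargs["numbers"]]


# Testability in isolation: no Spark session, no S3 read required.
out = DoubleFeature().transform(None, "2025-01-01", "2025-01-31", numbers=[1, 2, 3])
```

Because transform() sees only its kwargs, a unit test supplies fixture inputs directly and never touches the feature's real sources.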
The two feature sub-types (Yelp canon)¶
Source data snapshot features¶
Read a database-table snapshot from S3 and pass it through
unchanged. Serve as the roots of the DAG — no sources beyond
the underlying S3PublishedSource. Example shape from the
post:
class TableXSnapshotFeature(SparkFeature):
    alias = "table_x_snapshot"

    def __init__(self):
        self.sources = {
            "table_x": S3PublishedSource(
                base_s3_path=get_s3_location_for_table("table_x"),
                source_schema_id=get_schema_ids_for_data_snapshot("table_x"),
                date_col="_dt",
                select_cols=TABLE_SCHEMA,
            )
        }

    def transform(self, spark, start_date, end_date, **kwargs):
        return kwargs["table_x"]
Transformation features¶
Consume other features as DataFrame inputs via the sources
dict. Do real transformation work in transform(). Example
shape:
class AggregatedFeature(SparkFeature):
    def __init__(self):
        self.sources = {
            "feature_x": ConfiguredSparkFeature(),
            "feature_y": ConfiguredSparkFeature(),
        }

    def transform(self, spark, start_date, end_date, **kwargs):
        fx = kwargs["feature_x"].withColumn("is_flag", lit(False))
        fy = kwargs["feature_y"].withColumn("time_changed", lit(None))
        return fx.unionByName(fy).drop("alignment").filter(...)
The sources dict is the edge declaration of the DAG: each
key is a named dependency on another feature whose output
becomes the corresponding kwarg in transform().
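A minimal sketch of that key-to-kwarg wiring, assuming the runtime caches each feature's output by alias (so a shared source is computed once) and passes cached outputs under the sources keys. All class and function names here are illustrative, not from the post; plain lists stand in for DataFrames.

```python
# Illustrative runtime wiring: each sources key becomes the kwarg name
# under which the upstream feature's cached output reaches transform().
class Snapshot:
    alias = "snapshot"
    sources = {}  # a root: no upstream features

    def transform(self, spark, start_date, end_date, **kwargs):
        return ["row1", "row2"]  # stand-in for a DataFrame


class Downstream:
    alias = "downstream"

    def __init__(self, upstream):
        self.sources = {"snap": upstream}  # key "snap" -> kwarg "snap"

    def transform(self, spark, start_date, end_date, **kwargs):
        return [r.upper() for r in kwargs["snap"]]


outputs = {}  # alias -> output; every consumer reuses the same computation

def run(feature, spark=None, start="2025-01-01", end="2025-01-31"):
    kwargs = {name: outputs[dep.alias] for name, dep in feature.sources.items()}
    outputs[feature.alias] = feature.transform(spark, start, end, **kwargs)
    return outputs[feature.alias]


snap = Snapshot()
run(snap)
result = run(Downstream(snap))
```

The dict comprehension in run() is the whole edge-resolution step: dependency names in sources line up one-to-one with the kwargs transform() receives.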
DAG execution mechanics¶
The runtime:
- Reads a YAML config listing only the nodes (feature aliases + class paths), not the edges.
- Inspects each feature's self.sources to discover its dependencies.
- Topologically sorts the feature set.
- Executes in order, passing each feature's output DataFrame as a kwarg to its downstream consumers' transform() calls.
Canonicalised as patterns/yaml-declared-feature-dag-topology-inferred. The payoff: YAML stays DRY — adding a new feature is a one-line change (one new entry in the features list). No graph editing, no edge-list maintenance.
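The node-only config plus topology inference can be sketched as follows. This is a hypothetical reconstruction (a Kahn-style topological sort over edges discovered from self.sources); the YAML features list is simulated by a plain Python list, and the Feat class and node aliases are invented for illustration.

```python
from collections import deque

# Features declare only their sources; no edge list exists anywhere.
class Feat:
    def __init__(self, alias, sources=None):
        self.alias = alias
        self.sources = sources or {}


a = Feat("source_a")
b = Feat("source_b")
t1 = Feat("transform_1", {"a": a, "b": b})
pub = Feat("publish", {"t1": t1})

# Stand-in for the YAML config: nodes only, in arbitrary order.
nodes = [pub, t1, b, a]


def topo_sort(features):
    """Kahn's algorithm over edges inferred from each feature's sources."""
    indegree = {f.alias: len(f.sources) for f in features}
    consumers = {f.alias: [] for f in features}
    for f in features:
        for dep in f.sources.values():
            consumers[dep.alias].append(f)
    ready = deque(f for f in features if indegree[f.alias] == 0)
    order = []
    while ready:
        f = ready.popleft()
        order.append(f)
        for c in consumers[f.alias]:
            indegree[c.alias] -= 1
            if indegree[c.alias] == 0:
                ready.append(c)
    if len(order) != len(features):
        raise ValueError("cycle detected: not a DAG")
    return order


order = [f.alias for f in topo_sort(nodes)]
```

Adding a feature means appending one node to the list; topo_sort() discovers its place in the execution order from its own sources declaration, which is exactly why the YAML stays DRY.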
Canonical DAG shape (Yelp post's example)¶
The post shows a multi-root DAG: many source-snapshot features at the roots, converging into aggregation + transformation features, finally publishing one or more output features to S3 in REVREC-template shape.
[ source_feature_A ]──┐
[ source_feature_B ]──┼──► [ transformation_feature_1 ]──┐
[ source_feature_C ]──┘ │
├──► [ publish_feature ] → S3
[ source_feature_D ]──┐ │
[ source_feature_E ]──┼──► [ transformation_feature_2 ]──┘
[ source_feature_F ]──┘
Composes with¶
- patterns/source-plus-transformation-feature-decomposition — the canonical shape: thin source-snapshot features at DAG roots; rich transformation features downstream.
- concepts/checkpoint-intermediate-dataframe-debugging — materialise named features to scratch S3 for interactive inspection in Jupyterhub.
- concepts/pyspark-udf-for-complex-business-logic — the escape hatch for features whose business logic exceeds SQL's expressive power.
Caveats and ceilings¶
- Feature-count scaling — Yelp's Revenue Data Pipeline currently runs 50+ features in a 2-staged Spark job, and the team names this as a maintenance concern. Adding a new product requires changes + testing across the whole job. The feature-DAG abstraction doesn't eliminate the cost of maintaining many features; it just makes the cost easier to see.
- Single-engine — Spark-only. Non-Spark steps (REST API calls, non-Spark ML inference) don't fit the abstraction.
- Cross-feature optimisation — because each feature is a class, Catalyst can't reorder or fuse operators across feature boundaries. This is usually fine (feature boundaries are meaningful unit-of-work boundaries), but can leave performance on the table for trivial composition patterns.
Seen in¶
- sources/2025-02-19-yelp-revenue-automation-series-building-revenue-data-pipeline — canonical wiki instance. Feature abstraction, source + transformation sub-types, YAML topology-inferred DAG, checkpointing integration.