CONCEPT Cited by 1 source
PySpark UDF for Complex Business Logic¶
Definition¶
A PySpark User-Defined Function (UDF) wraps a Python function so it can be applied row-wise (or grouped-row-wise) inside a Spark DataFrame operation. Two flavours:
- pyspark.sql.functions.udf — generic Python UDF (row-at-a-time); what this wiki page focuses on.
- pyspark.sql.functions.pandas_udf — vectorised (batch-at-a-time) UDF using Pandas internally; higher throughput but constrained on data types and semantics.
When UDFs are the right tool¶
PySpark's SQL-like expressions (select, filter,
withColumn, groupBy, join, built-in functions like when
/ coalesce / window) cover the common ETL transformations
with high throughput because Spark's Catalyst optimiser can
reason about them. But they can't cleanly express all business
logic.
Yelp's 2025-02-19 Revenue Data Pipeline post canonicalises the decision rule: UDFs are preferred "in such cases" where "PySpark's SQL-like expressions ... may not be flexible enough for complex business logic" — specifically logic with:
- Multi-priority tie-break rules (A before B; within A, smallest ID wins; etc.).
- Stateful iteration over grouped rows (e.g. apply discounts in priority order, decrementing remaining discount budget).
- Conditional branching that doesn't flatten cleanly into CASE WHEN.
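The "stateful iteration" shape is the clearest of the three. A minimal plain-Python illustration (independent of Spark; the field names and the shared-budget rule are hypothetical, not from the post): each row's outcome depends on how much budget earlier rows consumed, so no single CASE WHEN over independent rows can express it.

```python
# Toy illustration of stateful iteration over grouped rows: spend a shared
# budget across products in priority order. State (the remaining budget)
# carries from one row to the next.
def apply_budget(products, budget):
    """products: list of dicts already sorted by priority."""
    applied = []
    for p in products:
        amount = min(p["price"], budget)  # spend what remains
        budget -= amount                  # state carried between rows
        applied.append({"product_id": p["product_id"], "applied": amount})
    return applied

rows = [
    {"product_id": 1, "price": 40},
    {"product_id": 2, "price": 40},
    {"product_id": 3, "price": 40},
]
# Products 1 and 2 get 40 each; product 3 gets only the remaining 20.
print(apply_budget(rows, 100))
```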
The canonical Yelp worked example¶
Five business rules for applying discounts to products grouped by customer:
- A product considers a discount applicable if its active period completely covers the discount's period.
- Each product can receive only one discount.
- Type A products receive discounts before Type B products.
- If products have the same type, the one with the smaller ID receives the discount first.
- Discounts with smaller IDs are applied first.
Implementation as a UDF (from the post, lightly edited for clarity):
```python
@udf(ArrayType(DISCOUNT_APPLICATION_SCHEMA))
def calculate_discount_for_products(products, discounts):
    # Sort products and discounts based on priority
    products = sorted(products, key=lambda x: (x['type'], x['product_id']))
    discounts = sorted(discounts, key=lambda x: x['discount_id'])
    results = []
    for product in products:
        for discount in discounts:
            if period_covers(discount['period'], product['period']) \
                    and discount['amount'] > 0:
                amount = min(product['budget'], discount['amount'])
                discount['amount'] -= amount
                results.append((
                    product['product_id'],
                    product['type'],
                    product['period'],
                    amount,
                    discount['discount_id'],
                ))
                break
    return results
```
```python
# Apply the UDF to products + discounts grouped by business_id
result_df = grouped_contracts_with_biz_discounts.withColumn(
    "results",
    calculate_discount_for_products("products", "discounts"),
)

# Explode + project
result_exploded = result_df.select(
    "business_id", explode("results").alias("exploded_item")
).select(
    "business_id",
    "exploded_item.product_id",
    "exploded_item.amount",
    "exploded_item.discount_id",
)
```
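Because the UDF body is plain Python, its priority logic can be exercised without a Spark session. A minimal dry run, assuming periods are (start, end) tuples and a period_covers helper with the signature shown (both hypothetical; the post defines them elsewhere):

```python
def period_covers(outer, inner):
    # Hypothetical helper: outer (start, end) completely covers inner.
    return outer[0] <= inner[0] and inner[1] <= outer[1]

def calculate_discount_for_products(products, discounts):
    # Same body as the UDF above, minus the @udf decorator.
    products = sorted(products, key=lambda x: (x['type'], x['product_id']))
    discounts = sorted(discounts, key=lambda x: x['discount_id'])
    results = []
    for product in products:
        for discount in discounts:
            if period_covers(discount['period'], product['period']) \
                    and discount['amount'] > 0:
                amount = min(product['budget'], discount['amount'])
                discount['amount'] -= amount
                results.append((product['product_id'], product['type'],
                                product['period'], amount,
                                discount['discount_id']))
                break
    return results

products = [
    {"product_id": 2, "type": "B", "period": (2, 3), "budget": 50},
    {"product_id": 1, "type": "A", "period": (2, 3), "budget": 80},
]
discounts = [{"discount_id": 1, "period": (1, 4), "amount": 100}]
# Type A product 1 is served first (rule 3) and takes 80 of the budget;
# product 2 then receives the remaining 20 of discount 1.
print(calculate_discount_for_products(products, discounts))
```

The mutation of discount['amount'] inside the loop is exactly the kind of cross-row state that makes this awkward to express with window functions alone.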
Total: ~15 lines of readable Python for logic that a Catalyst author would need to implement as multiple nested window functions with priority partitioning + running aggregations. Yelp's verdict verbatim: "Without using UDFs, implementing this logic would require multiple window functions, which can be hard to read and maintain."
The performance + reliability cost¶
UDFs trade readability for performance. The post explicitly names the cost:
"We rely heavily on UDFs on calculating revenue period and contract amount. These UDFs are expensive to run and can penalize performance and reliability of the job over time."
Why UDFs are slower than built-ins:
- Python serialisation overhead — each row (or batch, for pandas_udf) is serialised from the JVM executor into a Python subprocess, processed, and serialised back.
- Catalyst can't optimise through them — the UDF is a black box; the planner can't push filters or projections into it.
- Error surfaces — a bug in the UDF fails at executor runtime rather than at planning time.
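A toy illustration of the per-row round-trip (pure Python with pickle; Spark actually uses its own serialisation machinery between the JVM and the Python worker, so this only shows the shape of the cost, not its real mechanism):

```python
import pickle

# Every row crossing the JVM <-> Python boundary pays a serialise/deserialise
# round-trip on top of the actual UDF call.
def via_boundary(row, fn):
    wire = pickle.dumps(row)           # executor -> Python worker (simulated)
    result = fn(pickle.loads(wire))    # the UDF body runs here
    return pickle.loads(pickle.dumps(result))  # Python worker -> executor

row = {"product_id": 7, "budget": 80}
print(via_boundary(row, lambda r: r["budget"] * 2))  # 160
```

For a table with millions of rows, that fixed per-row overhead dominates, which is why the built-ins (which never leave the JVM) are preferred whenever they can express the logic.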
pandas_udf amortises the JVM↔Python boundary by operating
on batches and is substantially faster than row-at-a-time UDFs
when applicable, but still slower than native built-ins.
The readability / performance decision matrix¶
| Logic shape | Use |
|---|---|
| Simple projection / filter / case | Built-in functions |
| Group-level aggregation | groupBy + aggregation functions |
| Window with ordering + bounds | Window + over clause |
| Multi-priority tie-break iteration | UDF |
| Stateful loop over grouped rows | UDF |
| Pandas-friendly vectorised transform | pandas_udf |
| Call to external service / model | pandas_udf (for throughput) |
Yelp's stated future direction¶
Reduce UDF reliance by simplifying the source data models upstream so more logic naturally fits native PySpark expressions:
"Simplified Data Models: Simplifying source data models minimizes the need for custom UDFs, as mappings can be handled with standard PySpark functions."
This is the long-term fix for UDF overuse: rather than
optimising UDFs or porting to pandas_udf, make the source
shape such that UDFs aren't needed in the first place.
Seen in¶
- sources/2025-02-19-yelp-revenue-automation-series-building-revenue-data-pipeline — canonical wiki instance. Five-rule discount-application UDF worked example with code sketch, readability argument against window-function-only implementation, named performance + reliability cost, future direction of reducing UDF count via simpler source models.
Related¶
- systems/apache-spark — underlying engine
- systems/yelp-spark-etl — canonical production user
- concepts/spark-etl-feature-dag — the broader Spark-ETL abstraction UDFs live inside
- companies/yelp