PySpark UDF for Complex Business Logic

Definition

A PySpark User-Defined Function (UDF) wraps a Python function so it can be applied row-wise (or grouped-row-wise) inside a Spark DataFrame operation. Two flavours:

  • pyspark.sql.functions.udf — generic Python UDF (row-at-a-time); what this wiki page focuses on.
  • pyspark.sql.functions.pandas_udf — vectorised (batch-at-a-time) UDF using Pandas internally; higher throughput but constrained on data types and semantics.

When UDFs are the right tool

PySpark's SQL-like expressions (select, filter, withColumn, groupBy, join, built-in functions like when / coalesce / window) cover the common ETL transformations with high throughput because Spark's Catalyst optimiser can reason about them. But they can't cleanly express all business logic.

Yelp's 2025-02-19 Revenue Data Pipeline post canonicalises the decision rule: UDFs are preferred "in such cases" where "PySpark's SQL-like expressions ... may not be flexible enough for complex business logic" — specifically logic with:

  • Multi-priority tie-break rules (A before B; within A, smallest ID wins; etc.).
  • Stateful iteration over grouped rows (e.g. apply discounts in priority order, decrementing remaining discount budget).
  • Conditional branching that doesn't flatten into CASE WHEN cleanly.

The canonical Yelp worked example

Five business rules for applying discounts to products grouped by customer:

  1. A product considers a discount applicable if its active period completely covers the discount's period.
  2. Each product can receive only one discount.
  3. Type A products receive discounts before Type B products.
  4. If products have the same type, the one with the smaller ID receives the discount first.
  5. Discounts with smaller IDs are applied first.

Implementation as a UDF (from the post, lightly edited for clarity):

from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType

@udf(ArrayType(DISCOUNT_APPLICATION_SCHEMA))
def calculate_discount_for_products(products, discounts):
    # Sort products and discounts based on priority (rules 3-5)
    products = sorted(products, key=lambda x: (x['type'], x['product_id']))
    # Spark passes struct values as immutable Row objects; convert the
    # discounts to dicts so the remaining amount can be decremented in place
    discounts = [d.asDict() for d in sorted(discounts, key=lambda x: x['discount_id'])]

    results = []
    for product in products:
        for discount in discounts:
            # Rule 1: applicable only if the product's active period
            # covers the discount's period (and budget remains)
            if period_covers(discount['period'], product['period']) \
                    and discount['amount'] > 0:
                amount = min(product['budget'], discount['amount'])
                discount['amount'] -= amount
                results.append((
                    product['product_id'],
                    product['type'],
                    product['period'],
                    amount,
                    discount['discount_id'],
                ))
                break  # Rule 2: at most one discount per product
    return results

# Apply the UDF to products + discounts grouped by business_id
result_df = (
    grouped_contracts_with_biz_discounts.withColumn(
        "results",
        calculate_discount_for_products("products", "discounts"),
    )
)

# Explode + project
result_exploded = result_df.select(
    "business_id", explode("results").alias("exploded_item")
).select(
    "business_id",
    "exploded_item.product_id",
    "exploded_item.amount",
    "exploded_item.discount_id",
)

Total: ~15 lines of readable Python for logic that would otherwise have to be expressed as multiple nested window functions with priority partitioning and running aggregations. Yelp's verdict verbatim: "Without using UDFs, implementing this logic would require multiple window functions, which can be hard to read and maintain."

The performance + reliability cost

UDFs trade readability for performance. The post explicitly names the cost:

"We rely heavily on UDFs on calculating revenue period and contract amount. These UDFs are expensive to run and can penalize performance and reliability of the job over time."

Why UDFs are slower than built-ins:

  • Python serialisation overhead — each row (or batch for pandas_udf) is serialised from the JVM executor into a Python subprocess, processed, and serialised back.
  • Catalyst can't optimise through them — the UDF is a black box; the planner can't push filters or projections into it.
  • Error surfaces — a bug in the UDF fails at executor runtime rather than at planning time.

pandas_udf amortises the JVM↔Python boundary by operating on batches and is substantially faster than row-at-a-time UDFs when applicable, but still slower than native built-ins.

The readability / performance decision matrix

Logic shape                              Use
---------------------------------------  -------------------------------
Simple projection / filter / case        Built-in functions
Group-level aggregation                  groupBy + aggregation functions
Window with ordering + bounds            Window + over clause
Multi-priority tie-break iteration       UDF
Stateful loop over grouped rows          UDF
Pandas-friendly vectorised transform     pandas_udf
Call to external service / model         pandas_udf (for throughput)

Yelp's stated future direction

Reduce UDF reliance by simplifying the source data models upstream so more logic naturally fits native PySpark expressions:

"Simplified Data Models: Simplifying source data models minimizes the need for custom UDFs, as mappings can be handled with standard PySpark functions."

This is the long-term fix for UDF overuse: rather than optimising UDFs or porting them to pandas_udf, reshape the source data so that UDFs aren't needed in the first place.
