
Amazon's Exabyte-Scale Migration from Apache Spark to Ray on Amazon EC2

One-paragraph summary

Amazon Retail's Business Data Technologies (BDT) team is in the middle of migrating the largest production business-intelligence datasets in Amazon off systems/apache-spark onto systems/ray. The workload is compaction — the copy-on-write merge of change-data-capture (CDC) log streams over exabyte-scale tables in S3 — and the migration is happening on the live pipeline behind Amazon's post-Oracle BI stack. Ray replaces Spark because Ray's programming model (tasks + actors + distributed object store + locality-aware scheduler + autoscaling) let BDT hand-craft a specialist compactor with zero-copy intranode shuffles and reference-based copies of untouched S3 files. Q1 2024 results: 1.5 EiB of input Parquet compacted, 82% better cost efficiency than Spark per GiB, and 100% on-time delivery of new compacted data over two quarters, equivalent to ~$120M/yr saved on EC2 on-demand R5 instances at typical customer pricing. Ray does not yet beat Spark on reliability (first-time success 99.15% vs 99.91%) or cluster memory utilisation (54.6% avg). The Flash Compactor has been contributed back into Ray's open-source systems/deltacat as a step toward the same benefits on systems/apache-iceberg, systems/apache-hudi, and systems/delta-lake.

Background chain (why this was the problem)

  • 2016 — Amazon was migrating off "what some surmised to be the largest Oracle Data Warehouse on Earth" (>50 PB). Werner tweet cited in the post.
  • Architectural answer — the now-canonical concepts/compute-storage-separation: S3 for storage, mixed compute (systems/amazon-redshift, systems/aws-rds, systems/apache-hive on systems/amazon-emr). 50 PB of Oracle table data copied to S3 as delimited text, with each table wrapped in a generic schema based on the systems/amazon-ion type system.
  • 2018 — last Oracle Data Warehouse cluster shut down; migration declared complete.
  • BDT then built a table subscription service: anyone at Amazon subscribes to an S3-backed table, picks their analytics framework (systems/amazon-athena / Apache Flink / Redshift / Hive / AWS Glue / Spark / …), and either queries on demand or auto-triggers on new data arrival.
  • Tables as unbounded streams of S3 files. Each file contains records to insert, update, or delete — a classic CDC log (concepts/change-data-capture). The subscriber's compute framework had to dynamically merge all changes at read time to get current table state.

The problem that motivated the compactor (pre-2019)

  • CDC logs grew too large to merge in their entirety at read time on the largest clusters.
  • Tables developed pathological shapes: millions of KB-scale tiny files or a few TB-scale huge files.
  • New subscriptions to the largest tables would take days or weeks to complete a merge, or just fail.
  • Fix: run the merge once and write a read-optimised version of the table for subscribers. This is the copy-on-write merge pattern (concepts/copy-on-write-merge) now universal in systems/apache-iceberg and systems/apache-hudi.
  • BDT's Spark job that did this was called "the compactor": it compacts a CDC log stream of N deltas across M files down to a single insert delta of K equivalently-sized files.

Two compaction modes called out:

  • Append-only compaction — deltas only insert; merge is concatenation; compactor still responsible for sizing output files to optimise reads (big-file split, tiny-file merge).
  • Upsert compaction — deltas contain inserts and updates (with a specified merge key); only latest value per key survives.
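The two modes above can be sketched in plain Python, with lists of dicts standing in for Parquet files; the helper names are illustrative, not the production compactor's API:

```python
def compact_append_only(deltas, records_per_file):
    """Append-only compaction: merge is pure concatenation; the
    compactor's remaining job is re-sizing the output files
    (big-file split, tiny-file merge)."""
    records = [r for delta in deltas for r in delta]
    return [records[i:i + records_per_file]
            for i in range(0, len(records), records_per_file)]

def compact_upsert(deltas, merge_key):
    """Upsert compaction: only the latest value per merge key
    survives. Deltas arrive oldest-first, so later writes win."""
    latest = {}
    for delta in deltas:
        for record in delta:
            latest[record[merge_key]] = record
    return list(latest.values())
```

For example, `compact_upsert([[{"id": 1, "v": "a"}], [{"id": 1, "v": "a2"}]], "id")` keeps only the `"a2"` record, which is the "single insert delta" shape the compactor emits.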

Why Spark stopped scaling (2019)

  • Data catalog grew petabyte → exabyte by 2019.
  • Compacting all in-scope tables was becoming prohibitively expensive.
  • Manual job tuning required to compact their largest tables.
  • Jobs exceeded expected completion times.
  • Limited options to address perf issues because Spark "successfully (and unfortunately in this case) abstracted away most of the low-level data processing details" — the cost of a general-purpose engine for a specialist workload.
  • Considered and rejected an in-house distributed compute service they were already using for table management ops (schema/partition evolution, data repair): right primitives (tasks + actors) but maintainability concerns (complex programming model), high task-invocation overhead, no cluster autoscaling.

Why Ray (the PoC)

BDT came across the Ray paper and met the Berkeley RISELab team behind it.

Ray features named as load-bearing: tasks + actors, the distributed object store with zero-copy intranode object sharing, locality-aware scheduling, and fine-grained cluster autoscaling.

Initial 2020 PoC on production datasets found Ray could:

  • Compact 12× larger datasets than Spark.
  • Improve cost efficiency by 91%.
  • Process 13× more data per hour.

Contributing factors: reduced task orchestration overhead + reduced GC overhead + zero-copy shuffles + fine-grained autoscaling. BDT's biggest lever was flexibility to hand-craft a specialist — e.g. copying untouched S3 files by reference during the distributed merge instead of rewriting them (patterns/reference-based-copy-optimization); linked to DeltaCAT source: deltacat/compute/compactor_v2/steps/merge.py#L199-L257.

"BDT didn't need a generalist for the compaction problem, but a specialist, and Ray let them narrow their focus down to optimising the specific problem at hand."
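A minimal sketch of the reference-based copy idea, assuming a per-file index of merge keys; the manifest shape and names here are hypothetical, and the real logic lives in the DeltaCAT merge step linked above:

```python
def plan_merge(table_files, delta_keys):
    """Partition existing table files into those the incoming delta
    touches (must be read, merged, and rewritten) and those it does
    not (copied by reference: their S3 paths are simply re-listed in
    the compacted output's manifest, no bytes moved).

    table_files -- mapping of S3 path -> set of merge keys in that file
    delta_keys  -- set of merge keys present in the incoming deltas
    """
    rewrite, copy_by_reference = [], []
    for path, keys in table_files.items():
        if keys & delta_keys:              # delta updates a key in this file
            rewrite.append(path)
        else:                              # untouched: reference, don't rewrite
            copy_by_reference.append(path)
    return rewrite, copy_by_reference
```

The win is that at exabyte scale most files in a large table are untouched by any given delta, so most of the "copy" is metadata-only.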

Build-out and migration chain (2021 → 2024)

  • 2021 — serverless-job-management substrate: Ray on EC2 + systems/dynamodb + systems/aws-sns + systems/aws-sqs + systems/aws-s3 for durable job lifecycle tracking. Presented at Ray Summit 2021. Initial Ray compactor contributed to systems/deltacat (then called Ray DeltaCAT) as a step toward enabling the same on Iceberg / Hudi / Delta Lake. Today systems/anyscale-platform and systems/aws-glue-for-ray mean users don't need to build this from scratch.
  • 2022 — first exabyte test-bed: a data-quality service that computed dataset statistics, trends, and anomalies across the whole catalog. Used to shake out Ray issues before placing it on the business-critical path. Main surprises: EC2 resource utilisation + slow cluster start and OOMs.
  • OOMs → resolved by memory-aware scheduling (concepts/memory-aware-scheduling), telegraphing expected memory use at task-invocation time from past trends.
  • EC2 management → solved by patterns/heterogeneous-cluster-provisioning: discover a set of instance types that can meet expected resource requirements; then provision the most readily available mix across AZs. Trades knowing exactly which instance type a Ray cluster will get for faster provisioning. Ray applications must therefore drop any assumptions about underlying CPU architecture, disk type, or hardware.
  • Late 2022 — confidence to start the migration proper. Priority order: the largest ~1% of tables, which accounted for ~40% of total Spark compaction cost and the majority of job failures. Manual shadow compaction on a subset; Ray-based DQ service compared outputs for equivalence.
  • Data Quality (DQ) analysis focused on:
      • Dataset-level: record counts, cardinalities, min/max/avg.
      • File-level: Parquet file format version + feature parity (e.g. if Spark didn't emit a Bloom filter, Ray shouldn't either).
      • Explicit choice not to compare byte-for-byte. Why: queries run on different compute frameworks against TB/PB datasets "almost never produce equal results at this granularity" — decimal rounding, non-deterministic execution plans (unstable sorts), Parquet metadata drift, value overflow handling, -0 vs 0 equality, timestamp ↔ timezone representation, pre-Gregorian calendar date interpretation, leap seconds.
  • Additionally: a Data Reconciliation Service that issued real-end-user queries through Redshift / Spark / Athena against both Spark and Ray outputs and compared results (patterns/shadow-migration).
  • 2023 — fully automated 1:1 shadow compaction. Every compaction job for a migrating table ran in both frameworks. Purpose: direct comparisons, varied-table coverage, corner-case flushing at scale. Cost explicitly doubled for the duration — "if everything crashed and burned at this point, it would become a capital loss for the business and filed away as a hard lesson learned."
  • Subscriber switchover — a service that dynamically moves individual table subscribers from Spark-compacted output to Ray-compacted output one at a time, with per-subscriber rollback (patterns/subscriber-switchover). Alternative considered and rejected: overwrite Spark's output in place and hope.
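The memory-aware scheduling fix above reduces to telegraphing a task's expected memory at invocation time, estimated from past runs. A sketch, where the history shape and safety margin are assumptions:

```python
def estimate_task_memory(past_peaks_bytes, margin=1.2):
    """Estimate memory for the next run of a task from its observed
    peak usage in past runs, padded by a safety margin so a mild
    regression doesn't OOM the worker."""
    return int(max(past_peaks_bytes) * margin)

# In Ray, the reservation would then be passed to the scheduler at
# invocation time (assumed usage shape, requires a running cluster):
#
#   @ray.remote
#   def merge_step(...): ...
#
#   mem = estimate_task_memory(history[task_key])
#   merge_step.options(memory=mem).remote(...)
```

Ray then places the task only on a node with that much memory available, which is what turned the 2022 OOM surprises into schedulable constraints.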
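The heterogeneous-provisioning pattern is two steps: filter the instance catalog by the job's per-node resource shape, then rank candidates by current availability. A sketch with illustrative specs and availability scores (not real EC2 data):

```python
def candidate_instance_types(catalog, min_vcpus, min_mem_gib):
    """Return every instance type that satisfies the resource shape.
    The application must not assume which one it actually gets, so it
    cannot depend on CPU architecture, disk type, or other hardware."""
    return [name for name, spec in catalog.items()
            if spec["vcpus"] >= min_vcpus and spec["mem_gib"] >= min_mem_gib]

def provision_order(candidates, availability):
    """Rank candidates by current availability (highest first),
    trading a known instance type for faster provisioning."""
    return sorted(candidates, key=lambda n: availability.get(n, 0.0),
                  reverse=True)
```

The provisioner then requests the most available mix across AZs rather than blocking on one preferred type.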
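The dataset-level DQ bar above can be sketched as a statistics comparison with tolerance rather than byte equality; the field names and tolerance here are assumptions for illustration:

```python
import math

def stats(records, value_field):
    """Dataset-level summary: record count, cardinality, min/max/avg."""
    vals = [r[value_field] for r in records]
    return {"count": len(vals),
            "cardinality": len(set(vals)),
            "min": min(vals), "max": max(vals),
            "avg": sum(vals) / len(vals)}

def equivalent(spark_out, ray_out, value_field, rel_tol=1e-9):
    """Outputs are 'equal enough' when counts and cardinalities match
    exactly and min/max/avg agree within a tolerance, absorbing
    decimal rounding and unstable-sort differences."""
    a, b = stats(spark_out, value_field), stats(ray_out, value_field)
    if (a["count"], a["cardinality"]) != (b["count"], b["cardinality"]):
        return False
    return all(math.isclose(a[k], b[k], rel_tol=rel_tol)
               for k in ("min", "max", "avg"))
```

Exact equality on counts plus tolerant equality on aggregates is what lets outputs from two frameworks pass review without being bitwise identical.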

Results (first quarter 2024)

  • Volume: 1.5 EiB input Apache Parquet from S3 → 4 EiB corresponding in-memory Apache Arrow. >10,000 years of EC2 vCPU compute time consumed. Cluster scale per job up to 26,846 vCPUs and 210 TiB RAM.
  • Throughput: >20 PiB/day input S3 across >1,600 Ray jobs/day. Average Ray compaction job reads >10 TiB input S3, merges, and writes result back in <7 min including cluster setup/teardown.
  • Timeliness: 100% on-time delivery of new compacted data to subscribers over Q4 2023 + Q1 2024. >90% of new table updates applied and queryable within 60 min of arrival.
  • Cost efficiency: 82% better than Spark per GiB of S3 input compacted (>1.2 EiB Apache Parquet benchmark, equivalent memory-optimised EC2 instance types). Translates to >220,000 years of EC2 vCPU compute time saved annually; in typical EC2-customer terms, >$120M/yr in on-demand R5 charges.

Caveats / still-open gaps

  • Reliability still trails Spark. 2023: Ray's first-time compaction job success rate trailed Spark by up to 15%; 2024 avg 99.15% vs Spark's 99.91% (0.76 pp gap). Ray needs more retries + more manual overhead. Data: >215k job runs over 4-week trailing windows on equivalent memory-optimised EC2 types.
  • Cluster memory utilisation. Q1 2024: 19.4 TiB used of 36 TiB allocated (54.6% avg). Memory is the bottleneck, so other resources (CPU, network, disk) are even less utilised. If BDT can get memory to 90%, cost-efficiency improvement vs Spark should rise from 82% → 90%+.
  • Ray is still only compacting a minority of the catalog; BDT wants a longer reliability track record before onboarding more.
  • Byte-for-byte equivalence is explicitly not promised (see DQ notes above).

What's next (per the post)

  • New major revision of "The Flash Compactor" taking fuller advantage of Ray pipelining + autoscaling.
  • Target end-2024: >90% cost-efficiency improvement vs Spark with no manual tuning / human intervention.
  • Joint optimisation with the systems/daft project: improved S3 Parquet + delimited-text I/O on Ray via Daft gave +24% production compaction cost efficiency on top of Ray alone. Daft read benchmarks: median single-column read 55% faster than PyArrow and 91% faster than S3Fs; median full-file read 19% faster than PyArrow, 77% faster than S3Fs.
  • Cautionary framing: no blanket recommendation to migrate all Spark to Ray. Spark's feature-rich data-processing abstractions still work well for day-to-day use cases; no automated Spark → Ray transpiler exists. But Exoshuffle holds the 2022 Cloud Terasort cost record — evidence Ray can be a world-class data-processing framework and world-class distributed-ML framework simultaneously, if you have a painful specialist workload worth porting.

Open-source contribution

The Ray-based compactor was contributed back to Ray's open-source systems/deltacat project (initially called Ray DeltaCAT) as a step toward bringing the same compaction benefits to systems/apache-iceberg, systems/apache-hudi, and systems/delta-lake; the reference-based copy optimisation lives at deltacat/compute/compactor_v2/steps/merge.py#L199-L257.

Key takeaways

  1. Specialist > generalist when the workload pays for it. Spark won on generality; Ray won on specialist optimisation surface. The escape hatch to hand-craft a distributed merge with reference-based file copies and zero-copy intranode shuffles is only worth reaching for when the workload cost justifies bypassing the higher-level abstraction. BDT's ~$120M/yr customer-equivalent saving is what justifies this.
  2. Compute–storage separation is the enabling platform move. S3 is the durable substrate; Spark, Ray, Redshift, Athena, and Hive all front the same storage. Swapping compute engines is architecturally cheap because the source of truth doesn't move (concepts/compute-storage-separation).
  3. Copy-on-write compaction is the scale-out answer to read-time-merge CDC streams. Once CDC logs grow beyond the memory of the largest reader cluster, the only scalable answer is to materialise a pre-merged read-optimised version off the hot path (concepts/copy-on-write-merge). This pattern is the structural reason systems/apache-iceberg, systems/apache-hudi, and systems/delta-lake exist.
  4. Shadow the migration, don't flag-cut it. The canonical shape is run both engines in 1:1 shadow, compare outputs on dataset-level statistics (not byte-for-byte), reconcile real queries across multiple frameworks, then move individual subscribers off the old engine one at a time with the freedom to reverse course (patterns/shadow-migration + patterns/subscriber-switchover).
  5. Heterogeneous fleets with per-instance-type agnosticism are the operational answer to EC2 capacity variance. Don't pick one instance type up front; pick a set that meets your resource shape and let the provisioner grab whatever is most available. Application-side cost: drop CPU-arch / disk-type / hardware assumptions (patterns/heterogeneous-cluster-provisioning).
  6. Dataset-statistics equivalence > byte-for-byte equivalence at PB scale. Byte-for-byte comparison of frameworks at this scale is impossible (decimal rounding, unstable sorts, Parquet metadata drift, timestamp/timezone drift, etc). The practical bar is: record counts, cardinalities, min/max/avg match; file-format features match; real-query results across multiple consumer frameworks match.
  7. Memory is the bottleneck in columnar compaction. BDT's 54.6% average memory utilisation caps their cost-efficiency win below its ceiling. Ray's memory-aware scheduling gives the mechanism to do better; closing the 54.6% → 90% gap is worth +8 pp of the Ray vs Spark cost-efficiency delta.
  8. First-time success rate is a real reliability axis that takes years to close. Ray's 2024 99.15% vs Spark's 99.91% is a 0.76 pp gap, but it directly maps to retry cost + manual on-call overhead. Reliability is what you buy with operational time, not with a framework swap.
  9. Generalist Spark-to-Ray replacement is not yet on offer. No paved automated Spark → Ray translation path; migration requires hand-authoring. Converting an expensive, painful specialist job is viable; converting the whole Spark estate is not.

Operational numbers

  • Amazon's post-Oracle data footprint: >50 PB of Oracle table data in 2016, with the catalog growing from petabyte to exabyte scale by 2019.
  • Q1 2024 Ray compaction: 1.5 EiB input Parquet → 4 EiB in-memory Arrow.
  • >10,000 years of EC2 vCPU compute in Q1 2024 alone.
  • Per-cluster scale: up to 26,846 vCPUs / 210 TiB RAM.
  • >20 PiB/day input S3 across >1,600 Ray jobs/day.
  • Average job: >10 TiB input S3, merge, write, full cluster lifecycle <7 minutes.
  • On-time delivery: 100% over Q4 2023 + Q1 2024.
  • Fresh-data SLA: >90% of new updates queryable <60 min after arrival.
  • Cost efficiency: 82% better per GiB vs Spark (→ >220,000 vCPU-years saved/year, ~$120M/yr at on-demand R5 pricing for typical customers).
  • Reliability: Ray 99.15% first-time vs Spark 99.91% (2024 trailing 4-week avg).
  • Memory utilisation: 54.6% of 36 TiB allocated.
  • Target (EoY 2024): >90% efficiency gain at zero manual tuning.
  • Daft-on-Ray follow-on: +24% production cost-efficiency; single-column reads −55% vs PyArrow, −91% vs S3Fs; full-file reads −19% vs PyArrow, −77% vs S3Fs.

Source artefacts
