Amazon's Exabyte-Scale Migration from Apache Spark to Ray on Amazon EC2¶
One-paragraph summary¶
Amazon Retail's Business Data Technologies (BDT) team is in the middle of migrating the largest production business-intelligence datasets in Amazon off systems/apache-spark onto systems/ray. The workload is compaction, the copy-on-write merge of change-data-capture (CDC) log streams over exabyte-scale tables in S3, and the migration is happening on the live pipeline behind Amazon's post-Oracle BI stack. Ray replaces Spark because Ray's programming model (tasks + actors + distributed object store + locality-aware scheduler + autoscaling) let BDT hand-craft a specialist compactor with zero-copy intranode shuffles and reference-based copies of untouched S3 files. Q1 2024 results: 1.5 EiB of input Parquet compacted, 82% better cost efficiency than Spark per GiB, and 100% on-time delivery of new compacted data over two quarters, equivalent to ~$120M/yr in EC2 on-demand R5 charges for a typical customer. Ray does not yet beat Spark on reliability (first-time success 99.15% vs 99.91%) or cluster memory utilisation (54.6% avg). The Flash Compactor was contributed back into Ray's open-source systems/deltacat project as a step toward the same benefits on systems/apache-iceberg, systems/apache-hudi, and systems/delta-lake.
Background chain (why this was the problem)¶
- 2016 — Amazon was migrating off "what some surmised to be the largest Oracle Data Warehouse on Earth" (>50 PB). Werner tweet cited in the post.
- Architectural answer — the now-canonical concepts/compute-storage-separation: S3 for storage, mixed compute (systems/amazon-redshift, systems/aws-rds, systems/apache-hive on systems/amazon-emr). 50 PB of Oracle table data copied to S3 as delimited text, with each table wrapped in a generic schema based on the systems/amazon-ion type system.
- 2018 — last Oracle Data Warehouse cluster shut down; migration declared complete.
- BDT then built a table subscription service: anyone at Amazon subscribes to an S3-backed table, picks their analytics framework (systems/amazon-athena / Apache Flink / Redshift / Hive / AWS Glue / Spark / …), and either queries on demand or auto-triggers on new data arrival.
- Tables as unbounded streams of S3 files. Each file contains records to insert, update, or delete — a classic CDC log (concepts/change-data-capture). The subscriber's compute framework had to dynamically merge all changes at read time to get current table state.
The problem that motivated the compactor (pre-2019)¶
- CDC logs grew too large to merge in their entirety at read time on the largest clusters.
- Tables developed pathological shapes: millions of KB-scale tiny files or a few TB-scale huge files.
- New subscriptions to the largest tables would take days or weeks to complete a merge, or just fail.
- Fix: run the merge once and write a read-optimised version of the table for subscribers. This is the copy-on-write merge pattern (concepts/copy-on-write-merge) now universal in systems/apache-iceberg and systems/apache-hudi.
- BDT's Spark job that did this was called "the compactor": it compacts a CDC log stream of N deltas across M files down to a single insert delta of K equivalently-sized files.
Two compaction modes called out:
- Append-only compaction — deltas only insert; merge is concatenation; compactor still responsible for sizing output files to optimise reads (big-file split, tiny-file merge).
- Upsert compaction — deltas contain inserts and updates (with a specified merge key); only latest value per key survives.
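A minimal sketch of the upsert mode, assuming ordered CDC deltas of (op, key, value) tuples where later deltas win (illustrative only, not BDT's implementation; the source also names deletes as a CDC op):

```python
# Illustrative sketch: upsert compaction replays an ordered CDC delta stream
# and keeps only the latest value per merge key.

def compact_upserts(deltas):
    """Merge an ordered list of CDC deltas into a single insert delta.

    Each delta is a list of (op, key, value) tuples, op in {"upsert", "delete"}.
    Later deltas override earlier ones.
    """
    state = {}
    for delta in deltas:
        for op, key, value in delta:
            if op == "upsert":
                state[key] = value       # latest value per key survives
            elif op == "delete":
                state.pop(key, None)     # drop the key entirely
    # The compacted table is one insert delta of the surviving records.
    return sorted(state.items())

deltas = [
    [("upsert", 1, "a"), ("upsert", 2, "b")],
    [("upsert", 1, "a2"), ("delete", 2, None), ("upsert", 3, "c")],
]
compacted = compact_upserts(deltas)
# compacted == [(1, "a2"), (3, "c")]
```

The real compactor does this merge distributed across a Ray cluster, with output files re-sized for read efficiency as in the append-only mode.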
Why Spark stopped scaling (2019)¶
- Data catalog grew petabyte → exabyte by 2019.
- Compacting all in-scope tables was becoming prohibitively expensive.
- Manual job tuning required to compact their largest tables.
- Jobs exceeded expected completion times.
- Limited options to address perf issues because Spark "successfully (and unfortunately in this case) abstracted away most of the low-level data processing details" — the cost of a general-purpose engine for a specialist workload.
- Considered and rejected an in-house distributed compute service they were already using for table management ops (schema/partition evolution, data repair): right primitives (tasks + actors) but maintainability concerns (complex programming model), high task-invocation overhead, no cluster autoscaling.
Why Ray (the PoC)¶
Came across the Ray paper and met the Berkeley RISELab team.
Ray features named as load-bearing:
- Intuitive tasks + actors API (concepts/task-and-actor-model).
- Horizontally-scalable distributed object store.
- Zero-copy intranode object sharing.
- Efficient locality-aware scheduler (concepts/locality-aware-scheduling).
- Autoscaling clusters.
Initial 2020 PoC on production datasets found Ray could:
- Compact 12× larger datasets than Spark.
- Improve cost efficiency by 91%.
- Process 13× more data per hour.
Contributing factors: reduced task orchestration overhead + reduced GC overhead + zero-copy shuffles + fine-grained autoscaling. BDT's biggest lever was the flexibility to hand-craft a specialist, e.g. copying untouched S3 files by reference during the distributed merge instead of rewriting them (patterns/reference-based-copy-optimization); linked to DeltaCAT source: deltacat/compute/compactor_v2/steps/merge.py#L199-L257.
"BDT didn't need a generalist for the compaction problem, but a specialist, and Ray let them narrow their focus down to optimising the specific problem at hand."
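The reference-based copy idea can be sketched as a planning step: existing compacted files whose key range overlaps no incoming delta are carried forward as S3 metadata references instead of being read, merged, and rewritten. (Hypothetical names and shapes below, not DeltaCAT's actual API.)

```python
# Hypothetical sketch of reference-based copy optimisation: only files touched
# by the new CDC deltas are re-merged; the rest are reused by reference.

def plan_merge(existing_files, delta_keys):
    """Split existing compacted files into rewrite vs copy-by-reference sets.

    existing_files: list of (s3_path, min_key, max_key) for current files.
    delta_keys: set of merge keys present in the new CDC deltas.
    """
    rewrite, copy_by_ref = [], []
    for path, lo, hi in existing_files:
        if any(lo <= k <= hi for k in delta_keys):
            rewrite.append(path)       # overlaps a delta: must re-merge
        else:
            copy_by_ref.append(path)   # untouched: reuse the S3 file as-is
    return rewrite, copy_by_ref

files = [("s3://t/f1.parquet", 0, 99),
         ("s3://t/f2.parquet", 100, 199),
         ("s3://t/f3.parquet", 200, 299)]
rewrite, copy_by_ref = plan_merge(files, {150, 160})
# rewrite == ["s3://t/f2.parquet"]; f1 and f3 are copied by reference
```

On a mostly-cold exabyte table this turns the bulk of each compaction round into metadata operations rather than data movement.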
Build-out and migration chain (2021 → 2024)¶
- 2021 — serverless-job-management substrate: Ray on EC2 + systems/dynamodb + systems/aws-sns + systems/aws-sqs + systems/aws-s3 for durable job lifecycle tracking. Presented at Ray Summit 2021. Initial Ray compactor contributed to systems/deltacat (then called Ray DeltaCAT) as a step toward enabling the same on Iceberg / Hudi / Delta Lake. Today systems/anyscale-platform and systems/aws-glue-for-ray mean users don't need to build this from scratch.
- 2022 — first exabyte test-bed: a data-quality service that computed dataset statistics, trends, and anomalies across the whole catalog. Used to shake out Ray issues before placing it on the business-critical path. Main surprises: EC2 resource utilisation + slow cluster start and OOMs.
- OOMs → resolved by memory-aware scheduling (concepts/memory-aware-scheduling), telegraphing expected memory use at task-invocation time from past trends.
- EC2 management → solved by patterns/heterogeneous-cluster-provisioning: discover a set of instance types that can meet expected resource requirements; then provision the most readily available mix across AZs. Trades knowing exactly which instance type a Ray cluster will get for faster provisioning. Ray applications must therefore drop any assumptions about underlying CPU architecture, disk type, or hardware.
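The provisioning idea can be sketched as a filter over an instance catalog: any type meeting the job's resource floor is eligible, and the provisioner takes whatever mix is most available. (Catalog values below are illustrative, not a live EC2 inventory.)

```python
# Hedged sketch of heterogeneous cluster provisioning: instead of pinning one
# instance type, discover every type that satisfies the job's expected
# resource shape, then provision whichever mix is most available across AZs.

CATALOG = {                      # instance type -> (vCPUs, memory GiB)
    "r5.4xlarge":  (16, 128),
    "r5.8xlarge":  (32, 256),
    "r6g.4xlarge": (16, 128),    # different CPU architecture (Graviton)
    "m5.4xlarge":  (16, 64),
}

def eligible_types(min_vcpus, min_mem_gib):
    """Return every instance type meeting the per-node resource floor.

    The application must stay agnostic to which of these it actually gets:
    no assumptions about CPU architecture, disk type, or exact hardware.
    """
    return sorted(t for t, (cpu, mem) in CATALOG.items()
                  if cpu >= min_vcpus and mem >= min_mem_gib)

# A compaction job telegraphing ~100 GiB/node from past trends can land on
# any of these, across architectures and AZs:
types = eligible_types(min_vcpus=16, min_mem_gib=100)
# types == ["r5.4xlarge", "r5.8xlarge", "r6g.4xlarge"]
```

The memory floor itself comes from the memory-aware scheduling step: expected usage is telegraphed at task-invocation time from past runs of the same table.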
- Late 2022 — confidence to start the migration proper. Priority order: the largest ~1% of tables, which accounted for ~40% of total Spark compaction cost and the majority of job failures. Manual shadow compaction on a subset; Ray-based DQ service compared outputs for equivalence.
- Data Quality (DQ) analysis focused on:
- Dataset-level: record counts, cardinalities, min/max/avg.
- File-level: Parquet file format version + feature parity (e.g. if Spark didn't emit a Bloom filter, Ray shouldn't either).
- Explicit choice not to compare byte-for-byte. Why: queries run on different compute frameworks against TB/PB datasets "almost never produce equal results at this granularity" due to decimal rounding, non-deterministic execution plans (unstable sorts), Parquet metadata drift, value overflow handling, -0 vs 0 equality, timestamp ↔ timezone representation, pre-Gregorian calendar date interpretation, and leap seconds.
- Additionally: a Data Reconciliation Service that issued real end-user queries through Redshift / Spark / Athena against both Spark and Ray outputs and compared results (patterns/shadow-migration).
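The dataset-level comparison can be sketched as statistics equivalence within tolerance (illustrative only, not BDT's DQ service; field names are hypothetical):

```python
# Illustrative sketch of dataset-level DQ comparison: match record counts,
# cardinalities, and min/max/avg between the two frameworks' outputs, with a
# small tolerance on averages, rather than comparing bytes.

def summarize(records, key):
    vals = [r[key] for r in records]
    return {
        "count": len(vals),
        "cardinality": len(set(vals)),
        "min": min(vals),
        "max": max(vals),
        "avg": sum(vals) / len(vals),
    }

def equivalent(spark_out, ray_out, key, rel_tol=1e-9):
    s, r = summarize(spark_out, key), summarize(ray_out, key)
    if (s["count"], s["cardinality"], s["min"], s["max"]) != \
       (r["count"], r["cardinality"], r["min"], r["max"]):
        return False
    # Averages may differ slightly across frameworks (decimal rounding).
    return abs(s["avg"] - r["avg"]) <= rel_tol * max(abs(s["avg"]), 1.0)

spark_rows = [{"qty": 3}, {"qty": 5}, {"qty": 5}]
ray_rows   = [{"qty": 5}, {"qty": 3}, {"qty": 5}]   # reordered, same data
ok = equivalent(spark_rows, ray_rows, "qty")
# ok == True: unstable sort order does not fail the comparison
```

The file-level checks (Parquet format version, Bloom-filter parity) sit alongside this, and the Data Reconciliation Service adds real-query equivalence on top.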
- 2023 — fully automated 1:1 shadow compaction. Every compaction job for a migrating table ran in both frameworks. Purpose: direct comparisons, varied-table coverage, corner-case flushing at scale. Cost explicitly doubled for the duration — "if everything crashed and burned at this point, it would become a capital loss for the business and filed away as a hard lesson learned."
- Subscriber switchover — a service that dynamically moves individual table subscribers from Spark-compacted output to Ray-compacted output one at a time, with per-subscriber rollback (patterns/subscriber-switchover). Alternative considered and rejected: overwrite Spark's output in place and hope.
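The switchover mechanism can be sketched as per-subscriber routing state with independent rollback (a toy model under assumed shapes, not the real service):

```python
# Hedged sketch of subscriber switchover: each subscriber is moved from the
# Spark-compacted output to the Ray-compacted output individually, and can be
# rolled back alone if anything regresses.

class SwitchoverService:
    def __init__(self, subscribers):
        # Everyone starts on the Spark-compacted output.
        self.source = {s: "spark" for s in subscribers}

    def switch(self, subscriber):
        self.source[subscriber] = "ray"

    def rollback(self, subscriber):
        # Per-subscriber rollback: no other subscriber is affected.
        self.source[subscriber] = "spark"

    def output_path(self, subscriber):
        # Illustrative path layout; the real service resolves table metadata.
        return f"s3://compacted/{self.source[subscriber]}/table"

svc = SwitchoverService(["bi-team", "ml-team", "finance"])
svc.switch("bi-team")
svc.switch("ml-team")
svc.rollback("ml-team")   # ml-team alone reverts to the Spark output
```

This is why the in-place-overwrite alternative was rejected: overwriting Spark's output gives one global cutover with no per-consumer escape hatch.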
Results (first quarter 2024)¶
- Volume: 1.5 EiB input Apache Parquet from S3 → 4 EiB corresponding in-memory Apache Arrow. >10,000 years of EC2 vCPU compute time consumed. Cluster scale per job up to 26,846 vCPUs and 210 TiB RAM.
- Throughput: >20 PiB/day input S3 across >1,600 Ray jobs/day. Average Ray compaction job reads >10 TiB input S3, merges, and writes result back in <7 min including cluster setup/teardown.
- Timeliness: 100% on-time delivery of new compacted data to subscribers over Q4 2023 + Q1 2024. >90% of new table updates applied and queryable within 60 min of arrival.
- Cost efficiency: 82% better than Spark per GiB of S3 input compacted (>1.2 EiB Apache Parquet benchmark, equivalent memory-optimised EC2 instance types). Translates to >220,000 years of EC2 vCPU compute time saved annually; in typical EC2-customer terms, >$120M/yr in on-demand R5 charges.
Caveats / still-open gaps¶
- Reliability still trails Spark. 2023: Ray's first-time compaction job success rate trailed Spark by up to 15%; 2024 avg 99.15% vs Spark's 99.91% (0.76 pp gap). Ray needs more retries + more manual overhead. Data: >215k job runs over 4-week trailing windows on equivalent memory-optimised EC2 types.
- Cluster memory utilisation. Q1 2024: 19.4 TiB used of 36 TiB allocated (54.6% avg). Memory is the bottleneck, so other resources (CPU, network, disk) are even less utilised. If BDT can get memory to 90%, cost-efficiency improvement vs Spark should rise from 82% → 90%+.
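A back-of-envelope check of that utilisation claim (my arithmetic, not the post's): if Ray's cost per GiB is 18% of Spark's and cost scales with allocated memory, raising utilisation from 54.6% to 90% shrinks the allocation, and therefore the cost, proportionally.

```python
# Assumption: for a fixed workload, cost is proportional to allocated memory,
# so cost at 90% utilisation = current cost * (54.6 / 90).
spark_cost = 1.0
ray_cost_now = spark_cost * (1 - 0.82)          # 0.18 of Spark's cost per GiB
ray_cost_at_90 = ray_cost_now * (54.6 / 90.0)   # same work, less memory held
improvement = 1 - ray_cost_at_90 / spark_cost
# improvement ≈ 0.89, consistent with the stated 82% → 90%+ target
```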
- Ray is still only compacting a minority of the catalog; BDT wants a longer reliability track record before onboarding more.
- Byte-for-byte equivalence is explicitly not promised (see DQ notes above).
What's next (per the post)¶
- New major revision of "The Flash Compactor" taking fuller advantage of Ray pipelining + autoscaling.
- Target end-2024: >90% cost-efficiency improvement vs Spark with no manual tuning / human intervention.
- Joint optimisation with the systems/daft project: improved S3 Parquet + delimited-text I/O on Ray via Daft gave +24% production compaction cost efficiency on top of Ray alone. Daft read benchmarks: median single-column read 55% faster than PyArrow and 91% faster than S3Fs; median full-file read 19% faster than PyArrow, 77% faster than S3Fs.
- Cautionary framing: no blanket recommendation to migrate all Spark to Ray. Spark's feature-rich data-processing abstractions still work well for day-to-day use cases; no automated Spark → Ray transpiler exists. But Exoshuffle holds the 2022 Cloud Terasort cost record — evidence Ray can be a world-class data-processing framework and world-class distributed-ML framework simultaneously, if you have a painful specialist workload worth porting.
Open-source contribution¶
- The Flash Compactor design doc contributed to Ray's systems/deltacat project. Stated goal: extend beyond BDT's own catalog to third-party open table formats (Iceberg / Hudi / Delta Lake).
Key takeaways¶
- Specialist > generalist when the workload pays for it. Spark won on generality; Ray won on specialist optimisation surface. The escape hatch to hand-craft a distributed merge with reference-based file copies and zero-copy intranode shuffles is only worth reaching for when the workload cost justifies bypassing the higher-level abstraction. BDT's ~$120M/yr customer-equivalent saving is what justifies this.
- Compute–storage separation is the enabling platform move. S3 is the durable substrate; Spark, Ray, Redshift, Athena, and Hive all front the same storage. Swapping compute engines is architecturally cheap because the source of truth doesn't move (concepts/compute-storage-separation).
- Copy-on-write compaction is the scale-out answer to read-time-merge CDC streams. Once CDC logs grow beyond the memory of the largest reader cluster, the only scalable answer is to materialise a pre-merged read-optimised version off the hot path (concepts/copy-on-write-merge). This pattern is the structural reason systems/apache-iceberg, systems/apache-hudi, and systems/delta-lake exist.
- Shadow the migration, don't flag-cut it. The canonical shape is run both engines in 1:1 shadow, compare outputs on dataset-level statistics (not byte-for-byte), reconcile real queries across multiple frameworks, then move individual subscribers off the old engine one at a time with the freedom to reverse course (patterns/shadow-migration + patterns/subscriber-switchover).
- Heterogeneous fleets with per-instance-type agnosticism are the operational answer to EC2 capacity variance. Don't pick one instance type up front; pick a set that meets your resource shape and let the provisioner grab whatever is most available. Application-side cost: drop CPU-arch / disk-type / hardware assumptions (patterns/heterogeneous-cluster-provisioning).
- Dataset-statistics equivalence > byte-for-byte equivalence at PB scale. Byte-for-byte comparison of frameworks at this scale is impossible (decimal rounding, unstable sorts, Parquet metadata drift, timestamp/timezone drift, etc). The practical bar is: record counts, cardinalities, min/max/avg match; file-format features match; real-query results across multiple consumer frameworks match.
- Memory is the bottleneck in columnar compaction. BDT's 54.6% average memory utilisation caps their cost-efficiency win below its ceiling. Ray's memory-aware scheduling gives the mechanism to do better; closing the 54 → 90% gap is worth +8 pp of the Ray vs Spark cost-efficiency delta.
- First-time success rate is a real reliability axis that takes years to close. Ray's 2024 99.15% vs Spark's 99.91% is a 0.76 pp gap, but it directly maps to retry cost + manual on-call overhead. Reliability is what you buy with operational time, not with a framework swap.
- Generalist Spark-to-Ray replacement is not yet on offer. No paved automated Spark → Ray translation path; migration requires hand-authoring. Converting an expensive, painful specialist job is viable; converting the whole Spark estate is not.
Operational numbers¶
- Amazon's post-Oracle data footprint: 50 PB at the 2016 Oracle exit, growing to exabyte scale by 2019.
- Q1 2024 Ray compaction: 1.5 EiB input Parquet → 4 EiB in-memory Arrow.
- >10,000 years of EC2 vCPU compute in Q1 2024 alone.
- Per-cluster scale: up to 26,846 vCPUs / 210 TiB RAM.
- >20 PiB/day input S3 across >1,600 Ray jobs/day.
- Average job: >10 TiB input S3, merge, write, full cluster lifecycle <7 minutes.
- On-time delivery: 100% over Q4 2023 + Q1 2024.
- Fresh-data SLA: >90% of new updates queryable <60 min after arrival.
- Cost efficiency: 82% better per GiB vs Spark (→ >220,000 vCPU-years saved/year, ~$120M/yr at on-demand R5 pricing for typical customers).
- Reliability: Ray 99.15% first-time vs Spark 99.91% (2024 trailing 4-week avg).
- Memory utilisation: 54.6% of 36 TiB allocated.
- Target (EoY 2024): >90% efficiency gain at zero manual tuning.
- Daft-on-Ray follow-on: +24% production cost-efficiency; single-column reads −55% vs PyArrow, −91% vs S3Fs; full-file reads −19% vs PyArrow, −77% vs S3Fs.
Source artefacts¶
- Raw article:
raw/aws/2024-07-29-amazons-exabyte-scale-migration-from-apache-spark-to-ray-on-1b764274.md
- Original URL: https://aws.amazon.com/blogs/opensource/amazons-exabyte-scale-migration-from-apache-spark-to-ray-on-amazon-ec2/
- HN thread: https://news.ycombinator.com/item?id=41104288 (291 points)
- Linked primary: Flash Compactor design PDF
- Linked primary: DeltaCAT compactor_v2 merge step (file-by-reference copy)
- Linked primary: 2021 Ray Summit slides, "Petabyte-scale datalake table management with Ray, Arrow, Parquet and S3" (Patrick Ames)
- Linked primary: Ray paper (Moritz et al.), arXiv:1712.05889
- Linked primary: Exoshuffle (Wang et al.), arXiv:2203.05072