Updates from Project Hydrogen: Unifying State-of-the-Art AI and Big Data
1. Updates from Project Hydrogen: Unifying State-of-the-Art AI and Big Data in Apache Spark
Xiangrui Meng, Databricks
#UnifiedAnalytics #SparkAISummit
2. About me
● Software Engineer at Databricks
  ○ machine learning and data science/engineering
● Committer and PMC member of Apache Spark
  ○ MLlib, SparkR, PySpark, Spark Packages, etc.
3. About Project Hydrogen
Announced last June, Project Hydrogen is a major Spark initiative to unify state-of-the-art AI and big data workloads.
[Diagram: Barrier Execution Mode | Optimized Data Exchange | Accelerator Aware Scheduling]
4. Why Spark + AI?
5. Apache Spark: The First Unified Analytics Engine
[Diagram: Runtime, Delta, Spark Core Engine]
Big Data Processing: ETL + SQL + Streaming
Machine Learning: MLlib + SparkR
6. AI is re-shaping the world
Huge disruptive innovations are affecting most enterprises on the planet: Healthcare and Genomics, Fraud Prevention, Digital Personalization, Internet of Things, and many more...
7. Better AI needs more data
8. When AI goes distributed ...
As datasets grow, we see more distributed training scenarios and open-source offerings, e.g., distributed TensorFlow, Horovod, and distributed MXNet. This is where Spark and AI cross.
9. Why Project Hydrogen?
10. Two simple stories
As a data scientist, I can:
● build a pipeline that fetches training events from a production data warehouse and trains a DL model in parallel;
● apply a trained DL model to a distributed stream of events and enrich it with predicted labels.
11. Distributed training
[Diagram: data warehouse → load → fit → model]
Load:
● Required: be able to read from Databricks Delta, Parquet, MySQL, Hive, etc.
● Answer: Apache Spark
Fit:
● Required: distributed GPU cluster for fast training
● Answer: Horovod, Distributed TensorFlow, etc.
12. Two separate data and AI clusters?
[Diagram: load using a Spark cluster → save data → fit on a GPU cluster → model]
Required: glue code
13. Streaming model inference
[Diagram: Kafka → load → predict, using the model]
Required:
● save to stream sink
● GPU for fast inference
14. A hybrid Spark and AI cluster?
[Diagram, training: load using a Spark cluster w/ GPUs → fit a model distributedly on the same cluster → model]
[Diagram, inference: load using a Spark cluster w/ GPUs → predict w/ GPUs as a Spark task, using the model]
15. Unfortunately, it doesn’t work out of the box. See a previous demo.
16. Project Hydrogen to fill the major gaps
[Diagram: Barrier Execution Mode | Optimized Data Exchange | Accelerator Aware Scheduling]
17. Updates from Project Hydrogen
As a Spark contributor, I want to present:
● what features from Project Hydrogen are available,
● what features are in development.
As a Databricks engineer, I want to share:
● how we utilized features from Project Hydrogen,
● lessons learned and best practices.
18. Story #1: Distributed training
[Diagram: load using a Spark cluster w/ GPUs → fit a model distributedly on the same cluster → model]
19. Project Hydrogen: barrier execution mode
[Diagram: Barrier Execution Mode | Optimized Data Exchange | Accelerator Aware Scheduling]
20. Different execution models
Spark (MapReduce) [Diagram: Task 1, Task 2, Task 3 running separately]
● Tasks are independent of each other
● Embarrassingly parallel & massively scalable
Distributed training [Diagram: Task 1, Task 2, Task 3 connected to each other]
● Complete coordination among tasks
● Optimized for communication
21. Barrier execution mode
We introduced gang scheduling to Spark on top of the MapReduce execution model, so a distributed DL job can run as a Spark job.
● It starts all tasks together.
● It provides sufficient info and tooling to run a hybrid distributed job.
● It cancels and restarts all tasks in case of failures.
JIRA: SPARK-24374 (Spark 2.4)
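The restart rule on this slide (one failure cancels and reruns every task) can be illustrated with a small pure-Python sketch. This is not Spark code: `run_gang` and `make_task` are hypothetical names invented for the illustration, and the tasks run sequentially here, so only the all-or-nothing retry policy is modeled, not the simultaneous launch.

```python
from typing import Callable, List


def run_gang(tasks: List[Callable[[int], str]], max_attempts: int = 3) -> List[str]:
    """Run all tasks as one unit; if any task fails, drop every result and rerun all."""
    for attempt in range(1, max_attempts + 1):
        try:
            # All-or-nothing: a single exception abandons the whole attempt.
            return [task(attempt) for task in tasks]
        except RuntimeError as err:
            print(f"attempt {attempt} failed ({err}); restarting all tasks")
    raise RuntimeError(f"gang failed after {max_attempts} attempts")


calls = []  # records (task_id, attempt) so the restart behaviour is visible


def make_task(task_id: int, fail_on_attempt: int = 0) -> Callable[[int], str]:
    def task(attempt: int) -> str:
        calls.append((task_id, attempt))
        if attempt == fail_on_attempt:
            raise RuntimeError(f"task {task_id} crashed")
        return f"task {task_id} ok"
    return task


# Task 1 crashes on the first attempt, so task 0 is rerun as well.
results = run_gang([make_task(0), make_task(1, fail_on_attempt=1), make_task(2)])
print(results)
print(calls)
```

In the independent-task model of the previous slide only the failed task would be retried; here task 0 runs twice even though it succeeded the first time, because partial progress of a coordinated training job is not useful on its own.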
22. API: RDD.barrier()
RDD.barrier() tells Spark to launch the tasks together.

    rdd.barrier().mapPartitions { iter =>
      val context = BarrierTaskContext.get()
      ...
    }
23. API: context.barrier()
context.barrier() places a global barrier and waits until all tasks in this stage hit this barrier.

    val context = BarrierTaskContext.get()
    … // preparation
    context.barrier()
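The semantics of a global barrier can be tried out locally without Spark. The sketch below uses Python's standard `threading.Barrier` as a stand-in for the stage-wide barrier, with threads playing the role of tasks; `NUM_TASKS`, `events`, and `task` are names made up for the illustration.

```python
import threading

NUM_TASKS = 4
barrier = threading.Barrier(NUM_TASKS)   # stand-in for the stage-wide barrier
events = []                              # ordered log of (phase, task_id)
lock = threading.Lock()


def task(task_id: int) -> None:
    with lock:
        events.append(("prepared", task_id))   # per-task preparation
    barrier.wait()                             # blocks until all NUM_TASKS arrive
    with lock:
        events.append(("training", task_id))   # runs only after everyone prepared


threads = [threading.Thread(target=task, args=(i,)) for i in range(NUM_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

phases = [phase for phase, _ in events]
print(phases)
```

Whatever order the threads are scheduled in, every "prepared" entry is logged before the first "training" entry, which is the guarantee the barrier provides.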
24. API: context.getTaskInfos()
context.getTaskInfos() returns info about all tasks in this stage.

    if (context.partitionId == 0) {
      val addrs = context.getTaskInfos().map(_.address)
      ... // start a hybrid training job, e.g., via MPI
    }
    context.barrier() // wait until training finishes
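The coordinator pattern on this slide (partition 0 collects every task's address and starts the job while all tasks wait at the barrier) can be mimicked with threads. This is a sketch only: `TaskInfo`, `TASK_INFOS`, and the addresses are hypothetical stand-ins for what Spark would provide, and "launching" is reduced to recording the address list.

```python
import threading
from dataclasses import dataclass
from typing import List


@dataclass
class TaskInfo:
    """Hypothetical stand-in for the per-task info Spark exposes."""
    address: str


TASK_INFOS = [TaskInfo("10.0.0.1:50000"), TaskInfo("10.0.0.2:50000"), TaskInfo("10.0.0.3:50000")]
barrier = threading.Barrier(len(TASK_INFOS))
launched: List[List[str]] = []   # what the coordinator would hand to e.g. MPI
finished: List[int] = []


def task(partition_id: int) -> None:
    if partition_id == 0:
        # Only one task acts as coordinator; it sees the addresses of all tasks.
        addrs = [info.address for info in TASK_INFOS]
        launched.append(addrs)   # a real job would start the training processes here
    barrier.wait()               # every task, coordinator included, waits here
    finished.append(partition_id)


threads = [threading.Thread(target=task, args=(i,)) for i in range(len(TASK_INFOS))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(launched, sorted(finished))
```

The non-coordinator tasks do no work of their own; they hold their slots until the barrier releases, so the cluster resources stay reserved for the job partition 0 started.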
25. Barrier mode integration
26. Horovod (an LF AI hosted project)
Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet. It was originally developed at Uber and is now an LF AI hosted project at the Linux Foundation.
● Little modification to single-node code.
● High-performance I/O via MPI and NCCL.
● Same convergence theory.
Some limitations:
● Before v0.16, users still need to use mpirun to launch a job,
● … with a Python training script:

    mpirun -np 16 -H server1:4,server2:4,server3:4,server4:4 \
        -bind-to none -map-by slot \
        -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
        -mca pml ob1 -mca btl ^openib \
        python train.py
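Writing that command by hand means knowing how many training processes land on each host. As a sketch of how a launcher might derive it from a list of task hosts, the hypothetical helper `build_mpirun_command` below groups hosts into the `-H host:slots` form and reuses the flags from the command on this slide unchanged. It only builds the argument list and does not run MPI.

```python
from collections import Counter
from typing import List


def build_mpirun_command(task_hosts: List[str], script: str) -> List[str]:
    """Build an mpirun argv with one slot per task, grouped by host."""
    slots = Counter(task_hosts)   # host -> number of tasks placed on it
    # Counter keeps first-seen order, so hosts appear in the order given.
    host_spec = ",".join(f"{host}:{n}" for host, n in slots.items())
    return [
        "mpirun", "-np", str(len(task_hosts)), "-H", host_spec,
        "-bind-to", "none", "-map-by", "slot",
        "-x", "NCCL_DEBUG=INFO", "-x", "LD_LIBRARY_PATH", "-x", "PATH",
        "-mca", "pml", "ob1", "-mca", "btl", "^openib",
        "python", script,
    ]


# Four hosts with four tasks each, as in the slide's example.
hosts = [f"server{i}" for i in (1, 2, 3, 4) for _ in range(4)]
command = build_mpirun_command(hosts, "train.py")
print(" ".join(command))
```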
27. Hydrogen integration with Horovod
Databricks released HorovodRunner w/ Runtime 5.0 ML, built on top of Horovod and Project Hydrogen.
● Runs Horovod under barrier execution mode.
● Hides cluster setup, scripts, MPI command line from users.

    def train_hvd():
        hvd.init()
        … # train using Horovod

    HorovodRunner(np=2).run(train_hvd)
28. Implementation of HorovodRunner
Integrating Horovod with barrier mode is straightforward:
● Pickle and broadcast the train function.
  ○ Inspect code and warn users about potential issues.
● Launch a Spark job in barrier execution mode.
● In the first executor, use worker addresses to launch the Horovod MPI job.
● Terminate Horovod if the Spark job is cancelled.
  ○ Hint: PR_SET_PDEATHSIG
Limitation:
● Tailored for Databricks Runtime ML
  ○ Horovod built with TensorFlow/PyTorch, SSH, OpenMPI, NCCL, etc.
  ○ Spark 2.4, GPU cluster configuration, etc.
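The PR_SET_PDEATHSIG hint refers to the Linux `prctl` option that asks the kernel to signal a process when its parent exits. The following is a minimal sketch of the idea, not HorovodRunner's actual code: `die_with_parent` and `launch` are hypothetical helper names, the child command is a placeholder for a real training launcher, and the mechanism is Linux-only (the sketch does nothing on other POSIX systems, and `preexec_fn` is not supported on Windows).

```python
import ctypes
import signal
import subprocess
import sys

PR_SET_PDEATHSIG = 1  # option number from <linux/prctl.h>


def die_with_parent() -> None:
    """Ask the Linux kernel to send SIGTERM to this process when its parent dies."""
    if not sys.platform.startswith("linux"):
        return  # prctl is Linux-only; elsewhere this sketch does nothing
    libc = ctypes.CDLL(None, use_errno=True)
    if libc.prctl(PR_SET_PDEATHSIG, int(signal.SIGTERM)) != 0:
        raise OSError(ctypes.get_errno(), "prctl(PR_SET_PDEATHSIG) failed")


def launch(argv):
    # preexec_fn runs in the child after fork() and before exec().
    return subprocess.Popen(argv, preexec_fn=die_with_parent)


# Placeholder child process standing in for the MPI launcher.
child = launch([sys.executable, "-c", "print('training stand-in')"])
exit_code = child.wait()
print("child exited with", exit_code)
```

With this in place, killing the launching task (for example when Spark cancels the job) also terminates the child, so no orphaned training process keeps holding GPUs.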
29. horovod.spark
horovod.spark is a new feature in the Horovod 0.16 release. Similar to HorovodRunner, it runs Horovod as a Spark job and takes Python train functions. Its assumptions are more general:
● no dependency on SSH,
● system-independent process termination,
● multiple Spark versions,
● and more … also check out horovodrun :)