Apache Spark Core – Deep Dive – Proper Optimization
1. WiFi SSID: SparkAISummit | Password: UnifiedAnalytics
2. Spark Core – Proper Optimization
   Daniel Tomes, Databricks
3. Me
   • Norman, OK – undergrad at OU (Sooner), masters at OK State
   • ConocoPhillips
   • Raleigh, NC
   • Cloudera
   • Databricks – /in/tomes
4. Talking Points
   • Spark Hierarchy
   • The Spark UI
   • Rightsizing & Optimizing
   • Advanced Optimizations
5. Spark Hierarchy
6. Spark Hierarchy
   • Actions are eager
     – Made of transformations (lazy)
       • narrow
       • wide (requires a shuffle)
     – Spawn jobs
       • Jobs spawn stages
         – Stages spawn tasks, which do the work and utilize the hardware
   (sketch below)
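A minimal sketch of this hierarchy in Scala (the app name and numbers are illustrative): every line but the last is lazy, and only the action at the end spawns a job, whose stages split at the shuffle boundary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("hierarchy-demo").getOrCreate()
import spark.implicits._

val df     = spark.range(1000000L).toDF("id")      // lazy: builds a plan, runs nothing
val narrow = df.withColumn("bucket", $"id" % 10)   // narrow transformation: no shuffle
val wide   = narrow.groupBy("bucket").count()      // wide transformation: requires a shuffle
wide.show()                                        // action: spawns a job -> stages -> tasks
```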
7. Navigating the Spark UI – DEMO
8. Understand Your Hardware
   • Core count & speed
   • Memory per core (working & storage)
   • Local disk type, count, size, & speed
   • Network speed & topology
   • Data lake properties (rate limits)
   • Cost / core / hour
     – Financial for cloud
     – Opportunity cost for shared & on-prem
9. Get a Baseline
   Goal:
   • Is your action efficient?
     – Long stages, spills, laggard tasks, etc.?
   • CPU utilization
     – Ganglia / YARN / etc.
     – Watch for tails (cores idling at the end of a stage)
10. Minimize Data Scans (Lazy Load)
   • Data skipping
     – HIVE partitions
     – Bucketing: only for experts – nearly impossible to maintain
     – Databricks Delta Z-Ordering: what it is and how to do it
   (sketch below)
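A hedged sketch of both skipping techniques, assuming a DataFrame df with date and userId columns and hypothetical paths; note that OPTIMIZE … ZORDER BY is Databricks Delta SQL, not open-source Spark:

```scala
// HIVE-style partitioning: one directory per date value, so a filter on `date`
// lets Spark skip whole directories instead of scanning them
df.write
  .partitionBy("date")
  .format("parquet")
  .save("/mnt/lake/events")                // hypothetical path

// Databricks Delta Z-Ordering: co-locates related values across files so data
// skipping also works on high-cardinality columns you did not partition by
spark.sql("OPTIMIZE delta.`/mnt/lake/events_delta` ZORDER BY (userId)")
```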
11. [Screenshot]
12. [Screenshots: query plans with no lazy loading vs. with lazy loading; labels: "Simple", "Extra Shuffle Partitions"]
13. Shrink the Partition Range Using a Filter on the HIVE-Partitioned Column
   [Screenshots: the same scan without vs. with a partition filter] (sketch below)
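A sketch of this pruning, assuming a table events partitioned by date (names and dates are illustrative):

```scala
// A filter on the HIVE partition column is pushed down at planning time,
// so Spark only reads the matching directories
val pruned = spark.table("events")
  .filter($"date" >= "2019-04-01" && $"date" < "2019-05-01")

pruned.explain()   // the FileScan node should list PartitionFilters on `date`
```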
14. Partitions – Definition
   "Each of a number of portions into which some operating systems divide memory or storage."
   HIVE PARTITION ≠ SPARK PARTITION
15. Spark Partitions – Types
   • Input – control: size
     – spark.default.parallelism (don't use)
     – spark.sql.files.maxPartitionBytes (mutable) – assuming the source has sufficient partitions
   • Shuffle – control: count
     – spark.sql.shuffle.partitions
   • Output – control: size
     – coalesce(n) to shrink
     – repartition(n) to increase and/or balance (shuffles)
     – df.write.option("maxRecordsPerFile", n)
   (all three knobs in the sketch below)
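The three knobs side by side, a sketch with illustrative values and a hypothetical output path:

```scala
// Input: control the SIZE of each input split (default 128 MB)
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728L)

// Shuffle: control the COUNT of post-shuffle partitions (default 200)
spark.conf.set("spark.sql.shuffle.partitions", 480)

// Output: control file count and size at write time
df.coalesce(48)                          // shrink partition count without a shuffle
  .write
  .option("maxRecordsPerFile", 500000)   // additionally cap rows per file
  .parquet("/mnt/out/events")            // hypothetical path
```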
16. Partitions – Shuffle – Default
   Default = 200 shuffle partitions
17. Partitions – Right-Sizing – Shuffle – Master Equation
   • Largest shuffle stage
     – Target size <= 200 MB per partition
   • Partition count = stage input data / target size
     – Solve for partition count
   EXAMPLE: shuffle stage input = 210 GB
     x = 210000 MB / 200 MB = 1050
     spark.conf.set("spark.sql.shuffle.partitions", 1050)
   BUT if the cluster has 2000 cores:
     spark.conf.set("spark.sql.shuffle.partitions", 2000)
   (worked calculation below)
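The master equation as a runnable sketch; the variable names are illustrative and the numbers come from the slide:

```scala
// Partition count = stage input data / target size,
// then round up so every core gets work
val stageInputMB = 210L * 1000                    // largest shuffle stage input: 210 GB
val targetMB     = 200L                           // <= 200 MB per shuffle partition
val clusterCores = 2000L

val bySize     = stageInputMB / targetMB          // 210000 / 200 = 1050
val partitions = math.max(bySize, clusterCores)   // 2000: don't leave cores idle
spark.conf.set("spark.sql.shuffle.partitions", partitions)
```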
18. Cluster spec: 96 cores @ 7.625 GB/core, i.e. 3.8125 GB working memory and 3.8125 GB storage memory per core.
   Stage 21's shuffle is fed by stages 19 & 20: shuffle input = 45.4 GB + 8.6 GB = 54 GB.
   At the default of 200 shuffle partitions: 54000 MB / 200 parts ≈ 270 MB per shuffle partition → spills.
19. Same cluster spec: 96 cores @ 7.625 GB/core (3.8125 GB working, 3.8125 GB storage memory).
   480 shuffle partitions – WHY?
   • Target shuffle partition size ≈ 100 MB → p = 54 GB / 100 MB = 540
   • 540 partitions / 96 cores = 5.625 waves: if p = 540, another 60 partitions have to be loaded and processed after the first five full cycles complete
   • 96 × 5 = 480 → every wave fills all 96 cores, and at ~112 MB per partition there is NO SPILL
   (wave arithmetic below)
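The wave arithmetic as a sketch, using the slide's numbers:

```scala
val shuffleInputMB = 54L * 1000                  // 54 GB of shuffle input
val targetPartMB   = 100L                        // ~100 MB per shuffle partition
val cores          = 96L

val raw        = shuffleInputMB / targetPartMB   // 540 partitions -> 5.625 waves
val fullWaves  = raw / cores                     // 5 (integer division)
val partitions = fullWaves * cores               // 480: each wave fills all 96 cores
spark.conf.set("spark.sql.shuffle.partitions", partitions)
```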
20. Input Partitions – Right-Sizing
   • Use the Spark default (128 MB) unless you need to:
     – Increase parallelism
     – Handle heavily nested/repetitive data
     – Generate data, i.e. explode
     – Compensate for a source structure that is not optimal (upstream)
     – Feed UDFs
   spark.conf.set("spark.sql.files.maxPartitionBytes", 16777216)   // 16 MB
   (example below)
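A sketch of the explode case, with a hypothetical path and column name: shrinking the splits gives the row-multiplying stage more, smaller tasks.

```scala
import org.apache.spark.sql.functions.explode

// 16 MB splits -> roughly 8x more input tasks than the 128 MB default
spark.conf.set("spark.sql.files.maxPartitionBytes", 16777216L)

val exploded = spark.read.parquet("/mnt/lake/nested")    // hypothetical path
  .withColumn("item", explode($"items"))                 // each row fans out to many rows
```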
21. [Screenshots: input stage with 128 MB splits vs. 16 MB splits]
22. [Screenshot]
23. Output Partitions – Right-Sizing
   • Write once -> read many
     – More time to write, but faster to read
   • "Perfect" writes limit parallelism
     – Compactions (minor & major)
   EXAMPLE: write data size = 14.7 GB, desired file size = 1500 MB
     → max write-stage parallelism = 10
     → 96 - 10 = 86 cores idle during the write
   (arithmetic below)
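The tradeoff as a sketch (the write path is hypothetical):

```scala
val writeMB      = 14.7 * 1000                               // 14.7 GB to write
val targetFileMB = 1500.0                                    // desired ~1.5 GB files
val writeTasks   = math.ceil(writeMB / targetFileMB).toInt   // only 10 tasks
val idleCores    = 96 - writeTasks                           // 86 cores idle in the write stage

df.repartition(writeTasks).write.parquet("/mnt/out/compact") // hypothetical path
```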
24. [Screenshots: only 10 cores used, average file size ≈ 1.5 GB, vs. all 96 cores used, average file size ≈ 0.16 GB]
25. Output Partitions – Composition
   • df.write.option("maxRecordsPerFile", n)
   • df.coalesce(n).write…
   • df.repartition(n).write…
   • df.repartition(n, [colA, …]).write…
   • spark.conf.set("spark.sql.shuffle.partitions", n)
   • df.localCheckpoint(…).repartition(n).write…
   • df.localCheckpoint(…).coalesce(n).write…
   (localCheckpoint pattern sketched below)
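A sketch of the localCheckpoint pattern from the list: do the wide work at full parallelism, cut the lineage, then shrink only the write (the inputs and path are hypothetical):

```scala
val joined = factDf.join(dimDf, "key")   // hypothetical inputs; heavy, wide work

joined
  .localCheckpoint()   // eagerly materializes on the executors, truncating lineage
  .coalesce(10)        // so coalesce shrinks only the write, not the join above it
  .write
  .parquet("/mnt/out/final")             // hypothetical path
```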
26. Partitions – Why So Serious?
   • Avoid the spill
   • Maximize parallelism
     – Utilize all cores
     – Provision only the cores you need
27. Advanced Optimizations
   • Finding imbalances
   • Persisting
   • Join optimizations
   • Handling skew
   • Expensive operations
   • UDFs
   • Multi-dimensional parallelism
28. Balance
   • Maximizing resources requires balance
     – Tune: shuffle partitions, partition size
     – Watch: task duration, output files, spills, GC times
   • SKEW – when some partitions are significantly larger than most → straggling tasks
   (a quick balance check below)
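A quick, hedged way to measure balance: count records per Spark partition and compare the largest partitions against the typical one.

```scala
import org.apache.spark.sql.functions.spark_partition_id

// One row per partition; a max far above the 75th percentile signals skew
df.groupBy(spark_partition_id().alias("pid"))
  .count()
  .orderBy($"count".desc)
  .show(10)
```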
29. [Screenshot: skewed task distribution]
   75th percentile ≈ 2M records, max ≈ 45M records: stragglers take > 22x longer even with no spill; with spill, ~100x longer.