Performance Troubleshooting Using Apache Spark Metrics
Performance troubleshooting of distributed data processing systems is a complex task. Apache Spark comes to the rescue with a large set of metrics and instrumentation that you can use to understand and improve the performance of your Spark-based applications. You will learn about the available metric-based instrumentation in Apache Spark: executor task metrics and the Dropwizard-based metrics system. The talk will cover how the Hadoop and Spark service at CERN uses Apache Spark metrics for troubleshooting performance and measuring production workloads. Notably, the talk will cover how to deploy a performance dashboard for Spark workloads and the use of sparkMeasure, a tool based on the Spark Listener interface. The speaker will discuss the lessons learned so far and what improvements you can expect in this area in Apache Spark 3.0.
Performance Troubleshooting Using Apache Spark Metrics Luca Canali, CERN
About Luca • Data Engineer at CERN – Hadoop and Spark service, database services – 19+ years of experience with data engineering • Sharing and community – Blog, notes, tools, contributions to Apache Spark @LucaCanaliDB – http://cern.ch/canali
CERN: founded in 1954 by 12 European States. Science for Peace and Development. Today: 23 Member States, ~2600 staff, ~1800 other paid personnel, ~14000 scientific users. Budget (2019): ~1200 MCHF. Member States: Austria, Belgium, Bulgaria, Czech Republic, Denmark, Finland, France, Germany, Greece, Hungary, Israel, Italy, Netherlands, Norway, Poland, Portugal, Romania, Serbia, Slovak Republic, Spain, Sweden, Switzerland and United Kingdom. Associate Members in the Pre-Stage to Membership: Cyprus, Slovenia. Associate Member States: India, Lithuania, Pakistan, Turkey, Ukraine. Applications for Membership or Associate Membership: Brazil, Croatia, Estonia. Observers to Council: Japan, Russia, United States of America; European Union, JINR and UNESCO.
Data at the Large Hadron Collider • LHC experiments data: >300 PB • Computing jobs on the WLCG Grid: using ~1M cores
Analytics Platform @CERN - “Big Data” open source components - Integrated with domain-specific software and existing infrastructure - Users in: Physics, Accelerators, IT [Diagram: experiments storage, HDFS, HEP software, personal storage]
Hadoop and Spark Clusters at CERN • Spark running on clusters: – YARN/Hadoop – Spark on Kubernetes • Accelerator logging Hadoop: YARN, 30 nodes (part of LHC infrastructure); Cores: 1200, Mem: 13 TB, Storage: 7.5 PB • General Purpose Hadoop: YARN, 65 nodes; Cores: 2.2k, Mem: 20 TB, Storage: 12.5 PB • Cloud containers: Kubernetes on OpenStack VMs; Cores: 250, Mem: 2 TB; Storage: remote HDFS or EOS (for physics data)
SparkMonitor: a Jupyter extension for Spark monitoring, developed as a GSoC project with CERN. [Screenshot: notebook with text, code, Spark monitoring and visualizations] https://medium.com/@krishnanr/sparkmonitor-big-data-tools-for-physics-analysis-bbcdef68b35a
Performance Troubleshooting Goals: • Improving productivity • Reducing resource usage and cost • Metrics: latency, throughput, cost How: • Practice and methodologies • Gather performance and workload data
Performance Methodologies and Anti-Patterns • Typical benchmark graph [Figure: "Vendor A benchmark", bars of TIME (MINUTES), headline "System A is 5x faster!"] – Just a simple measurement – No root-cause analysis – Guesses and generalization • Sound methodologies: http://www.brendangregg.com/methodology.html
Workload and Performance Data • You want data to find answers to questions like: – What is my workload doing? – Where is it spending time? – What are the bottlenecks (CPU, I/O)? – How are system resources used? – Why do I measure the {latency/throughput} that I measure? • Why is it not 10x better?
Data + Context => Insights: an agent takes workload monitoring data, Spark architecture knowledge, info on the application architecture and info on the computing environment, and produces insights + actions.
Measuring Spark • Distributed system, parallel architecture – Many components, complexity increases when running at scale – Execution hierarchy: SQL -> Jobs -> Stages -> Tasks – Interaction with clusters and storage
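The execution hierarchy matters because task-level metrics only become readable once they are rolled up, for example to stage and job level. The sketch below shows that roll-up on plain Scala collections. `TaskRecord`, `StageSummary` and `MetricsRollup` are hypothetical types made up for illustration (they are not Spark classes); the idea is similar in spirit to what stage-level aggregation tools do.

```scala
// Hypothetical, simplified record of one task's metrics (not a Spark class).
final case class TaskRecord(jobId: Int, stageId: Int, runTimeMs: Long, cpuTimeMs: Long, shuffleBytesWritten: Long)

// Hypothetical aggregated view of one stage.
final case class StageSummary(jobId: Int, stageId: Int, numTasks: Int, runTimeMs: Long, cpuTimeMs: Long, shuffleBytesWritten: Long)

object MetricsRollup {
  // Tasks -> Stages: sum the task-level values for each (jobId, stageId) pair.
  def byStage(tasks: Seq[TaskRecord]): Seq[StageSummary] =
    tasks.groupBy(t => (t.jobId, t.stageId)).toSeq.map { case ((job, stage), ts) =>
      StageSummary(job, stage, ts.size, ts.map(_.runTimeMs).sum, ts.map(_.cpuTimeMs).sum, ts.map(_.shuffleBytesWritten).sum)
    }.sortBy(s => (s.jobId, s.stageId))

  // Stages -> Jobs: total executor run time per job.
  def runTimeByJob(stages: Seq[StageSummary]): Map[Int, Long] =
    stages.groupBy(_.jobId).map { case (job, ss) => job -> ss.map(_.runTimeMs).sum }
}
```

Summing run time across tasks gives "task time", which can exceed wall-clock time when many tasks run in parallel.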
Spark Instrumentation - WebUI WebUI and History server: standard instrumentation • Details on jobs, stages, tasks • Default: http://driver_host:4040 • Details on SQL execution and execution plans • https://github.com/apache/spark/blob/master/docs/web-ui.md
Spark Instrumentation – Metrics • Task metrics: instrument resource usage by executor tasks: – Time spent executing tasks – CPU used, I/O metrics – Shuffle read/write details, .. – SPARK-25170: https://spark.apache.org/docs/latest/monitoring.html • SQL metrics: DataFrame/SQL operations, mostly used by the Web UI SQL tab. See SPARK-28935 and the Web UI documentation
How to Gather Spark Task Metrics • The Web UI exposes a REST API, example: http://localhost:4040/api/v1/applications • The History server reads from the event log (JSON file): – spark.eventLog.enabled=true – spark.eventLog.dir=<path> • Programmatic interface via "Spark Listeners" – sparkMeasure: a tool and working example code of how to collect metrics with Spark Listeners
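The REST API can be queried with any HTTP client. This is a minimal sketch using only the Scala standard library. It assumes a driver UI at http://localhost:4040 (a History Server usually listens on port 18080 instead). The `SparkRestApi` object is a hypothetical helper, while the `/applications`, `/applications/[app-id]/stages` and `/applications/[app-id]/executors` paths are part of Spark's monitoring REST API.

```scala
import scala.io.Source

// Hypothetical helper for building Spark monitoring REST API URLs.
object SparkRestApi {
  // Base path of the REST API served by the driver Web UI or the History Server.
  def apiBase(uiUrl: String): String = uiUrl.stripSuffix("/") + "/api/v1"

  def applicationsUrl(uiUrl: String): String = apiBase(uiUrl) + "/applications"

  // Stage-level data (including aggregated task metrics) for one application.
  def stagesUrl(uiUrl: String, appId: String): String = s"${applicationsUrl(uiUrl)}/$appId/stages"

  // Per-executor summary for one application.
  def executorsUrl(uiUrl: String, appId: String): String = s"${applicationsUrl(uiUrl)}/$appId/executors"

  // Fetch the raw JSON response; requires a running Spark UI or History Server.
  def fetch(url: String): String = {
    val src = Source.fromURL(url, "UTF-8")
    try src.mkString finally src.close()
  }
}
```

For example, from a spark-shell on the driver: `println(SparkRestApi.fetch(SparkRestApi.applicationsUrl("http://localhost:4040")))`.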
Spark Metrics in REST API …
Task Metrics in the Event Log
val df = spark.read.json("/var/log/spark-history/application_1567507314781_..")
df.filter("Event='SparkListenerTaskEnd'").select("Task Metrics.*").printSchema
 |-- Disk Bytes Spilled: long (nullable = true)
 |-- Executor CPU Time: long (nullable = true)
 |-- Executor Deserialize CPU Time: long (nullable = true)
 |-- Executor Deserialize Time: long (nullable = true)
 |-- Executor Run Time: long (nullable = true)
 |-- Input Metrics: struct (nullable = true)
 |    |-- Bytes Read: long (nullable = true)
 |    |-- Records Read: long (nullable = true)
 |-- JVM GC Time: long (nullable = true)
 |-- Memory Bytes Spilled: long (nullable = true)
 |-- Output Metrics: struct (nullable = true)
 |    |-- Bytes Written: long (nullable = true)
 |    |-- Records Written: long (nullable = true)
 |-- Result Serialization Time: long (nullable = true)
 |-- Result Size: long (nullable = true)
 |-- Shuffle Read Metrics: struct (nullable = true)
 |    |-- Fetch Wait Time: long (nullable = true)
 |    |-- Local Blocks Fetched: long (nullable = true)
 |    |-- Local Bytes Read: long (nullable = true)
 |    |-- Remote Blocks Fetched: long (nullable = true)
 |    |-- Remote Bytes Read: long (nullable = true)
 |    |-- Remote Bytes Read To Disk: long (nullable = true)
 |    |-- Total Records Read: long (nullable = true)
 |-- Shuffle Write Metrics: struct (nullable = true)
 |    |-- Shuffle Bytes Written: long (nullable = true)
 |    |-- Shuffle Records Written: long (nullable = true)
 |    |-- Shuffle Write Time: long (nullable = true)
 |-- Updated Blocks: array (nullable = true)
 |    |-- element: string (containsNull = true)
Spark internal task metrics provide info on executors' activity: run time, CPU time used, I/O metrics, JVM garbage collection, shuffle activity, etc.
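Once the task metrics are exposed as columns, they can be aggregated like any other DataFrame. The sketch below sums a few of the fields listed in the schema over all `SparkListenerTaskEnd` events. `EventLogSummary` is a hypothetical helper name; the field names are the ones printed above, and in the event log `Executor CPU Time` is in nanoseconds while `Executor Run Time` and `JVM GC Time` are in milliseconds.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{count, sum}

// Hypothetical helper: aggregate task metrics from a Spark JSON event log.
object EventLogSummary {
  def summarize(spark: SparkSession, eventLogPath: String): DataFrame =
    spark.read.json(eventLogPath)
      .filter("Event='SparkListenerTaskEnd'") // keep only task-end events
      .select("Task Metrics.*")               // flatten the metrics struct
      .agg(
        count("*").as("numTasks"),
        sum("Executor Run Time").as("executorRunTimeMs"),
        sum("Executor CPU Time").as("executorCpuTimeNs"),
        sum("JVM GC Time").as("jvmGCTimeMs"))
}
```

In a spark-shell, `EventLogSummary.summarize(spark, <event log path>).show()` prints one row with the totals for the application.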
Spark Listeners, @DeveloperApi • Custom class, extends SparkListener • Methods react to events to collect data, for example onTaskEnd and onStageCompleted • Attach the custom listener class to the Spark Session: --conf spark.extraListeners=..
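A minimal sketch of a custom listener, assuming Spark 2.x or 3.x: it overrides `onTaskEnd` and accumulates a few fields of `TaskMetrics`. The class and object names are hypothetical. In a spark-shell it is registered with `sparkContext.addSparkListener`; to attach it with `spark.extraListeners` instead, the class would need to be compiled into a jar on the driver classpath.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

// Hypothetical listener: accumulates task metrics from every task-end event.
class TaskMetricsListener extends SparkListener {
  val numTasks = new AtomicLong(0L)
  val executorRunTimeMs = new AtomicLong(0L)
  val executorCpuTimeNs = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    numTasks.incrementAndGet()
    val m = taskEnd.taskMetrics // can be null, e.g. for some failed tasks
    if (m != null) {
      executorRunTimeMs.addAndGet(m.executorRunTime) // milliseconds
      executorCpuTimeNs.addAndGet(m.executorCpuTime) // nanoseconds
    }
  }
}

object ListenerDemo {
  def run(spark: SparkSession): TaskMetricsListener = {
    val listener = new TaskMetricsListener
    spark.sparkContext.addSparkListener(listener)
    // Example workload: the first stage has 4 partitions, hence 4 tasks.
    spark.range(0, 1000000, 1, 4).selectExpr("sum(id)").collect()
    // Events are delivered asynchronously by the listener bus: wait briefly.
    val deadline = System.currentTimeMillis() + 10000
    while (listener.numTasks.get() < 4 && System.currentTimeMillis() < deadline) Thread.sleep(100)
    listener
  }
}
```

The atomic counters let another thread (for example the shell) read the totals while the listener bus is still updating them.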
SparkMeasure Architecture
SparkMeasure – Getting Started
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.15
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
val myQuery = "select count(*) from range(1000) cross join range(1000) cross join range(1000)"
stageMetrics.runAndMeasure(spark.sql(myQuery).show())
SparkMeasure Output Example
Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
sum(numTasks) => 17
elapsedTime => 9103 (9 s)
sum(stageDuration) => 9027 (9 s)
sum(executorRunTime) => 69238 (1.2 min)
sum(executorCpuTime) => 68004 (1.1 min)
sum(executorDeserializeTime) => 1031 (1 s)
sum(executorDeserializeCpuTime) => 151 (0.2 s)
sum(resultSerializationTime) => 5 (5 ms)
sum(jvmGCTime) => 64 (64 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 26 (26 ms)
max(resultSize) => 17934 (17.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2000
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 472 (472 Bytes)
sum(shuffleRecordsWritten) => 8
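A few ratios computed from this report help answer "where is the time spent?". The sketch below is a hypothetical helper (`DerivedMetrics` is not part of sparkMeasure); it uses the elapsed time, executor run time, CPU time and GC time values shown in the output, all in milliseconds.

```scala
// Hypothetical helper computing derived ratios from aggregated stage metrics.
object DerivedMetrics {
  // Average number of concurrently active tasks over the measured interval.
  def avgActiveTasks(executorRunTimeMs: Long, elapsedTimeMs: Long): Double =
    executorRunTimeMs.toDouble / elapsedTimeMs

  // Fraction of task run time spent on CPU; close to 1.0 suggests a CPU-bound workload.
  def cpuFraction(executorCpuTimeMs: Long, executorRunTimeMs: Long): Double =
    executorCpuTimeMs.toDouble / executorRunTimeMs

  // Fraction of task run time spent in JVM garbage collection.
  def gcFraction(jvmGCTimeMs: Long, executorRunTimeMs: Long): Double =
    jvmGCTimeMs.toDouble / executorRunTimeMs
}
```

With the values above, `avgActiveTasks(69238, 9103)` is about 7.6 and `cpuFraction(68004, 69238)` about 0.98. Against a default parallelism of 8, this suggests the 8 cores were busy most of the time and the query was mostly CPU-bound, with negligible GC time.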
SparkMeasure, Usage Modes • Interactive: use from shell or notebooks – Works with Jupyter notebooks, Azure, Colab, Databricks, etc. • Use to instrument your code • Flight recorder mode – No changes needed to the code – For Troubleshooting, for CI/CD pipelines, … • Use with Scala, Python, Java https://github.com/LucaCanali/sparkMeasure
Instrument Code with SparkMeasure https://github.com/LucaCanali/sparkMeasure/blob/master/docs/Instrument_Python_code.md
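Besides `runAndMeasure`, sparkMeasure lets you wrap an arbitrary code section between `begin()` and `end()` and then print a report. The linked page covers Python; this is a rough Scala sketch of the same pattern, assuming the sparkMeasure package is on the classpath (for example via `--packages` as in Getting Started). `InstrumentedJob` and the example query are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import ch.cern.sparkmeasure.StageMetrics

// Hypothetical wrapper instrumenting one section of application code.
object InstrumentedJob {
  def run(spark: SparkSession): Long = {
    val stageMetrics = StageMetrics(spark)
    stageMetrics.begin()
    // Code section to be measured (example workload).
    val n = spark.sql("select count(*) from range(1000) cross join range(1000)").head().getLong(0)
    stageMetrics.end()
    // Print the aggregated stage metrics collected between begin() and end().
    stageMetrics.printReport()
    n
  }
}
```

Only stages that ran between `begin()` and `end()` are included in the report, so the measured section can be narrowed to the part of the code under investigation.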
SparkMeasure on Notebooks: Local Jupyter and Cloud Services https://github.com/LucaCanali/sparkMeasure/tree/master/examples
SparkMeasure on Notebooks: Jupyter Magic: %%sparkmeasure … (note: output truncated to fit in the slide)
SparkMeasure as Flight Recorder • Capture metrics and write them to files when the application finishes • Monitoring option: write metrics to InfluxDB on the fly
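Flight recorder mode needs configuration only, no code changes. The sketch below sets the relevant properties on a `SparkConf`; the same keys can be passed as `--conf` options to spark-submit. The listener class names (`FlightRecorderStageMetrics`, `InfluxDBSink`) and the `spark.sparkmeasure.*` keys are taken from the sparkMeasure documentation as we recall it for releases around 0.15, so check the docs for your version. `FlightRecorderConf` and the example values are hypothetical.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper collecting sparkMeasure flight recorder settings.
object FlightRecorderConf {
  // Collect stage metrics and write them to a file when the application ends.
  def toFile(conf: SparkConf, outputFile: String): SparkConf =
    conf
      .set("spark.extraListeners", "ch.cern.sparkmeasure.FlightRecorderStageMetrics")
      .set("spark.sparkmeasure.outputFormat", "json")
      .set("spark.sparkmeasure.outputFilename", outputFile)

  // Monitoring option: send metrics to InfluxDB while the application runs.
  def toInfluxDB(conf: SparkConf, influxdbUrl: String): SparkConf =
    conf
      .set("spark.extraListeners", "ch.cern.sparkmeasure.InfluxDBSink")
      .set("spark.sparkmeasure.influxdbURL", influxdbUrl)
}
```

These settings must be in place before the SparkContext starts, since `spark.extraListeners` is read at startup.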
Spark Metrics System • Spark is also instrumented using the Dropwizard/Codahale metrics library • Multiple sources (data providers) – Various instrumentation points in Spark code – Including task metrics, scheduler, etc – Instrumentation from the JVM • Multiple sinks – Graphite (InfluxDB), JMX, HTTP, CSV, etc…
Ingredients for a Spark Performance Dashboard • Architecture – Know how the "Dropwizard metrics system" works – Which Spark components are instrumented • Configure backend components – InfluxDB and Grafana • Relevant Spark configuration parameters • Dashboard graphs – Familiarize yourself with the available metrics – InfluxDB query building for dashboard graphs
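The Spark side of the dashboard is a sink configuration for the metrics system. The sketch below routes metrics from all instances (`*`, i.e. driver and executors) to a Graphite endpoint, using the `spark.metrics.conf.` prefix so no metrics.properties file is needed. It assumes an InfluxDB 1.x server with its Graphite input enabled; `DashboardMetricsConf`, the host name and the prefix value are hypothetical.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: send Spark's Dropwizard metrics to a Graphite endpoint.
object DashboardMetricsConf {
  def withGraphiteSink(conf: SparkConf, host: String, port: Int): SparkConf =
    conf
      .set("spark.metrics.conf.*.sink.graphite.class", "org.apache.spark.metrics.sink.GraphiteSink")
      .set("spark.metrics.conf.*.sink.graphite.host", host)
      .set("spark.metrics.conf.*.sink.graphite.port", port.toString)
      .set("spark.metrics.conf.*.sink.graphite.period", "10")
      .set("spark.metrics.conf.*.sink.graphite.unit", "seconds")
      .set("spark.metrics.conf.*.sink.graphite.prefix", "spark")
      // Also report JVM metrics (heap, GC) from the driver and executors.
      .set("spark.metrics.conf.*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
}
```

The equivalent spark-submit options are the same keys passed with `--conf`; Grafana then queries InfluxDB to build the dashboard graphs.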