EECS 262a Advanced Topics in Computer Systems
Lecture 21: Comparison of Parallel DB, CS, MR and Spark
November 11th, 2018

John Kubiatowicz
Electrical Engineering and Computer Sciences
University of California, Berkeley
http://www.eecs.berkeley.edu/~kubitron/cs262

Today's Papers
• A Comparison of Approaches to Large-Scale Data Analysis. Andrew Pavlo, Erik Paulson, Alexander Rasin, Daniel J. Abadi, David J. DeWitt, Samuel Madden, Michael Stonebraker. In Proceedings of the ACM SIGMOD International Conference on Management of Data, 2009.
• Spark: Cluster Computing with Working Sets. M. Zaharia, M. Chowdhury, M.J. Franklin, S. Shenker, and I. Stoica. In Proceedings of HotCloud 2010, June 2010.
  – M. Zaharia, et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. NSDI 2012.
• Thoughts?

Typical Web Application
• (Figure) Web servers (e.g. Apache) handle requests and emit logs; storage (e.g. MySQL) handles transactions, updates, and snapshots; archival & analytics (e.g. MapReduce) consume the logs and snapshots for reporting, spam detection, ad targeting, use of external data (e.g. Google), and ad-hoc queries about site usage

Example: Facebook Lexicon
• www.facebook.com/lexicon (now defunct)
Typical Hadoop (MapReduce) Cluster
• Aggregation switch at the top, one rack switch per rack
• 40 nodes/rack, 1000-4000 nodes in a cluster
• 1 Gbps bandwidth within a rack, 8 Gbps out of the rack
• Node specs (Facebook): 8-16 cores, 32 GB RAM, 8×1.5 TB disks, no RAID

Challenges
• Cheap nodes fail, especially when you have many
  – Mean time between failures for 1 node = 3 years
  – MTBF for 1000 nodes = 1 day
  – Implication: applications must tolerate faults
• Commodity network = low bandwidth
  – Implication: push computation to the data
• Nodes can also "fail" by going slowly (execution skew)
  – Implication: applications must tolerate & avoid stragglers

MapReduce
• First widely popular programming model for data-intensive apps on commodity clusters
• Published by Google in 2004
  – Processes 20 PB of data per day
• Popularized by the open-source Hadoop project
  – 40,000 nodes at Yahoo!, 70 PB at Facebook

MapReduce Programming Model
• Data type: key-value records
• Map function: (K_in, V_in) → list(K_inter, V_inter)
• Reduce function: (K_inter, list(V_inter)) → list(K_out, V_out)
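To make the two function signatures concrete, here is a minimal word-count sketch in plain Scala. It is illustrative only, not the Hadoop API: the object and method names are made up, and the groupBy stands in for the framework's shuffle.

    object WordCountModel {
      // Map: (K_in, V_in) = (line offset, line text) => list((word, 1))
      def map(key: Long, value: String): List[(String, Int)] =
        value.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toList

      // Reduce: (K_inter, list(V_inter)) = (word, counts) => list((word, total))
      def reduce(key: String, values: List[Int]): List[(String, Int)] =
        List((key, values.sum))

      def main(args: Array[String]): Unit = {
        val input = List((0L, "the quick brown fox"), (20L, "the fox ate the mouse"))
        // The framework would shuffle & sort map output by key; groupBy stands in for that here.
        val shuffled = input.flatMap { case (k, v) => map(k, v) }.groupBy(_._1)
        val output = shuffled.toList.flatMap { case (k, vs) => reduce(k, vs.map(_._2)) }
        output.sortBy(_._1).foreach(println)  // (ate,1), (brown,1), (fox,2), (mouse,1), (quick,1), (the,3)
      }
    }

The word-count figure on the next slide walks through the same map, shuffle-and-sort, and reduce pipeline on three input lines.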
Word Count Execution
• (Figure) Input lines "the quick brown fox", "the fox ate the mouse", "how now brown cow" flow through Map, Shuffle & Sort, and Reduce
  – Map emits (word, 1) pairs; the shuffle groups pairs by word; Reduce sums the counts
  – Output: (ate, 1), (brown, 2), (cow, 1), (fox, 2), (how, 1), (mouse, 1), (now, 1), (quick, 1), (the, 3)

MapReduce Execution Details
• Mappers are preferentially scheduled on the same node, or the same rack, as their input block
  – Minimizes network use to improve performance
• Mappers save outputs to local disk before serving them to reducers
  – Allows recovery if a reducer crashes

Fault Tolerance in MapReduce
1. If a task crashes:
  – Retry on another node
    » OK for a map because it had no dependencies
    » OK for a reduce because map outputs are on disk
  – If the same task repeatedly fails, fail the job
2. If a node crashes:
  – Relaunch its current tasks on other nodes
  – Relaunch any maps the node previously ran
    » Necessary because their output files are lost
• Note: for fault tolerance to work, tasks must be deterministic and side-effect-free
3. If a task is going slowly (a straggler, i.e. execution skew):
  – Launch a second copy of the task on another node
  – Take the output of whichever copy finishes first
• Critical for performance in large clusters

Takeaways
• By providing a data-parallel programming model, MapReduce can control job execution in useful ways:
  – Automatic division of a job into tasks
  – Placement of computation near its data
  – Load balancing
  – Recovery from failures & stragglers

Comparisons between Parallel DBMS and MapReduce

Two Approaches to Large-Scale Data Analysis
• Both are "shared nothing" architectures
• MapReduce
  – Distributed file system
  – Map, Split, Copy, Reduce
  – MR scheduler
• Parallel DBMS
  – Standard relational tables (physical location transparent)
  – Data are partitioned over cluster nodes
  – SQL
  – Join processing: T1 joins T2 (see the sketch after this list)
    » If T2 is small, copy T2 to all the machines
    » If T2 is large, hash-partition T1 and T2 and send the partitions to different machines (similar to the split-copy in MapReduce)
  – Query optimization
  – Intermediate tables not materialized by default
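The two join strategies just listed can be sketched in a few lines of plain Scala, with collections standing in for partitions on different machines. The record shape, machine count, and function names here are invented for illustration.

    object JoinStrategies {
      type Row = (Int, String)                     // (joinKey, payload)
      val numMachines = 4

      // Strategy 1: T2 is small -- copy ("broadcast") all of T2 to every machine
      // and join locally against each machine's partition of T1.
      def broadcastJoin(t1Parts: Seq[Seq[Row]], t2Small: Seq[Row]): Seq[(Int, String, String)] = {
        val t2ByKey = t2Small.groupBy(_._1)        // every "machine" holds the whole T2
        t1Parts.flatMap(part =>
          part.flatMap { case (k, v1) =>
            t2ByKey.getOrElse(k, Nil).map { case (_, v2) => (k, v1, v2) } })
      }

      // Strategy 2: both tables are large -- hash-partition T1 and T2 on the join
      // key so matching rows land on the same machine, then join per machine
      // (analogous to the shuffle before a reduce in MapReduce).
      def hashJoin(t1: Seq[Row], t2: Seq[Row]): Seq[(Int, String, String)] = {
        def partition(t: Seq[Row]) = t.groupBy { case (k, _) => k.hashCode.abs % numMachines }
        val (p1, p2) = (partition(t1), partition(t2))
        (0 until numMachines).flatMap { m =>
          val right = p2.getOrElse(m, Nil).groupBy(_._1)
          p1.getOrElse(m, Nil).flatMap { case (k, v1) =>
            right.getOrElse(k, Nil).map { case (_, v2) => (k, v1, v2) } }
        }
      }
    }

The first strategy avoids repartitioning the large table at the cost of replicating the small one; the second pays one full shuffle of both tables but never replicates data.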
Architectural Differences

                        Parallel DBMS                  MapReduce
    Schema support      Yes                            No
    Indexing            Yes                            No
    Programming model   Stating what you want (SQL)    Presenting an algorithm (C/C++, Java, …)
    Optimization        Yes                            No
    Flexibility         Spotty UDF support             Good
    Fault tolerance     Not as good                    Good
    Node scalability    <100                           >10,000

Schema Support
• MapReduce
  – Flexible: programmers write code to interpret the input data
  – Good for a single-application scenario
  – Bad if data are shared by multiple applications: each must address data syntax, consistency, etc.
• Parallel DBMS
  – Relational schema required
  – Good if data are shared by multiple applications

Programming Model & Flexibility
• MapReduce
  – Low level: "We argue that MR programming is somewhat analogous to Codasyl programming…"
  – "Anecdotal evidence from the MR community suggests that there is widespread sharing of MR code fragments to do common tasks, such as joining data sets."
  – Very flexible
• Parallel DBMS
  – SQL
  – User-defined functions, stored procedures, user-defined aggregates

Indexing
• MapReduce
  – No native index support
  – Programmers can implement their own index support in Map/Reduce code
  – But it is hard to share such customized indexes across multiple applications
• Parallel DBMS
  – Hash/B-tree indexes well supported
Execution Strategy & Fault Tolerance
• MapReduce
  – Intermediate results are saved to local files
  – If a node fails, re-run that node's task on another node
  – At a mapper machine, when multiple reducers are reading multiple local files, there can be large numbers of disk seeks, leading to poor performance
• Parallel DBMS
  – Intermediate results are pushed across the network
  – If a node fails, the entire query must be re-run

Avoiding Data Transfers
• MapReduce
  – Schedules map tasks close to their data
  – But beyond this, programmers must avoid data transfers themselves
• Parallel DBMS
  – Many optimizations, such as determining where to perform filtering

Node Scalability
• MapReduce
  – 10,000s of commodity nodes
  – 10s of petabytes of data
• Parallel DBMS
  – <100 expensive nodes
  – Petabytes of data

Performance Benchmarks
• Benchmark environment
• Original MR task (Grep)
• Analytical tasks
  – Selection
  – Aggregation
  – Join
  – User-defined-function (UDF) aggregation
Node Configuration
• 100-node cluster
  – Each node: 2.40GHz Intel Core 2 Duo, 64-bit Red Hat Enterprise Linux 5 (kernel 2.6.18), 4GB RAM, two 250GB SATA HDDs
• Nodes interconnected with Cisco Catalyst 3750E 1Gb/s switches
  – Internal switching fabric has 128Gbps
  – 50 nodes per switch
• Multiple switches interconnected via a 64Gbps Cisco StackWise ring
  – The ring is only used for cross-switch communication

Tested Systems
• Hadoop (0.19.0 on Java 1.6.0)
  – HDFS data block size: 256MB
  – JVMs use a 3.5GB heap per node
  – "Rack awareness" enabled for data locality
  – Three replicas, no compression: compression or fewer replicas in HDFS did not improve performance
• DBMS-X (a parallel SQL DBMS from a major vendor)
  – Row store
  – 4GB shared memory for buffer pool and temp space per node
  – Compressed tables (compression often reduces time by 50%)
• Vertica
  – Column store
  – 256MB buffer size per node
  – Compressed columns by default

Benchmark Execution
• Data loading time:
  – Actual loading of the data
  – Additional operations after loading, such as compressing or building indexes
• Execution time:
  – DBMS-X and Vertica:
    » Final results are piped from a shell command into a file
  – Hadoop:
    » Final results are stored in HDFS
    » An additional Reduce job combines the multiple output files into a single file
Task Description (Grep)
• From the MapReduce paper
  – Input data set: 100-byte records
  – Look for a three-character pattern
  – One match per 10,000 records
• Varying the number of nodes
  – Fix the size of data per node (535MB/node)
  – Fix the total data size (1TB)

Data Loading
• Hadoop:
  – Copy text files into HDFS in parallel
• DBMS-X:
  – Load SQL command executed in parallel: performs hash partitioning and distributes records to multiple machines
  – Reorganize data on each node: compress data, build indexes, perform other housekeeping
    » This happens in parallel
• Vertica:
  – Copy command loads data in parallel
  – Data is re-distributed, then compressed

Data Loading Times
• (Figure: load times; for DBMS-X, grey is loading and white is re-organization after loading)
  – DBMS-X loading is actually sequential despite the parallel load commands
• Hadoop does better because it only copies the data to three HDFS replicas

Execution
• SQL:
    SELECT * FROM data WHERE field LIKE '%XYZ%';
  – Full table scan
• MapReduce:
  – Map: pattern search
  – No reduce
  – An additional Reduce job combines the output into a single file
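The MR side of the Grep task is just a map with no reduce; here is a tiny plain-Scala sketch of that shape. The record layout and names are invented for illustration, and the "XYZ" pattern simply mirrors the SQL above.

    object GrepTask {
      val pattern = "XYZ"

      // Map: (recordKey, 100-byte record) => zero or one output record; no reduce phase.
      def map(key: Long, record: String): List[(Long, String)] =
        if (record.contains(pattern)) List((key, record)) else Nil

      def main(args: Array[String]): Unit = {
        val records = Seq((0L, "....XYZ...."), (1L, "...abc....."), (2L, "..XYZ......"))
        val matches = records.flatMap { case (k, r) => map(k, r) }   // no shuffle, no reduce
        matches.foreach(println)
      }
    }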
Grep Execution Times
• (Figure: execution times for the 535MB/node and 1TB configurations; the Hadoop bars include a separate "combine output" step on top of the grep itself)
• Hadoop's large start-up cost shows up in Figure 4, where the data per node is small
• Vertica benefits from good data compression

Analytical Tasks (selection, aggregation, join, UDF aggregation)

Input Data
• Input #1: random HTML documents
  – Inside an HTML doc, links are generated with a Zipfian distribution
  – 600,000 unique HTML docs with unique URLs per node
• Input #2: 155 million UserVisits records (20GB/node)
• Input #3: 18 million Rankings records (1GB/node)

Selection Task
• Find the pageURLs in the Rankings table (1GB/node) with a pageRank above a threshold
  – 36,000 records per data file qualify (very selective)
• SQL:
    SELECT pageURL, pageRank FROM Rankings WHERE pageRank > X;
• MR: a single Map, no Reduce
Selection Task Results
• (Figure: selection task execution times)
  – Hadoop pays its start-up cost; the DBMSs use an index; Vertica's reliable message layer becomes a bottleneck

Aggregation Task
• Calculate the total adRevenue generated for each sourceIP in the UserVisits table (20GB/node), grouped by the sourceIP column
  – Nodes must exchange information to compute the group-by
  – Generates 53 MB of output regardless of the number of nodes
• SQL:
    SELECT sourceIP, SUM(adRevenue) FROM UserVisits GROUP BY sourceIP;
• MR:
  – Map: outputs (sourceIP, adRevenue)
  – Reduce: computes the sum per sourceIP
  – A "combine" step is used (sketched below, after the join task)

Aggregation Task Results
• (Figure: aggregation task execution times)

Join Task
• Find the sourceIP that generated the most revenue within Jan 15-22, 2000, then calculate the average pageRank of all the pages visited by that sourceIP during this interval
• SQL:
    SELECT INTO Temp sourceIP, AVG(pageRank) AS avgPageRank, SUM(adRevenue) AS totalRevenue
    FROM Rankings AS R, UserVisits AS UV
    WHERE R.pageURL = UV.destURL
      AND UV.visitDate BETWEEN Date('2000-01-15') AND Date('2000-01-22')
    GROUP BY UV.sourceIP;

    SELECT sourceIP, totalRevenue, avgPageRank FROM Temp
    ORDER BY totalRevenue DESC LIMIT 1;
• DBMS: local group-by, then the coordinator performs the global group-by; performance is dominated by data transfer
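Returning to the aggregation task above: its MR structure is a map that emits (sourceIP, adRevenue), a combiner that pre-sums each map task's output to shrink the shuffle, and a reduce that produces the final sums. A plain-Scala sketch with hypothetical helper names; as before, groupBy stands in for the shuffle.

    object AggregationTask {
      // Map: each UserVisits record contributes one (sourceIP, adRevenue) pair.
      def map(visit: (String, Double)): (String, Double) = visit

      // Combine: partial sums within a single map task's output.
      def combine(mapOutput: Seq[(String, Double)]): Seq[(String, Double)] =
        mapOutput.groupBy(_._1).map { case (ip, vs) => (ip, vs.map(_._2).sum) }.toSeq

      // Reduce: final sum across all map tasks, per sourceIP.
      def reduce(ip: String, partialSums: Seq[Double]): (String, Double) = (ip, partialSums.sum)

      def main(args: Array[String]): Unit = {
        val mapTask1 = Seq(("1.2.3.4", 0.50), ("1.2.3.4", 0.25), ("5.6.7.8", 1.00)).map(map)
        val mapTask2 = Seq(("1.2.3.4", 0.25), ("9.9.9.9", 0.50)).map(map)
        val shuffled = (combine(mapTask1) ++ combine(mapTask2)).groupBy(_._1)
        val result = shuffled.map { case (ip, vs) => reduce(ip, vs.map(_._2)) }
        result.toSeq.sortBy(_._1).foreach(println)  // (1.2.3.4,1.0), (5.6.7.8,1.0), (9.9.9.9,0.5)
      }
    }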
MapReduce Join Task
• Phase 1: filter out UserVisits records that fall outside the desired date range and join the qualifying records with records from the Rankings file (a plain-Scala sketch of the three phases follows below)
• Phase 2: compute the total adRevenue and average pageRank per sourceIP
• Phase 3: produce the record with the largest total adRevenue

Join Task Results
• The DBMSs can use an index, and both relations are partitioned on the join key; MR has to read all of the data
• MR phase 1 takes 1434.7 seconds on average
  – ~600 seconds of raw I/O to read the table; ~300 seconds to split, parse, and deserialize; thus CPU overhead is the limiting factor

UDF Aggregation Task
• Compute the inlink count per document
• SQL:
    SELECT INTO Temp F(contents) FROM Documents;
    SELECT url, SUM(value) FROM Temp GROUP BY url;
  – Needs a user-defined function F to parse the HTML docs (a C program using the POSIX regex library)
  – Neither DBMS supports UDFs well, requiring a separate program that uses local disk and bulk-loads its results into the DBMS
• MR: a standard MR program
  – Why was MR always forced to use a Reduce step to combine results?

UDF Aggregation Results
• (Figure) DBMS bars: lower portion is UDF time, upper is the rest of the query time
• Hadoop bars: lower portion is query time, upper is combining all results into one file
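For intuition about why the MR join costs three separate jobs, here is a rough plain-Scala sketch of the three phases described above. The record shapes and field names are hypothetical stand-ins for the benchmark's UserVisits and Rankings tables; in real Hadoop each phase would be its own job writing its output to disk.

    object JoinTaskPhases {
      case class UserVisit(sourceIP: String, destURL: String, visitDate: String, adRevenue: Double)
      case class Ranking(pageURL: String, pageRank: Int)

      // Returns (sourceIP, totalRevenue, avgPageRank) for the top-revenue sourceIP.
      def run(visits: Seq[UserVisit], rankings: Seq[Ranking]): (String, Double, Double) = {
        // Phase 1: filter UserVisits to the date range and join with Rankings on URL.
        val inRange = visits.filter(v =>
          v.visitDate.compareTo("2000-01-15") >= 0 && v.visitDate.compareTo("2000-01-22") <= 0)
        val rankByURL = rankings.groupBy(_.pageURL)
        val joined = inRange.flatMap(v =>
          rankByURL.getOrElse(v.destURL, Nil).map(r => (v.sourceIP, v.adRevenue, r.pageRank)))

        // Phase 2: total adRevenue and average pageRank per sourceIP.
        val perIP = joined.groupBy(_._1).map { case (ip, rows) =>
          (ip, rows.map(_._2).sum, rows.map(_._3.toDouble).sum / rows.size) }

        // Phase 3: the single record with the largest total revenue.
        perIP.maxBy(_._2)
      }
    }

A DBMS runs the same logic as one optimized query (the SQL shown earlier), using indexes and co-partitioning on the join key instead of a full scan plus shuffle.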
Discussion
• Throughput experiments?
• Parallel DBMSs are much more challenging than Hadoop to install and configure properly
  – DBMSs require professional DBAs to configure/tune
• Alternatives: Shark (Hive on Spark)
  – Eliminates Hadoop's task start-up cost and answers queries with sub-second latencies
    » 100-node system: 10 seconds until the first task starts, 25 seconds until all nodes run tasks
  – Columnar memory store (multiple orders of magnitude faster than disk)
• Compression: does not help in Hadoop?
  – An artifact of Hadoop's Java-based implementation?
• Execution strategy (DBMS), failure model (Hadoop), ease of use (H/D)
• Other alternatives? Apache Hive, Impala (Cloudera), HadoopDB (Hadapt), …

Alternative: HadoopDB?
• The basic idea (an architectural hybrid of MR & DBMS):
  – Use MR as the communication layer above multiple nodes running single-node DBMS instances
• Queries expressed in SQL, translated into MR by extending existing tools
  – As much work as possible is pushed into the higher-performing single-node databases
• How many of the complaints from the comparison paper still apply here?
• The Hadapt startup is commercializing this approach

Is this a good paper?
• What were the authors' goals?
• What about the evaluation/metrics?
• Did they convince you that this was a good system/approach?
• Were there any red flags?
• What mistakes did they make?
• Does the system/approach meet the "Test of Time" challenge?
• How would you review this paper today?

BREAK
Issues with MapReduce
• Hard to express more complex programs
  – E.g. word count + a sort to find the top words (see the Spark sketch after this slide group)
  – Need to write many different map and reduce functions that are split up all over the program
  – Must write complex operators (e.g. join) by hand
• Acyclic data flow from stable storage to stable storage: poor support for applications that need to reuse pieces of data (I/O bottleneck and compute overhead)
  – Iterative algorithms (e.g. machine learning, graphs)
  – Interactive data mining (e.g. Matlab, Python, SQL)
  – Stream processing (e.g. continuous data mining)

Example: Iterative Apps
• (Figure) Each iteration reads the input from stable storage and writes its result back, so the same data is re-read from disk over and over

Goal: Keep Working Set in RAM
• (Figure) A one-time processing step loads the input into distributed memory; subsequent iterations read from memory instead of disk
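As a preview of where the lecture goes next: the "word count + top words" program criticized above, which takes two chained MapReduce jobs (one to count, one to sort), collapses into a single short pipeline in the Spark RDD API introduced on the following slides. A sketch, assuming a SparkContext handle named spark as in the later examples; the method names follow the modern RDD API and may differ slightly from the 2010-era version described in the paper.

    val topWords = spark.textFile("hdfs://...")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                // job 1 in plain MapReduce: the word count
      .sortBy(_._2, ascending = false)   // job 2 in plain MapReduce: the sort
      .take(10)                          // top 10 words by count

    topWords.foreach(println)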
Key Idea: Resilient Distributed Datasets (RDDs)
• Restricted form of distributed shared memory
  – Read-only (immutable), partitioned collections of records
  – A caching hint tells the system to retain an RDD in memory
  – Can only be created through deterministic transformations (map, group-by, join, …)
• Allows efficient implementation & recovery
  – Key idea: rebuild lost data using lineage
  – Enables a hint model that allows memory to be reused if necessary
  – No cost if nothing fails
• Rich enough to capture many models:
  – Data flow models: MapReduce, Dryad, SQL, …
  – Specialized models for iterative apps: Pregel, Hama, …

Spark Goals
• Support iterative and stream jobs (apps with data reuse) efficiently:
  – Let them keep data in memory
• Experiment with programmability
  – Leverage Scala to integrate cleanly into programs
  – Support interactive use from the Scala interpreter
• Retain the fine-grained fault tolerance and automatic scheduling benefits of MapReduce

RDD Recovery
• (Figure) One-time processing loads data into distributed memory; iterations read from memory, and lost partitions can be rebuilt from lineage

Programming Model
• Driver program
  – Implements the high-level control flow of an application
  – Launches various operations in parallel
• Resilient distributed datasets (RDDs)
  – Immutable, partitioned collections of objects
  – Created through parallel transformations (map, filter, groupBy, join, …) on data in stable storage
  – Can be cached for efficient reuse
• Parallel actions on RDDs
  – foreach, reduce, collect
• Shared variables
  – Accumulators (add-only), broadcast variables (read-only)
Parallel Operations
• reduce: combines dataset elements using an associative function to produce a result at the driver program
• collect: sends all elements of the dataset to the driver program (e.g., update an array in parallel with parallelize, map, and collect)
• foreach: passes each element through a user-provided function
• No grouped reduce operation

Shared Variables
• Broadcast variables
  – Used for large read-only data (e.g., a lookup table) in multiple parallel operations: distributed once instead of packaged with every closure
• Accumulators
  – Variables that workers can only "add" to using an associative operation; only the driver program can read their value
• (A short sketch of both kinds follows after the Architecture slide below)

RDD Fault Tolerance
• RDDs maintain lineage information that can be used to reconstruct lost partitions
• Example:
    cachedMsgs = textFile(...).filter(_.contains("error"))
                              .map(_.split('\t')(2))
                              .cache()
  – Lineage: HdfsRDD (path: hdfs://…) → FilteredRDD (func: contains(...)) → MappedRDD (func: split(…)) → CachedRDD

Architecture
• Driver program connects to Mesos and schedules tasks
  – Holds the user code and broadcast variables
• Workers run tasks, report results and variable updates
  – Each worker has a local cache
• Data shared with HDFS/NFS
• No communication between workers for now
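To make the two shared-variable kinds concrete, here is a brief sketch using the classic SparkContext API of the paper's era, written against a context named spark to match the examples on the next slides. The lookup table and log format are invented, and newer Spark versions replace accumulator with longAccumulator.

    // Broadcast: large read-only data shipped to each worker once, not with every closure.
    val lookup = spark.broadcast(Map("ERROR" -> 3, "WARN" -> 2, "INFO" -> 1))
    // Accumulator: workers can only add; only the driver reads the value.
    val badLines = spark.accumulator(0)

    val severities = spark.textFile("hdfs://...").map { line =>
      val level = line.split(" ")(0)
      if (!lookup.value.contains(level)) badLines += 1     // add-only update from tasks
      lookup.value.getOrElse(level, 0)
    }

    println("total severity: " + severities.reduce(_ + _)) // the action triggers the job
    println("unrecognized lines: " + badLines.value)       // read back at the driver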
Spark Version of Word Count
    val file = spark.textFile("hdfs://...")
    file.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

Spark Version of Log Mining
• Load error messages from a log into memory, then interactively search for various patterns:
    val lines = spark.textFile("hdfs://...")
    val errors = lines.filter(_.startsWith("ERROR"))
    val messages = errors.map(_.split('\t')(2))
    val cachedMsgs = messages.cache()

    cachedMsgs.filter(_.contains("foo")).count
    cachedMsgs.filter(_.contains("bar")).count
    ...
• (Figure) The driver ships tasks to workers; each worker caches its block of the messages and returns results
• Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data); scaled to 1 TB of data in 5-7 sec (vs 170 sec for on-disk data)

Logistic Regression
• Goal: find the best line separating two sets of points
• (Figure: the two point sets, a random initial line, and the target separating line)

Example: Logistic Regression
    val data = spark.textFile(...).map(readPoint).cache()
    var w = Vector.random(D)

    for (i <- 1 to ITERATIONS) {
      val gradient = data.map(p =>
        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
      ).reduce(_ + _)
      w -= gradient
    }

    println("Final w: " + w)
Job Execution (Spark)
• (Figure) The master sends the current parameter to the workers; each worker computes a gradient over its slice (R1-R4) of the big dataset, the results are aggregated at the master, the parameter is updated, and the cycle repeats

Job Execution: Spark vs Hadoop/Dryad
• (Figure) Spark keeps the dataset in memory and reuses it across iterations under a single master; Hadoop/Dryad launches a fresh round of map and reduce tasks (Map 1-4, Reduce; Map 5-8, Reduce; …) for every iteration

Performance
• Logistic regression running time vs number of iterations (1 to 30):
  – Hadoop: 127 s per iteration
  – Spark: 174 s for the first iteration, 6 s for each further iteration

Interactive Spark
• Modified the Scala interpreter to allow Spark to be used interactively from the command line
• Required two changes:
  – Modified wrapper-code generation so that each "line" typed has references to objects for its dependencies
  – Place generated classes in a distributed filesystem
• Enables in-memory exploration of big data
What RDDs Are Not Good For
• RDDs work best when an application applies the same operation to many data records
  – Our approach is to just log the operation, not the data
• They will not work well for apps where processes asynchronously update shared state
  – Storage system for a web application
  – Parallel web crawler
  – Incremental web indexer (e.g. Google's Percolator)

Milestones
• 2010: Spark open sourced
• Feb 2013: Spark Streaming alpha open sourced
• Jun 2013: Spark entered Apache Incubator
• Aug 2013: Machine learning library for Spark

Frameworks Built on Spark
• MapReduce
• HaLoop
  – Iterative MapReduce from UC Irvine / U Washington
• Pregel on Spark (Bagel)
  – Graph-processing framework from Google based on the BSP message-passing model
• Hive on Spark (Shark)
  – In progress
Summary
• Spark makes distributed datasets a first-class primitive to support a wide range of apps
• RDDs enable efficient recovery through lineage, caching, controlled partitioning, and debugging

Is this a good paper?
• What were the authors' goals?
• What about the evaluation/metrics?
• Did they convince you that this was a good system/approach?
• Were there any red flags?
• What mistakes did they make?
• Does the system/approach meet the "Test of Time" challenge?
• How would you review this paper today?