Making Nested Columns as First Citizen in Apache Spark SQL
1 . Making Nested Columns as First Citizens in Apache Spark SQL
Cesar Delgado @hpcfarmer
DB Tsai @dbtsai
Spark+AI SF
© 2018 Apple Inc. All rights reserved. Redistribution or public display not permitted without written permission from Apple.
2 . Siri
The world’s most popular intelligent assistant service, powering every iPhone, iPad, Mac, Apple TV, Apple Watch, and HomePod
3 . Siri Open Source Team
• We’re Spark, Hadoop, and HBase PMC members, committers, and contributors
• We’re advocates for Open Source
• Pushing our internal changes back upstream
• Working with the communities to review pull requests and to develop new features and bug fixes
4 . Siri Data
• Machine learning is used to personalize your experience throughout your day
• We believe privacy is a fundamental human right
5 . Siri Scale
• Large volume of requests; data centers all over the world
• Hadoop / YARN cluster has thousands of nodes
• HDFS holds hundreds of PB
• 100s of TB of raw event data per day
• More than 90% of jobs are Spark
• Less than 10% are legacy Pig and MapReduce jobs
6 . Details about our data
• Deeply nested relational data model with a couple of top-level columns
• More than 2,000 nested fields in total
• Stored in Parquet format, partitioned by UTC day
• Most queries touch only a small subset of the data
7 . An Example of a Hierarchically Organized Table
Real estate information can be naturally modeled by

case class Address(houseNumber: Int, streetAddress: String, city: String, state: String, zipCode: String)
case class Facts(price: Int, size: Int, yearBuilt: Int)
case class School(name: String)
case class Home(address: Address, facts: Facts, schools: List[School])
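The model above can be exercised as plain Scala before involving Spark. A minimal sketch with hypothetical sample values (with a `SparkSession` in scope, `Seq(home).toDF.write.parquet(...)` would produce the table queried in the following slides):

```scala
case class Address(houseNumber: Int, streetAddress: String, city: String, state: String, zipCode: String)
case class Facts(price: Int, size: Int, yearBuilt: Int)
case class School(name: String)
case class Home(address: Address, facts: Facts, schools: List[School])

// A hypothetical row, for illustration only.
val home = Home(
  Address(1, "Main St", "Cupertino", "CA", "95014"),
  Facts(2500000, 1800, 1973),
  List(School("Sample Elementary"))
)

// Nested field access mirrors the SQL paths address.city and facts.price.
println(home.address.city)          // Cupertino
println(home.facts.price > 2000000) // true
```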
8 . Nested SQL Schema

sql("select * from homes").printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- houseNumber: integer (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zipCode: string (nullable = true)
 |-- facts: struct (nullable = true)
 |    |-- price: integer (nullable = true)
 |    |-- size: integer (nullable = true)
 |    |-- yearBuilt: integer (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
9 . Find cities with houses worth more than 2M

sql("select address.city from homes where facts.price > 2000000").explain(true)

== Physical Plan ==
*(1) Project [address#55.city AS city#75]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56], DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)], Format: Parquet, PushedFilters: [IsNotNull(facts)], ReadSchema: struct<address:struct<houseNumber:int,streetAddress:string,city:string,state:string,zipCode:strin…, facts:struct<price:int…>>

• We only need two nested columns, address.city and facts.price
• But the entire address and facts structs are read
10 . [SPARK-4502], [SPARK-25363] Parquet with Nested Columns
• Parquet is a columnar storage format designed with complex nested data structures in mind
• It supports very efficient compression and encoding schemes
• As a columnar format, each nested column is stored separately, as if it were a flattened table
• There is no easy way to cherry-pick a couple of nested columns in Spark
• Foundation: allow reading a subset of nested columns right after the Parquet FileScan
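The "stored as if it's a flattened table" point can be sketched in plain Scala: a nested schema decomposes into dotted leaf-column paths, each of which Parquet stores as its own column chunk. The toy schema ADT below is an assumption for illustration, not Spark's `StructType` (the `schools` array is omitted for simplicity):

```scala
// Toy schema ADT for illustration only (not Spark's StructType).
sealed trait DType
case object Leaf extends DType
case class Struct(fields: List[(String, DType)]) extends DType

// Recursively flatten a nested schema into dotted leaf-column paths,
// mirroring how Parquet lays out each leaf as an independent column.
def leafPaths(prefix: String, t: DType): List[String] = t match {
  case Leaf => List(prefix)
  case Struct(fs) => fs.flatMap { case (name, ft) =>
    leafPaths(if (prefix.isEmpty) name else s"$prefix.$name", ft)
  }
}

val homeSchema = Struct(List(
  "address" -> Struct(List("houseNumber" -> Leaf, "streetAddress" -> Leaf,
                           "city" -> Leaf, "state" -> Leaf, "zipCode" -> Leaf)),
  "facts"   -> Struct(List("price" -> Leaf, "size" -> Leaf, "yearBuilt" -> Leaf))
))

// Eight leaves, e.g. address.city and facts.price, each independently readable.
println(leafPaths("", homeSchema))
```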
11 . Find cities with houses worth more than 2M
• With [SPARK-4502], [SPARK-25363]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56], DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)], Format: Parquet, PushedFilters: [IsNotNull(facts)], ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Only two nested columns are read!
12 . Find cities with houses worth more than 2M
• With [SPARK-4502], [SPARK-25363]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56], DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)], Format: Parquet, PushedFilters: [IsNotNull(facts)], ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Parquet predicate pushdown is still not working for nested fields in Spark: PushedFilters contains only IsNotNull(facts), not the facts.price condition
13 . Find cities with houses worth more than 2M
• With [SPARK-25556]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56], DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)], Format: Parquet, PushedFilters: [IsNotNull(facts), GreaterThan(facts.price,2000000)], ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Predicate pushdown in Parquet for nested fields provides a significant performance gain by eliminating non-matches earlier, reading less data and saving the cost of processing it
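For reference, in Spark 2.4 the nested-schema-pruning behavior shown above sat behind a flag that was off by default. A hedged sketch of the relevant settings (names assumed from the Spark 2.4 configuration; verify against your version's documentation before relying on them):

```properties
# spark-defaults.conf sketch (setting names assumed; check your Spark version)
spark.sql.optimizer.nestedSchemaPruning.enabled  true   # nested column pruning (off by default in 2.4)
spark.sql.parquet.filterPushdown                 true   # Parquet predicate pushdown (on by default)
```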
14 . Applying a UDF after repartition

val areaUdf = udf { (city: String, state: String, zipCode: String) =>
  s"$city, $state $zipCode"
}
val query = sql("select * from homes").repartition(1).select(
  areaUdf(col("address.city"), col("address.state"), col("address.zipCode"))
).explain()

== Physical Plan ==
*(2) Project [UDF(address#58.city, address#58.state, address#58.zipCode) AS UDF(address.city, address.state, address.zipCode)#70]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [address#58]
      +- *(1) FileScan parquet [address#58], Format: Parquet, ReadSchema: struct<address:struct<houseNumber:int,streetAddress:string,city:string,state:string,zipCode:string>>

• The entire address struct is read, even though only three of its fields are used
15 . Problems in Supporting Nested Structures in Spark
• Root-level columns are represented by Attribute, the base of leaf named expressions
• To get a nested field from a root-level column, a GetStructField expression with an Attribute child has to be used
• All column-pruning logic works at the Attribute level, so a root column is either taken or pruned in its entirety
• No easy way to cherry-pick a couple of nested columns in this model
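The granularity problem can be illustrated with a toy pruner. The types below are a hypothetical simplification, not Catalyst's: given the leaf paths a query actually requests, Attribute-level pruning keeps every field of any touched root column, while field-level pruning keeps only the requested leaves.

```scala
// Toy illustration of pruning granularity (hypothetical model, not Catalyst).
val requested = Set("address.city", "facts.price")
val schema = Map(
  "address" -> List("houseNumber", "streetAddress", "city", "state", "zipCode"),
  "facts"   -> List("price", "size", "yearBuilt"))

// Attribute-level pruning: any touched root column is kept in full.
val attributeLevel: Set[String] = schema.toList.flatMap { case (root, fields) =>
  if (requested.exists(_.startsWith(root + "."))) fields.map(f => s"$root.$f") else Nil
}.toSet

// Field-level pruning (what [SPARK-25603] enables): only requested leaves survive.
val fieldLevel: Set[String] = requested

println(attributeLevel.size) // 8 leaves read
println(fieldLevel.size)     // 2 leaves read
```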
16 . [SPARK-25603] Generalize Nested Column Pruning
• [SPARK-4502] and [SPARK-25363] are the foundation for better nested-structure support with Parquet in Spark
• If an operator such as Repartition, Sample, or Limit is applied after the Parquet FileScan, nested column pruning will not work
• We address this by flattening the nested fields using Alias right after the data is read
17 . Applying a UDF after repartition
• With [SPARK-25603]

val query = sql("select * from homes").repartition(1).select(
  areaUdf(col("address.city"), col("address.state"), col("address.zipCode"))
).explain()

== Physical Plan ==
*(2) Project [UDF(_gen_alias_84#84, _gen_alias_85#85, _gen_alias_86#86) AS UDF(address.city, address.state, address.zipCode)#64]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [address#55.city AS _gen_alias_84#84, address#55.state AS _gen_alias_85#85, address#55.zipCode AS _gen_alias_86#86]
      +- *(1) FileScan parquet [address#55], ReadSchema: struct<address:struct<city:string,state:string,zipCode:string>>

• Nested fields are replaced by Alias expressions with flattened structures
• Only the three used nested fields are read from the Parquet files
18 . Production Query - Finding a Needle in a Haystack
19 . Spark 2.3.1: 1.2 h wall-clock time, 7.1 TB read
20 . Spark 2.4 with [SPARK-4502], [SPARK-25363], and [SPARK-25556]: 3.3 min wall-clock time, 840 GB read (down from 1.2 h and 7.1 TB)
21 .
• 21x faster in wall-clock time
• 8x less data read
• More power efficient
22 . Other work
• Enhancing Dataset performance by analyzing JVM bytecode and turning closures into Catalyst expressions
• Please check our other presentation tomorrow at 11am for more
23 . Conclusions
With some work, engineering rigor, and some optimizations, Spark can run at very large scale at lightning speed
24 .
• [SPARK-4502], [SPARK-25363] - nested column pruning for Parquet
• [SPARK-25556] - predicate pushdown for nested fields in Parquet
• [SPARK-25603] - generalize nested column pruning
25 . Thank you