Improving Apache Spark’s Reliability with DataSourceV2
Ryan Blue, Spark Summit 2019
Data at Netflix
Cloud-native data warehouse
● YARN compute clusters are expendable
● Expendable clusters require architectural changes
  ○ GENIE is a job submission service that selects the cluster
  ○ METACAT is a cluster-independent metastore
  ○ S3 is the source of truth for data
S3 is eventually consistent
● File list calls may be inaccurate
● Hive tables rely on accurate listing for correctness
● S3 queries may be incorrect, sometimes
At Netflix’s scale, sometimes is every day.
A reliable S3 warehouse (in 2016)
● Requires consistent listing – S3MPER
● Requires in-place writes – BATCH PATTERN
● Requires atomic metastore changes – METACAT
Changes needed in Spark
● Integrate S3 batch pattern committers
● Spark versions
  ○ 1.6 – Hive path only
  ○ 2.0 – DataSource path for reads, not writes
  ○ 2.1+ – Use DataSource path for reads and writes
Problems and Roadblocks
DataFrameWriter
● Behavior is not defined
● What do save and saveAsTable do differently?
  ○ They create different logical plans . . . that are converted to other logical plans
● When you use “overwrite” mode, what happens?
  ○ It depends on the data source
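To make the ambiguity concrete, here is a minimal sketch using the v1 DataFrameWriter API; the path and table names are hypothetical, and what “overwrite” actually replaces depends on the source:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("dfwriter-ambiguity").getOrCreate()
val df = spark.range(10).toDF("id")

// Same mode, two entry points: save() builds a path-based plan and
// saveAsTable() builds a catalog-based plan, and each is rewritten into
// different logical plans downstream.
df.write.mode(SaveMode.Overwrite).save("s3://bucket/warehouse/events")
df.write.mode(SaveMode.Overwrite).saveAsTable("db.events")
```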
SaveMode
● Delegates behavior to the source when tables don’t exist
● Overwrite might mean:
  ○ Replace table – data and metadata (some code paths)
  ○ Replace all table data (some code paths)
  ○ Replace static partitions (DataSource tables)
  ○ Replace dynamic partitions (Hive tables, SPARK-20236; see the sketch below)
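One concrete case of the static vs. dynamic split is the partition overwrite mode added for SPARK-20236; a hedged sketch, assuming newData is a DataFrame whose schema matches a partitioned table db.events (both names are illustrative):

```scala
// With "static" mode (the old default), an overwrite insert can wipe all
// partitions; with "dynamic" mode only the partitions present in newData
// are replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

newData.write
  .mode("overwrite")
  .insertInto("db.events")
```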
Validation
● What is “correct” for CTAS/overwrite when the table exists?
● PreprocessTableCreation vs PreprocessTableInsertion
  ○ Which rule applies depends on the DataFrameWriter call
● Spark automatically inserts unsafe casts (e.g. string to int; see the sketch below)
● Path tables have no schema validation on write
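A hedged sketch of the unsafe-cast bullet above, assuming a SparkSession named spark and a target table db.counts whose column n is an int (both hypothetical):

```scala
import spark.implicits._

// The source column is a string; on the v1 path Spark inserts a cast to int
// rather than rejecting the write, so a value like "oops" typically becomes
// null instead of failing the query.
val strings = Seq("1", "2", "oops").toDF("n")
strings.write.mode("append").insertInto("db.counts")
```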
“[These] should do the same thing, but as we've already published these 2 interfaces and the implementations may have different logic, we have to keep these 2 different commands.” 😕
Commands
● RunnableCommand wraps a logical plan in a pseudo-physical plan
● Commands created inside run() made it worse
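For reference, RunnableCommand in Spark 2.x is roughly the shape sketched below (a paraphrase, not the exact source): a logical node that executes itself through run(), which is why work hidden inside run() bypasses the usual planning rules.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Command

// Paraphrased sketch: the command is a logical plan that runs itself eagerly,
// so plans created and executed inside run() are invisible to analysis rules.
trait RunnableCommand extends Command {
  def run(sparkSession: SparkSession): Seq[Row]
}
```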
Community Roadblocks
● Substantial behavior changes for 2.0
  ○ Committed with no time to review . . . to the 2.0 release branch
● Behavior not up for discussion
● Parts of PRs merged without attribution
Iceberg and DataSourceV2
A reliable S3 warehouse (in 2019)
● Iceberg: tables without unpleasant surprises
● Fix tables, not the file system
● While fixing reliability and scale, fix usability:
  ○ Reliable schema evolution
  ○ Automatic partitioning
  ○ Configure tables, not jobs
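A hedged sketch of what automatic partitioning and “configure tables, not jobs” look like in Iceberg DDL, assuming an Iceberg catalog named my_catalog is already configured and the table name is hypothetical:

```scala
// Partitioning is declared on the table as a transform of a source column,
// so jobs that read or write the table need no per-job partition handling.
spark.sql("""
  CREATE TABLE my_catalog.db.events (
    id   bigint,
    ts   timestamp,
    data string)
  USING iceberg
  PARTITIONED BY (days(ts))
""")
```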
Last year
● Need a way to plug in Iceberg cleanly
● Maintaining a separate write path takes time
● Spark’s write path had solidified
● DataSourceV2 was proposed . . .
Why DataSourceV2?
● Isn’t v2 just an update to the read/write API?
● Existing design problems also affect v2
  ○ No write validation – yet another logical plan
  ○ SaveMode passed to sources
● Opportunity: avoid needing v3 to fix behavior
What’s different in DSv2
● Define a set of common logical plans
  ○ CTAS, RTAS, Append, OverwriteByExpression, etc.
  ○ Document user expectations and behavior
  ○ Implement consistent behavior in Spark for all v2 sources
● SPIP: Standardize SQL logical plans – https://issues.apache.org/jira/browse/SPARK-23521
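In Spark 3.x these standardized plans surface through the DataFrameWriterV2 API; a hedged sketch where the catalog and table names are hypothetical, df stands for any DataFrame with a dt column, and using("iceberg") assumes Iceberg is on the classpath:

```scala
import org.apache.spark.sql.functions.col

// Each call below maps to one of the standardized v2 logical plans.
df.writeTo("my_catalog.db.events").using("iceberg").create()              // CTAS
df.writeTo("my_catalog.db.events").append()                               // Append
df.writeTo("my_catalog.db.events").overwrite(col("dt") === "2019-04-24")  // OverwriteByExpression
df.writeTo("my_catalog.db.events").replace()                              // RTAS
```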
Standard Logical Plans
● Specialize physical plans, not logical plans
  ○ No more InsertIntoDataSourceTable and InsertIntoHiveTable
  ○ No forgetting to apply rules to a new logical plan
● Apply validation rules universally
  ○ Same rules for Append and Overwrite
● Avoid using RunnableCommand
Consistent behavior
● Create, alter, and drop tables in Spark, not in sources
  ○ CTAS when the table exists: fail the query in Spark
  ○ Requires a catalog plugin API
● SPIP: Spark API for Table Metadata – https://issues.apache.org/jira/browse/SPARK-27067
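A hedged sketch of how a v2 catalog plugin is registered in Spark 3.x, using Iceberg’s catalog implementation as the example; the catalog name and warehouse location are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Spark loads the configured class as the catalog named "my_catalog" and
// routes DDL and writes for my_catalog.db.table identifiers through it.
val spark = SparkSession.builder()
  .appName("catalog-plugin-example")
  .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.my_catalog.type", "hadoop")
  .config("spark.sql.catalog.my_catalog.warehouse", "s3://bucket/warehouse")
  .getOrCreate()
```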
Catalog API
● Multi-catalog support
  ○ Create tables in the source of truth
  ○ Avoiding this caused strange Spark behavior
● SPIP: Identifiers for multi-catalog support – https://issues.apache.org/jira/browse/SPARK-27066
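With multi-part identifiers, the catalog becomes part of the table name, so each operation is routed to the catalog that owns the table; a hedged sketch with hypothetical names:

```scala
// DDL and DML against my_catalog.db.events are handled by that catalog,
// not by the default session catalog.
spark.sql("CREATE TABLE my_catalog.db.events (id bigint, ts timestamp) USING iceberg")
spark.sql("INSERT INTO my_catalog.db.events VALUES (1, CAST('2019-04-24 10:00:00' AS TIMESTAMP))")
```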
Status
● Goal: working DSv2 in Spark 3.0
  ○ Independent of the v1 path
  ○ Default behavior stays on the v1 path
● SPIPs have been adopted by community votes
● Append and overwrite plans are added and working
● Waiting on the catalog API to add CTAS and DDL
Thank you! Questions?
Up next: Migrating to Spark at Netflix, at 11:50 today in Room 2006