Designing ETL Pipelines with Structured Streaming and Delta Lake
1. Designing ETL Pipelines with Structured Streaming: How to architect things right. Wenchen Fan
2. About Me
• Tech lead of the open source team
• Apache Spark PMC member & committer
• Focus on Spark Core and SQL
3. Structured Streaming
• Distributed stream processing built on the SQL engine
• High throughput, second-scale latencies
• Fault-tolerant, exactly-once
• Great set of connectors
• Philosophy: treat data streams like unbounded tables. Users write batch-like queries on tables, and Spark continuously executes the queries incrementally on streams.
4. Structured Streaming Example: read JSON data from Kafka, parse the nested JSON, store it in a structured Parquet table, and get end-to-end failure guarantees (ETL).

```scala
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "topic")
  .load()
  .selectExpr("cast(value as string) as json")
  .select(from_json("json", schema).as("data"))
  .writeStream
  .format("parquet")
  .option("path", "/parquetTable/")
  .trigger("1 minute")
  .option("checkpointLocation", "…")
  .start()
```
5. Anatomy of a Streaming Query

```scala
spark.readStream.format("kafka")                  // specify where to read data from
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "topic")
  .load()
  .selectExpr("cast(value as string) as json")    // specify data transformations
  .select(from_json("json", schema).as("data"))
  .writeStream
  .format("parquet")                              // specify where to write data to
  .option("path", "/parquetTable/")
  .trigger("1 minute")                            // specify how to process data
  .option("checkpointLocation", "…")
  .start()
```
6. Spark automatically streamifies! Spark SQL converts the batch-like query (written with DataFrames, Datasets, or SQL) into a logical plan, then an optimized plan, and finally a series of incremental execution plans that operate on each new batch of data. At every trigger (t=1, t=2, t=3, ...), the same optimized pipeline (Kafka source, then project cast(value) as json, then project from_json(json), then Parquet sink, with optimized operators: codegen, off-heap, etc.) processes only the new data.
7. Delta Lake: an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. https://delta.io/
• ACID transactions
• Open formats
• Schema enforcement and evolution
• Scalable metadata handling
• Data versioning and audit history
• Great with batch + streaming
• Time travel to old versions
• Upserts and deletes
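A minimal sketch of the basic Delta Lake API (paths and data are made up, and the delta-core package is assumed to be on the classpath): a DataFrame is written in the "delta" format, read back as a consistent snapshot, and read again at an older version via time travel.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-sketch").getOrCreate()

// Write a small batch DataFrame as a Delta table (hypothetical path).
val df = spark.range(0, 10).toDF("id")
df.write.format("delta").mode("append").save("/tmp/events")

// Read the current snapshot of the table.
val current = spark.read.format("delta").load("/tmp/events")

// Time travel: read the table as of an earlier version.
val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/events")
```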
8. Delta Lake combines the good of data warehouses with the good of data lakes. https://delta.io/
• The good of data warehouses: pristine data, transactional reliability, fast SQL queries.
• The good of data lakes: massive scale-out, open formats, mixed workloads.
9. Structured Streaming + Delta Lake: how do you build streaming data pipelines with them?
10. Structured Streaming + Delta Lake: what are the design patterns to correctly architect streaming data pipelines?
11. Another streaming design pattern talk????
• Most talks: focus on a pure streaming engine, and explain one way of achieving the end goal.
• This talk: Spark is more than a streaming engine, and it has multiple ways of achieving the end goal, with tunable performance, cost, and quality.
12. This talk: how to think about design, common design patterns, and how we are making this easier.
13. Streaming Pipeline Design: data streams → ???? → insights.
14. The ???? breaks down into three questions: What? Why? How?
15. What?
• What is your input? What is your data? What format and system is your data in?
• What is your output? What results do you need? What throughput and latency do you need?
16. Why? Why do you want this output in this way? Who is going to take actions based on it, humans or computers? When and how are they going to consume it?
17. Why? Common mistakes!
• #1 "I want my dashboard with counts to be updated every second." There is no point in updating every second if humans are going to take actions in minutes or hours.
• #2 "I want to generate automatic alerts with up-to-the-last-second counts" (but my input data is often delayed). There is no point in taking fast actions on low-quality data and results.
18. Why? Common mistakes!
• #3 "I want to train machine learning models on the results" (but my results are in a key-value store). Key-value stores are not great for the large, repeated data scans that machine learning workloads perform.
19. How? How to process the data? How to store the results?
20. Streaming Design Patterns: Complex ETL (What? Why? How?)
21. Pattern 1: ETL
What? Input: an unstructured input stream from files, Kafka, etc., for example raw log lines such as:
01:06:45 WARN id = 1, update failed
01:06:45 INFO id=23, update success
01:06:57 INFO id=87: update postpo …
Output: structured tabular data.
Why? Query the latest structured data interactively or with periodic jobs.
22. P1: ETL
What? Convert unstructured input to structured tabular data, with a latency of a few minutes.
Why? Query the latest structured data interactively or with periodic jobs.
How? Process: use a Structured Streaming query to transform the unstructured, dirty data, running 24/7 on a cluster with the default trigger. Store: save to structured, scalable storage that supports data skipping, etc., e.g. Parquet, ORC, or, even better, Delta Lake. (Pipeline: raw logs → Structured Streaming ETL → structured table → query. A sketch follows below.)
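A minimal sketch of this pattern, assuming a hypothetical /rawLogs/ directory of JSON log files, a hand-defined schema, and made-up output and checkpoint paths; the query runs continuously with the default trigger and writes a Delta table that can be queried while the stream is running.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("p1-etl-sketch").getOrCreate()

// Hypothetical schema for the raw JSON log records.
val logSchema = new StructType()
  .add("ts", StringType)
  .add("level", StringType)
  .add("id", LongType)
  .add("message", StringType)

// Read a stream of raw JSON files and parse them into structured columns.
val parsed = spark.readStream
  .schema(logSchema)
  .json("/rawLogs/")

// Continuously write the structured records to a Delta table (default trigger, runs 24/7).
parsed.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/p1-etl")
  .start("/tables/logs")
```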
23. P1: ETL with Delta Lake
How? Store: save to Delta Lake, which additionally lets you:
• Read with snapshot guarantees while writes are in progress
• Concurrently reprocess data with full ACID guarantees
• Coalesce small files into larger files
• Update the table to fix mistakes in the data
• Delete data for GDPR
(Pipeline: raw logs → Structured Streaming ETL → Delta table → query / reprocess. See the sketch below.)
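A minimal sketch of the Delta-specific maintenance operations listed above, using the DeltaTable API against the hypothetical /tables/logs path from the previous sketch; the predicates and values are made up for illustration.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-maintenance-sketch").getOrCreate()

val table = DeltaTable.forPath(spark, "/tables/logs")

// Fix a mistake in already-written data (hypothetical predicate and new value).
table.updateExpr(
  "level = 'WARNING'",
  Map("level" -> "'WARN'"))

// Delete a user's records, e.g. for a GDPR request (hypothetical predicate).
table.delete("id = 42")

// Because readers get snapshot isolation, concurrent queries keep returning
// consistent results while the streaming job continues to append new data.
```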
24. P1.1: Cheaper ETL
What? Convert unstructured input to structured tabular data, with a latency of a few hours (instead of minutes) and without keeping clusters up 24/7.
Why? Query the latest data interactively or with periodic jobs; a cheaper solution.
How? Process: still use a Structured Streaming query! Run the streaming query with "trigger.once" to process all the data available since the last batch, and set up an external schedule (every few hours?) to periodically start a cluster, run one batch, and shut down. (Pipeline: raw logs → Structured Streaming, restarted on a schedule → structured table. A sketch follows below.)
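A minimal sketch of the trigger-once variant, reusing the hypothetical schema and paths from the earlier sketch; each scheduled run processes whatever data accumulated since the previous checkpointed batch and then exits, so the cluster can be shut down in between.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("p1-1-trigger-once-sketch").getOrCreate()

// Hypothetical schema for the raw JSON log records (same as the earlier sketch).
val logSchema = new StructType()
  .add("ts", StringType)
  .add("level", StringType)
  .add("id", LongType)
  .add("message", StringType)

// Same ETL query as before, but with Trigger.Once: it processes everything that
// arrived since the last checkpointed batch, then stops.
val query = spark.readStream
  .schema(logSchema)
  .json("/rawLogs/")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/p1-etl")
  .trigger(Trigger.Once())
  .start("/tables/logs")

query.awaitTermination()   // returns once the single batch has completed
```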
25. P1.2: Query faster than ETL!
What? Latency: seconds instead of hours.
Why? Query the latest, up-to-the-last-second data interactively.
How? Query the data in Kafka directly using Spark SQL. This can process everything up to the last records received by Kafka at the time the query was started. (Pipeline: Kafka → Spark SQL. A sketch follows below.)
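A minimal sketch of querying Kafka directly with a batch read (the brokers, topic, and schema are made up, and the spark-sql-kafka package is assumed to be on the classpath); a batch read with the kafka source consumes records up to the latest offsets available when the query starts.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("p1-2-query-kafka-sketch").getOrCreate()
import spark.implicits._

// Hypothetical schema for the JSON payloads in Kafka.
val eventSchema = new StructType()
  .add("id", LongType)
  .add("status", StringType)

// Batch read from Kafka: consumes up to the latest offsets at query start.
val latest = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "topic")
  .load()
  .selectExpr("cast(value as string) as json")
  .select(from_json($"json", eventSchema).as("data"))
  .select("data.*")

// Query the freshest data with plain SQL.
latest.createOrReplaceTempView("latest_events")
spark.sql("SELECT status, count(*) AS cnt FROM latest_events GROUP BY status").show()
```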
26. Pattern 2: Key-value output
What? Input: new data for each key, e.g. from aggregations (sum, count, ...) or sessionization. Output: updated values for each key. For example, the records { "key1": "value1" }, { "key1": "value2" }, { "key2": "value3" } produce the latest values key1 → value2 and key2 → value3.
Why? Look up the latest value for a key (dashboards, websites, etc.), OR build summary tables for querying interactively or with periodic jobs.
27. P2.1: Key-value output for lookup
What? Generate updated values for keys, with a latency of seconds to minutes.
Why? Look up the latest value for a key.
How? Process: use Structured Streaming with stateful operations for aggregation. Store: save in key-value stores optimized for single-key lookups. (Pipeline: input → Structured Streaming stateful aggregation → key-value store → lookup. A sketch follows below.)
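A minimal sketch of the stateful aggregation side of this pattern; the rate source stands in for the real input, and the key-value write is stubbed out because the actual store (Redis, Cassandra, etc.) is deployment-specific, so putToKeyValueStore is a hypothetical placeholder.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().appName("p2-1-kv-lookup-sketch").getOrCreate()
import spark.implicits._

// Stand-in input stream of (key, value) events built from the rate source.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()
  .selectExpr("concat('key', cast(value % 5 as string)) as key", "value")

// Stateful aggregation: a running count per key, maintained across micro-batches.
val counts = events.groupBy($"key").agg(count("*").as("cnt"))

// Hypothetical writer: push each updated key to an external key-value store.
def putToKeyValueStore(batch: DataFrame): Unit =
  batch.collect().foreach(row => println(s"PUT ${row.getString(0)} -> ${row.getLong(1)}"))

counts.writeStream
  .outputMode("update")   // emit only the keys whose value changed in this batch
  .foreachBatch { (batch: DataFrame, batchId: Long) => putToKeyValueStore(batch) }
  .option("checkpointLocation", "/checkpoints/p2-1")
  .start()
```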
28. P2.2: Key-value output for analytics
What? Generate updated values for keys, with a latency of seconds to minutes.
Why? Look up the latest value for a key; summary tables for analytics.
How? Process: use Structured Streaming with stateful operations for aggregation. Store: save in Delta Lake! Delta Lake supports upserts using MERGE. (Pipeline: input → Structured Streaming stateful aggregation → Delta summary table.)
29. P2.2: Key-value output for analytics
How? Use stateful operations for aggregation, and upsert each batch of results into the summary table: Delta Lake supports upserts using the MERGE SQL operation, with Scala/Java/Python APIs that have the same semantics as SQL MERGE. (SQL MERGE is supported in Databricks Delta, not in OSS Delta yet.)

```scala
streamingDataFrame.writeStream
  .foreachBatch { (batchOutputDF, batchId) =>
    DeltaTable.forPath(spark, "/aggs/").as("t")
      .merge(
        batchOutputDF.as("s"),
        "t.key = s.key")
      .whenMatched().update(...)
      .whenNotMatched().insert(...)
      .execute()
  }.start()
```
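For reference, a self-contained variant of the same upsert pattern with concrete (made-up) columns; the /aggs/ path, the key/cnt columns, the checkpoint location, and the rate-source input are all assumptions, and the target Delta table is assumed to exist before the first merge runs.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().appName("p2-2-merge-sketch").getOrCreate()
import spark.implicits._

// Hypothetical streaming aggregation: a running count per key.
val counts = spark.readStream
  .format("rate").option("rowsPerSecond", 10).load()
  .selectExpr("concat('key', cast(value % 5 as string)) as key")
  .groupBy($"key").agg(count("*").as("cnt"))

// Upsert each batch of updated keys into the Delta summary table via MERGE.
counts.writeStream
  .outputMode("update")
  .foreachBatch { (batchOutputDF: DataFrame, batchId: Long) =>
    DeltaTable.forPath(spark, "/aggs/").as("t")
      .merge(batchOutputDF.as("s"), "t.key = s.key")
      .whenMatched().updateExpr(Map("cnt" -> "s.cnt"))
      .whenNotMatched().insertExpr(Map("key" -> "s.key", "cnt" -> "s.cnt"))
      .execute()
  }
  .option("checkpointLocation", "/checkpoints/p2-2")
  .start()
```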