Productizing Structured Streaming Jobs
1. Productizing Structured Streaming Jobs
Burak Yavuz, April 24, 2019 – SAIS 2019, San Francisco

2. Who am I
● Software Engineer – Databricks, "We make your streams come true"
● Apache Spark Committer
● MS in Management Science & Engineering, Stanford University
● BS in Mechanical Engineering, Bogazici University, Istanbul
3. Writing code is fun… is that all we do?

4. Image from: https://www.smartsheet.com/sites/default/files/IC-Software-Development-Life-Cycle.jpg

5. Let's look at the operational aspects of data pipelines
6. Agenda
How to test, monitor, deploy, and update Structured Streaming jobs.

7. Structured Streaming
• Stream processing on the Spark SQL engine
• Fast, scalable, fault-tolerant
• Rich, unified, high-level APIs to deal with complex data and complex workloads
• Rich ecosystem of data sources; integrates with many storage systems

8. Structured Streaming @ Databricks
• 1000s of customer streaming apps in production on Databricks
• 1000+ trillion rows processed in production

9. Streaming word count
10. Anatomy of a Streaming Query

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy($"value".cast("string"))
  .count()
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "…")
  .start()

Source
• Specify one or more locations to read data from
• Built-in support for Files/Kafka/Socket; pluggable
• Can include multiple sources of different types using union()
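To make the union() point concrete, here is a minimal sketch (not from the deck) of combining a Kafka source with a file source; the broker address, topic, path, and schema are made-up examples, and both branches are projected to a common schema before the union.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("union-example").getOrCreate()
import spark.implicits._

// Kafka source: value arrives as binary, so cast it to string.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")   // illustrative address
  .option("subscribe", "input")
  .load()
  .select($"value".cast("string").as("value"))

// File source: JSON files landing in a directory; file sources need an explicit schema.
val fileSchema = new StructType().add("value", StringType)
val fileStream = spark.readStream
  .schema(fileSchema)
  .json("/data/input")                              // illustrative path
  .select($"value".cast("string").as("value"))

// Both branches now share the same schema, so they can be unioned into one stream.
val combined = kafkaStream.union(fileStream)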
11. Anatomy of a Streaming Query

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "…")
  .start()

Transformation
• Using DataFrames, Datasets and/or SQL
• Catalyst figures out how to execute the transformation incrementally
• Internal processing is always exactly-once
12. Anatomy of a Streaming Query

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "…")
  .start()

Sink
• Accepts the output of each batch
• When supported, sinks are transactional and exactly-once (e.g. Files)
• Use foreach to execute arbitrary code
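As a minimal sketch of the foreach point (not from the deck), a ForeachWriter lets you run arbitrary per-row code instead of using a built-in sink; here it just prints each row, whereas a real writer would typically open a connection in open() and flush/close it in close(). The class name is illustrative.

import org.apache.spark.sql.{ForeachWriter, Row}

class PrintlnWriter extends ForeachWriter[Row] {
  // Called once per partition per epoch; returning false skips this partition.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Called for every row in the partition.
  override def process(value: Row): Unit = println(value)

  // Called when the partition finishes, with any error that occurred (or null).
  override def close(errorOrNull: Throwable): Unit = ()
}

// Usage, e.g.: streamingDF.writeStream.foreach(new PrintlnWriter).start()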
13. Anatomy of a Streaming Query

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("update")
  .option("checkpointLocation", "…")
  .start()

Output mode – what is output
• Complete – output the whole answer every time
• Update – output only changed rows
• Append – output new rows only

Trigger – when to output
• Specified as a time; data-size triggers may be supported eventually
• No trigger means as fast as possible
14. Anatomy of a Streaming Query

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("update")
  .option("checkpointLocation", "…")
  .start()

Checkpoint
• Tracks the progress of a query in persistent storage
• Can be used to restart the query if there is a failure
15. Reference Architecture

16. Data Pipelines @ Databricks
(Architecture diagram: event-based sources flow into Bronze Tables, then Silver Tables, then Gold Tables, feeding Streaming Analytics and Reporting.)
17. Event Based File Sources
• Launched Structured Streaming connectors:
  • s3-sqs on AWS (DBR 3.5)
  • abs-aqs on Azure (DBR 5.0)
• As blobs are generated:
  • Events are published to SQS/AQS
  • Spark reads these events
  • Then reads the original files from the blob storage system
(Diagram: AWS S3 publishes events to AWS SQS; Azure Blob Storage publishes through Event Grid to Queue Storage.)
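A hedged sketch of how the s3-sqs source described above is typically wired up; the option names, queue URL, and schema below are illustrative assumptions, not taken from the talk, so check the Databricks documentation for the exact connector options. It assumes an existing SparkSession named spark.

import org.apache.spark.sql.types._

// Schema of the JSON files landing in S3 (illustrative).
val eventSchema = new StructType()
  .add("timestamp", TimestampType)
  .add("eventType", StringType)
  .add("payload", StringType)

val events = spark.readStream
  .format("s3-sqs")                          // Databricks Runtime connector named on the slide
  .schema(eventSchema)
  .option("fileFormat", "json")              // assumed option name
  .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/events")  // assumed option name, made-up queue
  .load()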
18. Properties of Bronze/Silver/Gold
• Bronze tables
  • No data processing
  • Deduplication + JSON => Parquet conversion
  • Data kept around for a couple of weeks in order to fix mistakes, just in case
• Silver tables
  • Tens/hundreds of tables
  • Directly queryable tables
  • PII masking/redaction
• Gold tables
  • Materialized views of silver tables
  • Tables curated by the Data Science team
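To illustrate the bronze-to-silver step described above, here is a hedged sketch of one possible job: read the bronze table as a stream, deduplicate, redact a PII column, and write a directly queryable silver Parquet table. All paths, column names, and the redaction rule are made-up examples, not Databricks' actual pipeline; an existing SparkSession named spark is assumed.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Schema of the bronze table (illustrative).
val bronzeSchema = new StructType()
  .add("eventId", StringType)
  .add("email", StringType)
  .add("body", StringType)

val bronze = spark.readStream
  .schema(bronzeSchema)
  .parquet("/mnt/bronze/events")             // illustrative bronze location

bronze
  .dropDuplicates("eventId")                 // deduplication
  .withColumn("email", lit("<redacted>"))    // PII masking/redaction (illustrative rule)
  .writeStream
  .format("parquet")
  .option("path", "/mnt/silver/events")      // illustrative silver location
  .option("checkpointLocation", "/mnt/checkpoints/silver-events")
  .start()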
19. Why this Architecture?
• Maximize flexibility
• Maximize scalability
• Lower costs
20. See TD's talk: "Designing Structured Streaming Pipelines—How to Architect Things Right", April 25, 2:40pm – Streaming Track

21. Testing
22. Testing

spark.readStream
  .format("kafka")
  .option("subscribe", "input")
  .load()
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)
  .writeStream
  .format("kafka")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("update")
  .option("checkpointLocation", "…")
  .start()

- How do we test this code?
- Do we need to set up Kafka?
- How do we verify result correctness?
23. Testing Strategy 1: Don't care about sources and sinks; just test your business logic, using batch DataFrames.

  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)

Pros:
- Easy to do in Scala/Python
Cons:
- Not all batch operations are supported in Streaming
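A minimal sketch of Strategy 1 (not from the deck; names and data are illustrative): run the same aggregation on a small batch DataFrame and compare the collected result to the expected counts. In a real suite the assertion would live inside a ScalaTest case.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[2]").appName("unit-test").getOrCreate()
import spark.implicits._

// Same business logic as the streaming query, applied to a batch DataFrame.
val input = Seq("a", "b", "a").toDF("value")
val result = input
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)

// Collect and compare against the expected word counts.
val counts = result.collect().map(r => r.getString(0) -> r.getLong(1)).toMap
assert(counts == Map("a" -> 2L, "b" -> 1L))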
24. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

val inputData = MemoryStream[Array[Byte]]
val stream = inputData.toDS().toDF("value")
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)

testStream(stream, OutputMode.Update)(
  AddData(inputData, "a".getBytes(), "b".getBytes()),
  CheckAnswer(("a" -> 1), ("b" -> 1))
)
25. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

val inputData = MemoryStream[Array[Byte]]       // Source is in memory
val stream = inputData.toDS().toDF("value")     // Schema can be set arbitrarily to mimic the real source
  .groupBy('value.cast("string") as 'key)
  .agg(count("*") as 'value)

testStream(stream, OutputMode.Update)(
  AddData(inputData, "a".getBytes(), "b".getBytes()),
  CheckAnswer(("a" -> 1), ("b" -> 1))
)
26. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

val inputData = MemoryStream[Array[Byte]]
val stream = inputData.toDS().toDF("value")
  .groupBy('value.cast("string") as 'key)       // Transformation unchanged.
  .agg(count("*") as 'value)

testStream(stream, OutputMode.Update)(
  AddData(inputData, "a".getBytes(), "b".getBytes()),
  CheckAnswer(("a" -> 1), ("b" -> 1))
)
27. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

testStream(stream, OutputMode.Update)(          // Starts a stream outputting data to a memory sink
  AddData(inputData, ...),
  CheckAnswer(("a" -> 1), ("b" -> 1))
)
28. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

testStream(stream, OutputMode.Update)(
  AddData(inputData, "a".getBytes(), "b".getBytes()),   // Add data to the source
  CheckAnswer(("a" -> 1), ("b" -> 1))
)
29. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

testStream(stream, OutputMode.Update)(
  AddData(inputData, "a".getBytes(), "b".getBytes()),
  CheckAnswer(("a" -> 1), ("b" -> 1))                   // Process all data and check the result
)
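The testStream/AddData/CheckAnswer helpers above come from Spark's own test sources (org.apache.spark.sql.streaming.StreamTest), which are not shipped in the regular spark-sql jar. One common way to use them in your own project is to depend on Spark's published test-jars; a hedged sbt sketch follows (the version and the exact set of test-jars you need may differ).

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"      % "2.4.2" % Test,
  "org.apache.spark" %% "spark-sql"      % "2.4.2" % Test classifier "tests",
  "org.apache.spark" %% "spark-catalyst" % "2.4.2" % Test classifier "tests",
  "org.apache.spark" %% "spark-core"     % "2.4.2" % Test classifier "tests"
)

Your test suite then extends StreamTest, typically together with one of Spark's shared-session test traits, so that MemoryStream picks up the implicit SQLContext it needs.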