Building a Unified Batch and Stream Data Storage and Analytics Platform with Pulsar and Spark
Yijie Shen | Apache Pulsar/Spark contributor, open-source enthusiast
Abstract: A comprehensive look at building a unified batch-and-stream data storage and analytics platform on Apache Pulsar and Apache Spark, including efficiently reading streaming data from Pulsar via the Reader, Consumer, and Segment interfaces; persisting processing results back to Pulsar by Topic and PartitionedTopic; and using Pulsar Schema for convenient data querying and analysis.
1. A Unified Platform for Real-time Storage and Processing: Apache Pulsar as Stream Storage, Apache Spark for Processing as an Example
Yijie Shen (yjshen), 2019-06-29
2. Outline
- Motivation & Challenges
- Why Pulsar
- Spark-Pulsar Connector
3. Motivation
- Ubiquity of real-time data: sensors, logs from mobile apps, IoT; organizations have gotten better at capturing data
- Data matters: batch and interactive analysis, stream processing, machine learning, graph processing
- The evolution of analytic platforms: unified or similar APIs for batch/declarative and stream processing, e.g. Spark, Flink
4. Challenges
- Compatibility with cloud infrastructure: multi-tenant management, scalability
- Data movement during its lifecycle: visibility of data
- Operational cost and problems: multiple systems to maintain (message queue, cold storage), resource allocation and provisioning
5. Pulsar – A cloud-native architecture
- Stateless serving layer (brokers)
- Durable storage layer (BookKeeper bookies)
6. Pulsar – Segment-based Storage
- Managed ledger: the storage layer for a single topic
- Ledger: single writer, append-only, replicated to multiple bookies
7. Pulsar – Infinite Stream Storage
- Reduce storage cost by offloading segments to tiered storage one by one
8. Pulsar Schema
- Server-side consensus on the shape of the data
- Built-in schema registry
- Data schema on a per-topic basis
- Send and receive typed messages directly
- Validation and multi-version support
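To make the per-topic schema concrete, here is a minimal sketch of producing and consuming typed messages with the Pulsar client (the broker URL, topic, subscription name, and the User class are assumptions, not from the talk):

import scala.beans.BeanProperty
import org.apache.pulsar.client.api.{PulsarClient, Schema}

// Bean-style class so Pulsar's reflection-based JSON schema can introspect it.
class User {
  @BeanProperty var name: String = _
  @BeanProperty var age: Int = _
}

object TypedMessageExample {
  def main(args: Array[String]): Unit = {
    val client = PulsarClient.builder()
      .serviceUrl("pulsar://localhost:6650")          // assumed local broker
      .build()

    // Typed producer and consumer: the broker validates payloads against
    // the schema registered for the topic.
    val producer = client.newProducer(Schema.JSON(classOf[User]))
      .topic("users")
      .create()
    val consumer = client.newConsumer(Schema.JSON(classOf[User]))
      .topic("users")
      .subscriptionName("user-sub")
      .subscribe()

    val u = new User
    u.name = "alice"
    u.age = 30
    producer.send(u)

    val msg = consumer.receive()
    println(s"${msg.getValue.name}, ${msg.getValue.age}")
    consumer.acknowledge(msg)

    producer.close(); consumer.close(); client.close()
  }
}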
9. Outline
- Motivation & Challenges
- Why Pulsar
- Spark-Pulsar Connector: API, Internals
10. Spark Pulsar Connector – API

Read:
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://...")
  .option("admin.url", "http://...")
  .option("topic", "topic1")
  .load()

Write:
df.write
  .format("pulsar")
  .option("service.url", "pulsar://...")
  .option("admin.url", "http://...")
  .option("topic", "topic2")
  .save()

Deploying:
./bin/spark-submit --packages org.apache.pulsar.segment:psegment-connectors-spark-all_{{SCALA_BINARY_VERSION}}:{{PSEGMENT_VERSION}} ...

Stream mode: replace read/write with readStream/writeStream and launch the query with start() (see the sketch below).
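A minimal streaming-mode sketch of the same API (the URLs, topic names, and checkpoint path are placeholders): topic1 is read as an unbounded table and the rows are continuously written to topic2.

val stream = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .load()

val query = stream
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic2")
  .option("checkpointLocation", "/path/to/checkpoint")  // required for fault tolerance
  .start()

query.awaitTermination()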
11. Two Levels of Reading API
- Consumer: subscribe / seek / receive, one per topic partition
- Segment: read directly from the bookies, for parallelism
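A sketch of the consumer-level read path (the partition name, subscription name, and start position are assumptions): subscribe to one topic partition, seek to the desired start position, then receive messages sequentially.

import java.util.concurrent.TimeUnit
import org.apache.pulsar.client.api.{MessageId, PulsarClient, SubscriptionType}

val client = PulsarClient.builder()
  .serviceUrl("pulsar://localhost:6650")
  .build()

val consumer = client.newConsumer()
  .topic("topic1-partition-0")               // one consumer per topic partition
  .subscriptionName("spark-reader-sub")
  .subscriptionType(SubscriptionType.Exclusive)
  .subscribe()

consumer.seek(MessageId.earliest)            // position the cursor at the start offset

// Drain whatever is currently available; receive(timeout) returns null when idle.
var msg = consumer.receive(1, TimeUnit.SECONDS)
while (msg != null) {
  println(new String(msg.getData))           // hand the payload to the partition reader
  consumer.acknowledge(msg)
  msg = consumer.receive(1, TimeUnit.SECONDS)
}

consumer.close(); client.close()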
12. Spark Structured Streaming Overview
- Input and output: input sources must be replayable; sinks must support idempotent writes for exactly-once semantics
- Streaming-specific APIs:
  - Triggers: how often the engine attempts to compute a new result and update the output sink
  - Event time and watermarks: a policy to determine when enough data has been received
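A short sketch of those streaming-specific knobs, reusing the stream DataFrame from the read sketch above (the eventTime column and the intervals are assumptions): a 10-minute watermark on the event-time column plus a fixed 30-second micro-batch trigger.

import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

// Windowed count with a watermark: late data older than 10 minutes is dropped,
// and finalized windows are emitted in append mode.
val counts = stream
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()

val query = counts.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))  // attempt a new micro-batch every 30s
  .start()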
13. Structured Streaming Source and Sink API

trait Source {
  def schema: StructType
  def getOffset: Option[Offset]
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  def commit(end: Offset): Unit
  def stop(): Unit
}

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}
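An illustrative Sink skeleton, not the connector's actual implementation: the batchId is used to skip micro-batches that were already written by a failed and retried run, which is how a sink provides the idempotence required for exactly-once output.

import org.apache.pulsar.client.api.PulsarClient
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class PulsarSinkSketch(serviceUrl: String, topic: String) extends Sink {

  private val client   = PulsarClient.builder().serviceUrl(serviceUrl).build()
  private val producer = client.newProducer().topic(topic).create()

  @volatile private var latestBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      // Already committed by an earlier attempt: skip to stay idempotent.
    } else {
      // Simplified: collect on the driver with a naive row encoding;
      // a real sink writes per partition on the executors.
      data.collect().foreach(row => producer.send(row.mkString(",").getBytes("UTF-8")))
      latestBatchId = batchId
    }
  }
}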
14. Anatomy of StreamExecution
For each micro-batch, StreamExecution:
- calls getOffset() on the Source and records the available offsets in the offsetLog (a write-ahead log)
- calls getBatch(start, end) to build the logical plan and runs it via IncrementalExecution
- hands the result to the Sink through addBatch()
- records the batch in the batchCommitLog and calls commit() on the Source
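A simplified sketch of one micro-batch iteration, using only the Source/Sink methods from the previous slide (the offsetLog and batchCommitLog stand in for Spark's checkpointed logs; planning, error handling, and multi-source coordination are omitted):

import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}

class MicroBatchLoopSketch(source: Source, sink: Sink) {
  private var availableOffset: Option[Offset] = None               // end of the last batch
  private val offsetLog      = scala.collection.mutable.Map[Long, Offset]()
  private val batchCommitLog = scala.collection.mutable.Set[Long]()

  def runOneBatch(batchId: Long): Unit = {
    source.getOffset.foreach { end =>
      offsetLog(batchId) = end                                     // 1. WAL the planned range
      val batchDf = source.getBatch(availableOffset, end)          // 2. replayable read of (start, end]
      sink.addBatch(batchId, batchDf)                              // 3. idempotent write
      batchCommitLog += batchId                                    // 4. mark the batch committed
      source.commit(end)                                           // 5. source may discard data up to `end`
      availableOffset = Some(end)
    }
  }
}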
15. Topic/Partition Add/Delete Discovery
- Happens during logical planning, inside getBatch(start: Option[Offset], end: Offset)
- Discovers topic differences between start and end:
  - start: the previous batch's end
  - end: the latest getOffset() result
- For each getOffset(), the connector reports the available offsets for all topics/partitions
- The DataFrame/Dataset is created from the currently existing topics/partitions; Structured Streaming takes care of the rest
- Offset shape: Offset { topicOffsets: Map[String, MessageId] }
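Illustrative only (names are hypothetical): the per-topic-partition offset shape from the slide and the set difference that drives add/delete discovery inside getBatch(start, end).

import org.apache.pulsar.client.api.MessageId

// One MessageId per topic/partition, mirroring Offset { topicOffsets: Map[String, MessageId] }.
case class PulsarOffsetSketch(topicOffsets: Map[String, MessageId])

def discoverChanges(start: PulsarOffsetSketch,
                    end: PulsarOffsetSketch): (Set[String], Set[String]) = {
  val added   = end.topicOffsets.keySet.diff(start.topicOffsets.keySet)   // partitions to start reading
  val removed = start.topicOffsets.keySet.diff(end.topicOffsets.keySet)   // partitions that disappeared
  (added, removed)
}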
16. A Little More on Schema
- Pulsar is treated as structured data storage
- The schema is fetched only once, at the very beginning of query planning
- All topics behind a single DataFrame/Dataset must share the same schema
- Fetched using the Pulsar Admin API
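A sketch of fetching a topic's schema through the Pulsar admin interface (the admin URL and topic name are placeholders); during query planning the connector maps the returned SchemaInfo to a Spark StructType.

import org.apache.pulsar.client.admin.PulsarAdmin

val admin = PulsarAdmin.builder()
  .serviceHttpUrl("http://localhost:8080")
  .build()

// Look up the registered schema for a topic once, at planning time.
val schemaInfo = admin.schemas().getSchemaInfo("persistent://public/default/topic1")
println(s"type=${schemaInfo.getType}, definition=${schemaInfo.getSchemaDefinition}")

admin.close()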
17. Thanks! Q&A