Modular Apache Spark: Transform Your Code into Pieces
1. Modular Apache Spark: Transform Your Code into Pieces
Albert Franzi (@FranziCros), Alpha Health
2. Slides available at: http://bit.ly/SparkAI2019-afranzi
3. About me (career timeline: 2013, 2014, 2015, 2017, 2018, 2019)
4. Modular Apache Spark: Transform Your Code into Pieces
5. We will learn:
- How we simplified our Spark code by modularizing it
- How we increased the test coverage of our Spark code by using spark-testing-base, provided by Holden Karau
- How we reduced test execution time by skipping unaffected tests -> fewer coffees
7. Have you ever played with duplicated code across your Spark jobs? Have you ever experienced the joy of code-reviewing a never-ending stream of Spark chaos? (Image: The Shining, 1980)
8. Please! Never, ever play with duplicated code! (Image: The Shining, 1980)
9. Fragment the Spark job
● Spark Readers
● Transformations
● Task Context
● Spark Writers
● Aliases
● Joins
● Formatters
● ...
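One way to read this decomposition, sketched in plain Scala under the assumption that each fragment can be modelled as a value: a reader produces data, a transformation maps it, and a writer consumes it. `Reader`, `Writer` and `Job` are hypothetical names, not classes from the talk; in a Spark job `A` and `B` would typically both be `DataFrame`, and the example uses `Seq[Int]` only so it runs without a cluster.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical building blocks: each piece of a job is a small, independently testable value.
trait Reader[A] { def read(): A }
trait Writer[B] { def write(data: B): Unit }

// A job is only the wiring of a reader, a transformation and a writer.
final case class Job[A, B](reader: Reader[A], transform: A => B, writer: Writer[B]) {
  def run(): Unit = writer.write(transform(reader.read()))
}

object JobExample {
  // Collects written records in memory so the example needs no external storage.
  val sink: ArrayBuffer[Int] = ArrayBuffer.empty[Int]

  val job: Job[Seq[Int], Seq[Int]] = Job[Seq[Int], Seq[Int]](
    reader = new Reader[Seq[Int]] { def read(): Seq[Int] = Seq(1, 2, 3, 4) },
    transform = _.filter(_ % 2 == 0),
    writer = new Writer[Seq[Int]] { def write(data: Seq[Int]): Unit = sink ++= data }
  )
}
```

Because the job only wires pieces together, each reader, transformation and writer can be unit-tested on its own, and the same reader can be reused by several jobs.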
10. Readers / Writers
● Enforce schemas
● Use schemas to read only the fields you are going to use
● Provide one reader per dataset, with the dataset's sources attached to it
● Share schemas and sources between readers and writers
● GDPR compliant by design
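A minimal Spark sketch of sharing one schema and one source between a reader and a writer, assuming Parquet storage; `DatasetDef`, `UserBehaviour`, its three fields and the path handling are hypothetical. The reader passes the schema to `spark.read`, so nothing is inferred and only declared fields are loaded; the writer selects exactly those fields, so an undeclared column (for example a personal-data field) is never persisted. This is one possible reading of "GDPR compliant by design", not necessarily the mechanism used in the talk.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical dataset definition: the single place where a dataset's schema and location live.
final case class DatasetDef(schema: StructType, path: String) {

  // Reading with an explicit schema skips inference and loads only the declared fields.
  def read(spark: SparkSession): DataFrame =
    spark.read.schema(schema).parquet(path)

  // Writing projects onto the same schema, so columns not declared in it are never persisted.
  def write(df: DataFrame, mode: SaveMode = SaveMode.Overwrite): Unit =
    df.select(schema.fieldNames.toSeq.map(col): _*).write.mode(mode).parquet(path)
}

object UserBehaviour {
  // Hypothetical schema; a real one would list every field the dataset is allowed to hold.
  val schema: StructType = StructType(Seq(
    StructField("user_id", StringType),
    StructField("event_type", StringType),
    StructField("timestamp", LongType)
  ))

  def dataset(path: String): DatasetDef = DatasetDef(schema, path)
}
```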
11. Readers (GDPR)
    val userBehaviourSchema: StructType = ???
    val userBehaviourPath = Path("s3://<bucket>/user_behaviour/year=2018/month=10/day=03/hour=12/gen=27/")

    val userBehaviourReader = ReaderBuilder(PARQUET)
      .withSchema(userBehaviourSchema)
      .withPath(userBehaviourPath)
      .buildReader()

    val df: DataFrame = userBehaviourReader.read()
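The slide does not show how `ReaderBuilder` is implemented. A minimal sketch of what such a builder could look like on top of Spark's `DataFrameReader`, assuming string paths instead of the slide's `Path` type and a session passed implicitly to `read()`; `Format`, `PARQUET`, `JSON` and `DatasetReader` are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

// Hypothetical file formats; the slide's PARQUET constant is assumed to be something like this.
sealed abstract class Format(val name: String)
case object PARQUET extends Format("parquet")
case object JSON extends Format("json")

// An immutable, fully configured reader; `read` uses the implicit session in scope.
final class DatasetReader(format: Format, schema: StructType, paths: Seq[String]) {
  def read()(implicit spark: SparkSession): DataFrame =
    spark.read.format(format.name).schema(schema).load(paths: _*)
}

// Immutable builder: each `with*` call returns a modified copy.
final case class ReaderBuilder(
    format: Format,
    schema: Option[StructType] = None,
    paths: Seq[String] = Seq.empty
) {
  def withSchema(s: StructType): ReaderBuilder = copy(schema = Some(s))

  def withPath(ps: String*): ReaderBuilder = copy(paths = paths ++ ps)

  def buildReader(): DatasetReader = {
    // Fail fast when the reader is built, before any data is touched.
    require(schema.isDefined, "a schema is required")
    require(paths.nonEmpty, "at least one path is required")
    new DatasetReader(format, schema.get, paths)
  }
}
```

Keeping the builder an immutable case class means a partially configured builder (format and schema fixed, say) can be shared and specialised with different paths, and the `require` checks turn a missing schema into an error before any data is read.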
12. Readers
    val userBehaviourSchema: StructType = ???

    // Path structure: s3://<bucket>/user_behaviour/[year]/[month]/[day]/[hour]/[gen]/
    val userBehaviourBasePath = Path("s3://<bucket>/user_behaviour/")
    val startDate: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
    val halfDay: Duration = Duration.ofHours(12)

    val userBehaviourPaths: Seq[Path] = PathBuilder
      .latestGenHourlyPaths(userBehaviourBasePath, startDate, halfDay)

    val userBehaviourReader = ReaderBuilder(PARQUET)
      .withSchema(userBehaviourSchema)
      .withPath(userBehaviourPaths: _*)
      .buildReader()

    val df: DataFrame = userBehaviourReader.read()
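`PathBuilder.latestGenHourlyPaths` is not shown either. A pure `java.time` sketch of one plausible behaviour, assuming the window runs backwards from `startDate`, that paths use the `year=/month=/day=/hour=/gen=` layout from the previous slide, and that a caller-supplied `latestGen` function (a hypothetical stand-in for an S3 listing) returns the newest generation under an hour, or `None` if that hour has no data yet:

```scala
import java.time.{Duration, ZoneOffset, ZonedDateTime}
import java.time.temporal.ChronoUnit

object HourlyPaths {

  // Formats one hour as a partition prefix, e.g. .../year=2018/month=10/day=03/hour=12/
  def hourPrefix(basePath: String, hour: ZonedDateTime): String = {
    val t = hour.withZoneSameInstant(ZoneOffset.UTC)
    f"${basePath.stripSuffix("/")}/year=${t.getYear}%04d/month=${t.getMonthValue}%02d/day=${t.getDayOfMonth}%02d/hour=${t.getHour}%02d/"
  }

  // Hours covered by the window, newest first, walking backwards from `start` (an assumption).
  def hoursBack(start: ZonedDateTime, window: Duration): Seq[ZonedDateTime] = {
    val last = start.truncatedTo(ChronoUnit.HOURS)
    (0L until window.toHours).map(i => last.minusHours(i))
  }

  // Appends the latest gen=N to each hour prefix; hours where `latestGen` returns None are skipped.
  def latestGenHourlyPaths(
      basePath: String,
      start: ZonedDateTime,
      window: Duration,
      latestGen: String => Option[Int]
  ): Seq[String] =
    hoursBack(start, window).flatMap { hour =>
      val prefix = hourPrefix(basePath, hour)
      latestGen(prefix).map(gen => s"${prefix}gen=$gen/")
    }
}
```

With `halfDay` as the window this yields the current hour plus the eleven before it, newest first; hours without a generation are skipped rather than failing the read.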
13. Readers
    val userBehaviourSchema: StructType = ???
    val userBehaviourBasePath = Path("s3://<bucket>/user_behaviour/")
    val startDate: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
    val halfDay: Duration = Duration.ofHours(12)

    val userBehaviourReader = ReaderBuilder(PARQUET)
      .withSchema(userBehaviourSchema)
      .withHourlyPathBuilder(userBehaviourBasePath, startDate, halfDay)
      .buildReader()

    val df: DataFrame = userBehaviourReader.read()
14. Readers
    val df: DataFrame = UserBehaviourReader.read(startDate, halfDay)
15. Transforms
    def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

Dataset[T] => magic => Dataset[U]
16. Transforms
    def withGreeting(df: DataFrame): DataFrame = {
      df.withColumn("greeting", lit("hello world"))
    }

    def extractFromJson(colName: String, outputColName: String,
                        jsonSchema: StructType)(df: DataFrame): DataFrame = {
      df.withColumn(outputColName, from_json(col(colName), jsonSchema))
    }
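To see how these compose, here is a sketch that chains both functions with Spark's `Dataset.transform` on a one-row in-memory DataFrame; the `payload` column, its JSON schema and the `TransformsExample` object are made up for illustration. Because `extractFromJson` is curried, supplying only its first parameter list yields a `DataFrame => DataFrame`, which is exactly what `transform` expects.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object Transforms {
  // Adds a constant column; needs nothing besides the DataFrame.
  def withGreeting(df: DataFrame): DataFrame =
    df.withColumn("greeting", lit("hello world"))

  // Curried: the first parameter list configures the transformation, the second receives the data.
  def extractFromJson(colName: String, outputColName: String, jsonSchema: StructType)(df: DataFrame): DataFrame =
    df.withColumn(outputColName, from_json(col(colName), jsonSchema))
}

object TransformsExample {
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Hypothetical input: one JSON payload per row.
    val payloadSchema = StructType(Seq(StructField("city", StringType)))
    val input = Seq("""{"city":"Barcelona"}""").toDF("payload")

    input
      .transform(Transforms.withGreeting)
      .transform(Transforms.extractFromJson("payload", "parsed", payloadSchema))
  }
}
```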
17. Transforms
    def onlyClassifiedAds(df: DataFrame): DataFrame = {
      df.filter(col("event_type") === "View")
        .filter(col("object_type") === "ClassifiedAd")
    }

    def dropDuplicates(df: DataFrame): DataFrame = {
      df.dropDuplicates()
    }

    def cleanedCity(df: DataFrame): DataFrame = {
      df.withColumn("city", getCityUdf(col("object.location.address")))
    }

    val cleanupTransformations: Seq[DataFrame => DataFrame] = Seq(
      dropDuplicates,
      cleanedCity,
      onlyClassifiedAds
    )

    val df: DataFrame = UserBehaviourReader.read(startDate, halfDay)
    val classifiedAdsDF = df.transforms(cleanupTransformations: _*)
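Spark's `Dataset` provides `transform` for a single function; a varargs `transforms` like the one used above is not part of the Spark API, so it is presumably a project-level helper. A minimal sketch of such a helper, written generically over any `A => A`; the names `TransformSyntax`, `TransformsOps` and `TransformsSyntaxExample` are assumptions, and the example uses `Seq[String]` so it runs without Spark:

```scala
object TransformSyntax {
  // Adds `transforms` to any value and applies the functions left to right,
  // so x.transforms(f, g, h) == h(g(f(x))), like a chain of .transform calls.
  implicit class TransformsOps[A](private val value: A) extends AnyVal {
    def transforms(ts: (A => A)*): A =
      ts.foldLeft(value)((acc, t) => t(acc))
  }
}

object TransformsSyntaxExample {
  import TransformSyntax._

  // Stand-ins for DataFrame transformations, operating on "eventType:objectId" strings.
  val dropDuplicates: Seq[String] => Seq[String] = _.distinct
  val onlyViews: Seq[String] => Seq[String] = _.filter(_.startsWith("View"))

  val cleanup: Seq[Seq[String] => Seq[String]] = Seq(dropDuplicates, onlyViews)
  val result: Seq[String] = Seq("View:ad1", "View:ad1", "Click:ad2").transforms(cleanup: _*)
}
```

With `TransformSyntax._` imported, `df.transforms(cleanupTransformations: _*)` type-checks as written, since every element is a `DataFrame => DataFrame`. Folding left to right preserves the listed order, so `dropDuplicates` runs before `onlyClassifiedAds`; the standard library's `Function.chain(cleanupTransformations)(df)` is an equivalent one-liner.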
18. Transforms
"As a data consumer, I only need to pick the transformations I would like to apply, instead of coding them from scratch."
"It's like cooking: engineers provide manufactured ingredients (transformations), and data scientists use the ones they need for a successful recipe."
19. Transforms: links of interest
● github.com/MrPowers/spark-daria: "Spark helper methods to maximize developer productivity."
● "DataFrame transformations can be defined with arguments so they don't make assumptions about the schema of the underlying DataFrame." (Matthew Powers)
● bit.ly/Spark-ChainingTransformations
● bit.ly/Spark-SchemaIndependentTransformations
21. Have you ever put untested Spark jobs into production?
"Mars Climate Orbiter destroyed because of a Metric System Mixup (1999)"
22. Testing with Holden Karau
● github.com/holdenk/spark-testing-base: "Base classes to use when writing tests with Spark."
  ● Share a Spark context between tests
  ● Provide methods that make tests easier:
    ○ Fixture readers
    ○ JSON-to-DataFrame converters
    ○ Extra validators
● github.com/MrPowers/spark-fast-tests: "An alternative to spark-testing-base to run tests in parallel without restarting Spark Session after each test file."
23. Testing: SharedSparkContext
    package com.holdenkarau.spark.testing

    import java.util.Date

    import org.apache.spark._
    import org.scalatest.{BeforeAndAfterAll, Suite}

    /**
     * Shares a local `SparkContext` between all tests in a suite
     * and closes it at the end. You can share between suites by enabling
     * reuseContextIfPossible.
     */
    trait SharedSparkContext extends BeforeAndAfterAll with SparkContextProvider {
      self: Suite =>
      ...
      protected implicit def reuseContextIfPossible: Boolean = false
      ...
    }
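24. Testing
    package com.alpha.data.test

    import com.holdenkarau.spark.testing.DataFrameSuiteBase
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.StructType
    import org.scalatest.Suite

    trait SparkSuite extends DataFrameSuiteBase {
      self: Suite =>

      // Reuse the SparkContext across suites instead of starting a new one per suite.
      override def reuseContextIfPossible: Boolean = true

      // Builds a DataFrame from in-memory rows and an explicit schema.
      protected def createDF(data: Seq[Row], schema: StructType): DataFrame = {
        spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
      }

      // Loads a JSON fixture file into a DataFrame, optionally with a given schema.
      protected def jsonFixtureToDF(fileName: String, schema: Option[StructType] = None): DataFrame = {
        val fixtureContent = readFixtureContent(fileName)
        val fixtureJson = fixtureContentToJson(fixtureContent)
        jsonToDF(fixtureJson, schema)
      }

      // Compares two schemas while ignoring the order of their top-level fields.
      protected def checkSchemas(inputSchema: StructType, expectedSchema: StructType): Unit = {
        assert(inputSchema.fields.sortBy(_.name).toSeq == expectedSchema.fields.sortBy(_.name).toSeq)
      }
      ...
    }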
25. Testing
[Diagram: Spark Readers, Transformation, Task Context, Spark Writers, Test]
Testing each piece independently makes testing the whole job easier.
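As an illustration of testing one piece on its own, a sketch that checks the `onlyClassifiedAds` filter from the transforms slides against a hand-built three-row fixture. It uses a local `SparkSession` and plain assertions to stay dependency-light; in a suite like the `SparkSuite` above, `DataFrameSuiteBase` would provide the session instead. The rows and the `ClassifiedAdsFilterCheck` object are invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ClassifiedAdsFilter {
  // The transformation under test, as shown on the transforms slides.
  def onlyClassifiedAds(df: DataFrame): DataFrame =
    df.filter(col("event_type") === "View")
      .filter(col("object_type") === "ClassifiedAd")
}

object ClassifiedAdsFilterCheck {
  // Builds a small fixture and returns the ids that survive the filter.
  def survivingIds(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val fixture = Seq(
      ("e1", "View", "ClassifiedAd"),  // kept: matching event and object type
      ("e2", "Click", "ClassifiedAd"), // dropped: wrong event type
      ("e3", "View", "Banner")         // dropped: wrong object type
    ).toDF("id", "event_type", "object_type")

    ClassifiedAdsFilter.onlyClassifiedAds(fixture)
      .select("id").as[String].collect().toSeq
  }
}
```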
27. Are tests taking too long to execute?
28. Junit4Git by Raquel Pau
github.com/rpau/junit4git
"Junit Extensions for Test Impact Analysis."
"This is a JUnit extension that ignores those tests that are not related with your last changes in your Git repository."
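The core idea of test impact analysis can be shown without any tooling: remember which source files each test exercised on a previous run, then rerun only the tests whose files intersect the current change set. This is a simplified model for illustration, not junit4git's implementation (which instruments the JVM through a `-javaagent`, as its Gradle configuration shows); `TestImpact` and the test and file names are hypothetical.

```scala
object TestImpact {
  // testName -> source files the test exercised on the last recorded run.
  type Coverage = Map[String, Set[String]]

  // A test is affected if it touched any changed file; tests with no recorded
  // coverage (for example, newly added ones) always run, to stay on the safe side.
  def affectedTests(allTests: Seq[String], coverage: Coverage, changedFiles: Set[String]): Seq[String] =
    allTests.filter { test =>
      coverage.get(test) match {
        case Some(files) => files.exists(changedFiles.contains)
        case None        => true
      }
    }
}
```

With this model, a change confined to `Transforms.scala` reruns only the transformation tests plus any test that has never been recorded, which is where the "fewer coffees" come from.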
29. Junit4Git: Gradle configuration
    configurations {
      agent
    }

    dependencies {
      testCompile("org.walkmod:scalatest4git_2.11:${version}")
      agent "org.walkmod:junit4git-agent:${version}"
    }

    test.doFirst {
      jvmArgs "-javaagent:${configurations.agent.singleFile}"
    }

Scala test classes are annotated with:
    @RunWith(classOf[ScalaGitRunner])