Automating Predictive Modeling at Zynga with PySpark and Pandas UDFs
1. WIFI SSID: SparkAISummit | Password: UnifiedAnalytics
2. Automating Predictive Modeling at Zynga with Pandas UDFs (Ben Weber, Zynga) #UnifiedAnalytics #SparkAISummit
3. Zynga Analytics
4. Zynga Portfolio
5. Our Challenge
• We want to build game-specific models for behaviors such as likelihood to purchase
• Our games have diverse event taxonomies
• We have tens of millions of players and dozens of games across multiple platforms
6. Our Approach
• Featuretools for automating feature engineering
• Pandas UDFs for distributing Featuretools
• Databricks for building our model pipeline
7. AutoModel
• Zynga’s first portfolio-scale data product
• Generates hundreds of propensity models
• Powers features in our games & live services
8. AutoModel Pipeline: Data Extract → Feature Engineering → Feature Application → Model Training → Model Publish
9. Data Extraction: S3 & Parquet
10. Feature Engineering
11. Automated Feature Engineering
• Goals
  – Translate our narrow and deep data tables into a shallow and wide representation
  – Support dozens of titles with diverse event taxonomies
  – Scale to billions of records and millions of players
  – Minimize manual data science workflows
12. Featuretools
• A Python library for deep feature synthesis
• Represents data as entity sets
• Identifies feature descriptors for transforming your data into new representations
13. Entity Sets
• Define the relationships between tables
• Work with Pandas data frames

    Entityset: transactions
      Entities:
        customers (shape = [5, 3])
        transactions (shape = [500, 5])
      Relationships:
        transactions.customer_id -> customers.customer_id
14. Feature Synthesis

    import featuretools as ft

    feature_matrix, features_defs = ft.dfs(entityset=es, target_entity="customers")
    feature_matrix.head(5)

    customer_id  zip_code  count(transactions)  sum(transactions.amounts)
    1            91000     0                    0
    2            91000     10                   120.5
    3            91005     5                    17.96
    4            91005     2                    9.99
    5            91000     3                    29.97
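The count and sum aggregations that deep feature synthesis generates on this entity set can be reproduced in plain pandas. A minimal sketch with hypothetical sample data (two customers, three transactions):

```python
import pandas as pd

# Hypothetical sample data mirroring the customers/transactions entity set
customers = pd.DataFrame({"customer_id": [1, 2],
                          "zip_code": ["91000", "91005"]})
transactions = pd.DataFrame({"customer_id": [2, 2, 2],
                             "amounts": [10.0, 20.5, 90.0]})

# The same per-customer aggregations that dfs produces
aggs = transactions.groupby("customer_id")["amounts"].agg(["count", "sum"])
aggs.columns = ["count(transactions)", "sum(transactions.amounts)"]

# Left join onto customers; customers with no transactions get 0
feature_matrix = customers.set_index("customer_id").join(aggs).fillna(0)
print(feature_matrix)
```

Featuretools discovers these feature descriptors automatically; the value of dfs is that the groupby logic above never has to be written by hand.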
15. Using Featuretools

    import featuretools as ft

    # 1-hot encode the raw event data
    es = ft.EntitySet(id="events")
    es = es.entity_from_dataframe(entity_id="events", dataframe=rawDataDF)
    feature_matrix, defs = ft.dfs(entityset=es, target_entity="events", max_depth=1)
    encodedDF, encoders = ft.encode_features(feature_matrix, defs)

    # perform deep feature synthesis on the encoded data
    es = ft.EntitySet(id="events")
    es = es.entity_from_dataframe(entity_id="events", dataframe=encodedDF)
    es = es.normalize_entity(base_entity_id="events", new_entity_id="users", index="user_id")
    generated_features, descriptors = ft.dfs(entityset=es, target_entity="users", max_depth=3)
16. Scaling Up
• Parallelize the process
• Translate feature descriptions to Spark SQL
• Find a way to distribute the task
17. Feature Application: Pandas UDFs
18. Pandas UDFs
• Introduced in Spark 2.3
• Provide Scalar and Grouped Map operations
• Partitioned using a groupby clause
• Enable distributing code that uses Pandas
19. Grouped Map UDFs
Spark Input → split into Pandas Input partitions → UDF applied to each partition → Pandas Output partitions → combined into Spark Output
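The split-apply-combine flow in this diagram can be sketched without a cluster: pandas groupby-apply follows the same contract (each group arrives as a pandas DataFrame, and a pandas DataFrame comes back). A local illustration with made-up data, not the Spark API itself:

```python
import pandas as pd

# Toy input standing in for a Spark DataFrame
df = pd.DataFrame({"player_id": [1, 1, 2, 2],
                   "score": [10, 20, 5, 15]})

# The "UDF": receives one group's rows as a pandas DataFrame and
# returns a pandas DataFrame, the same contract as a grouped map UDF
def summarize(group: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"player_id": [group["player_id"].iloc[0]],
                         "mean_score": [group["score"].mean()]})

# Spark runs the UDF on each partition in parallel and concatenates
# the results; locally we do the same thing sequentially
out = pd.concat([summarize(g) for _, g in df.groupby("player_id")],
                ignore_index=True)
print(out)
```

In Spark the groupby column determines how rows are shuffled to executors, so each group must fit in one worker's memory.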
20. When to use UDFs?
• You need to operate on Pandas data frames
• Your data can be represented as a single Spark data frame
• You can partition your data set
21. Distributing SciPy

    import pandas as pd
    from scipy.optimize import leastsq
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, LongType, DoubleType

    schema = StructType([StructField('ID', LongType(), True),
                         StructField('b0', DoubleType(), True),
                         StructField('b1', DoubleType(), True)])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def analyze_player(player_pd):
        # fit is the residual function being minimized (defined elsewhere)
        result = leastsq(fit, [1, 0], args=(player_pd.shots, player_pd.hits))
        return pd.DataFrame({'ID': [player_pd.player_id[0]],
                             'b0': result[0][0],
                             'b1': result[0][1]})

    result_spark_df = spark_df.groupby('player_id').apply(analyze_player)
22. Step 1: Define the schema (the StructType passed to @pandas_udf)
23. Step 2: Choose a partition (the groupby('player_id') column)
24. Step 3: Use Pandas (the UDF receives each group as a pandas DataFrame)
25. Step 4: Return Pandas (the UDF returns a pandas DataFrame matching the schema)
26. Distributing Featuretools

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def apply_feature_generation(pandasInputDF):
        # create Entity Set representation
        es = ft.EntitySet(id="events")
        es = es.entity_from_dataframe(entity_id="events", dataframe=pandasInputDF)
        es = es.normalize_entity(base_entity_id="events", new_entity_id="users", index="user_id")
        # apply the feature calculation and return the result
        return ft.calculate_feature_matrix(saved_features, es)

    sparkFeatureDF = sparkInputDF.groupby('user_group').apply(apply_feature_generation)
27. Issues with Pandas UDFs
• Debugging is a challenge
• Pushes the limits of Apache Arrow
• Data type mismatches
• Schema needs to be known before execution
28. Model Training & Scoring: MLlib
29. Propensity Models
• Classification models
  – Gradient-Boosted Trees
  – XGBoost
• Hyperparameter tuning
  – ParamGridBuilder
  – CrossValidator
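MLlib's ParamGridBuilder and CrossValidator implement the standard grid-search-with-cross-validation pattern: enumerate candidate hyperparameters, evaluate each combination with k-fold CV, keep the best model. Since running MLlib needs a live Spark cluster, here is a sketch of the same pattern using scikit-learn's gradient-boosted trees on hypothetical toy data (an analogue, not Zynga's pipeline):

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

# Toy binary labels standing in for a propensity target
X, y = make_classification(n_samples=200, n_features=5, random_state=0)

# Analogue of ParamGridBuilder: enumerate candidate hyperparameters
param_grid = {"max_depth": [2, 3], "n_estimators": [25, 50]}

# Analogue of CrossValidator: 3-fold CV over every grid combination
search = GridSearchCV(GradientBoostingClassifier(random_state=0),
                      param_grid, cv=3)
search.fit(X, y)
print(search.best_params_)
```

In MLlib the shape is the same: `ParamGridBuilder().addGrid(...).build()` produces the grid, and `CrossValidator(estimator=..., estimatorParamMaps=..., evaluator=..., numFolds=3)` performs the search.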