GE Aviation Spark Application: Experience Porting Analytics
1. Experience Porting Analytics into PySpark ML Pipelines
Prof Honor Powrie, Dr Peter Knight - GE Aviation Digital
4 Oct 2018 | Session hashtag: #SAISExp12
2. Outline
• GE Aviation - commercial engines, data and analytics overview
• Historic analytic development process
• Converting analytics to PySpark ML Pipelines
• Plotting ML Pipelines
• The full analytic lifecycle in Spark
• Conclusions
3. General Electric - Aviation
• 40k employees
• $27.4B revenue (2017)
• >33k commercial engines
“Every two seconds, an aircraft powered by GE technology takes off somewhere in the world”
4. Data Volumes: Historical, Today, Tomorrow
• Historical: 1 KB/flight, 30 parameters, 3 snapshots/flight - <50 GB per year
• Today: 200 KB/flight, 1000 parameters, 20 snapshots/flight - 100K flights per day, 10 TB per year
• Tomorrow: 1 GB/flight, 1000 parameters @ 10 Hz, 3.5 hr/flight - 100K flights per day, 100 TB per day, 50 PB per year
5. ML and Analytic Applications - GE Commercial Engines
Applications span the operational lifecycle and the enterprise, including:
• Shop Visit Forecast
• Workscope Prediction
• Material Supply Chain Management
• Time on Wing Optimisation
• Fleet Monitoring
• Fleet Management
• Borescope Imaging
• Digital Twin Models
6. About ProDAPS
• GE Aviation's custom ML library, based on Probabilistic Graphical Models
• Developed to tackle some of the key challenges of real-world data
• Used extensively in ML applications for GE commercial engines
• Being recoded in C++ and Python and integrated with Spark
Functional elements (examples):
• Fleet Segmentation
• Multivariate Models and Anomaly Detection
• Diagnostic Reasoning
7. Historic Analytic Development and Deployment Process
Develop (Data Scientists):
• Data: Greenplum, Oracle Database
• MATLAB data exploration
• MATLAB analytics
• MATLAB generate metrics
• Toll Gate Reviews
Hand-off package:
• Functional spec
• XML configuration
• Model files
• Configuration file
• Test cases
Deploy (Software Engineers):
• Analytic recoded in Java
• Data moved to SQL Server
• Windows Server environment
• Predix run-time
• Test in QA environment
• Deploy to pre-production
• Deploy to production
• Monitor in production (Spotfire)
Aim: convert the entire model building & deployment workflow to ML Pipelines
8. Why We Like Spark ML Pipelines
• Easy to string together pre-processing, ML and alerting stages
• Same pipeline - and code - for analytic development (model building), evaluation and deployment (run time)
• Extensive library of existing analytics and data manipulation tools
• Extensible for our own analytics using Python, in a standard framework
• Self-describing - explainParams() shows you how to use it
• Scales to big data
A minimal sketch of chaining stages and calling explainParams() is shown below.
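For illustration, a minimal sketch (not from the slides) of stringing standard stages together in one Pipeline and letting a stage describe itself; the column names are hypothetical placeholders.

```python
# Sketch: chaining pre-processing and ML stages in a single Pipeline.
# Column names (egt_margin, fuel_flow, label) are hypothetical placeholders.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

assembler = VectorAssembler(inputCols=["egt_margin", "fuel_flow"], outputCol="raw")
scaler = StandardScaler(inputCol="raw", outputCol="features")      # normalisation stage
lr = LogisticRegression(featuresCol="features", labelCol="label")  # ML stage

pipeline = Pipeline(stages=[assembler, scaler, lr])  # same object used for fitting and scoring
print(lr.explainParams())  # self-describing: every Param with its doc string and default
```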
9. Converting Our Workflow to Spark ML Pipelines
• The pipeline includes various custom modules before & after ML, e.g. normalisation & alerting
• Some analytics are more complex than just processing on a row-by-row basis
  - e.g. Median Trend: group the data by engine (and other configurable conditions) and apply a sliding window (by date or by records) that is sorted by date (see the windowing sketch below)
• For our ML algorithms we have overcome a number of challenges…
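A minimal sketch of the grouped, date-sorted sliding window that a Median Trend style analytic needs, assuming a DataFrame `df` with hypothetical columns engine_id, flight_date and egt_margin; this is the generic PySpark windowing pattern, not GE's implementation.

```python
# Sketch: per-engine sliding window, sorted by date, with a rolling statistic.
from pyspark.sql import Window
from pyspark.sql import functions as F

w = (Window.partitionBy("engine_id")   # group the data by engine
           .orderBy("flight_date")     # sort each group by date
           .rowsBetween(-9, 0))        # sliding window over the last 10 records

# Approximate rolling median; if percentile_approx is not supported over a
# window in your Spark version, substitute F.avg("egt_margin") for a rolling mean.
trended = df.withColumn(
    "egt_margin_median",
    F.expr("percentile_approx(egt_margin, 0.5)").over(w))
```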
10. Converting ProDAPS to Spark ML
Our ProDAPS ML analytic seems more complex than many already in Spark. For example:
• Once the model is built, many different types of inference can be configured
• Most existing ML analytics use Dense Vectors, but these have limitations, e.g. they can't handle null records
• We want to be able to filter the data between pipeline stages
• We are still on Spark 2.2, so we had to write our own method for saving and loading custom stages (see the persistence sketch below)
• No export/import commands for porting pipelines between Spark clusters
• Tedious to add Params - see next slide
• Our full wish-list is on JIRA at SPARK-19498
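As a point of comparison for the save/load gap noted above, a minimal sketch using the DefaultParamsReadable/DefaultParamsWritable mixins added in Spark 2.3 (so not available to the authors on 2.2); the PassThrough stage is hypothetical and an active SparkSession is assumed.

```python
# Sketch: persisting a custom stage with the Spark 2.3+ persistence mixins.
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class PassThrough(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Hypothetical no-op stage; a real stage would also declare its Params."""
    def _transform(self, dataset):
        return dataset

stage = PassThrough()
stage.write().overwrite().save("/tmp/pass_through_stage")  # writes param metadata
reloaded = PassThrough.load("/tmp/pass_through_stage")     # restores the stage
```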
11. Example Custom Transformer Showing Verbose Param Code
Straight line: y = m x + c
Currently, for each parameter, the default value is set 3 times and the parameter name is entered 9 times!
Current code required (~50 lines):

```python
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params, TypeConverters

class StraightLine(Transformer):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, m=1.0, c=0.0):
        super(StraightLine, self).__init__()
        self._setDefault(inputCol=None, outputCol=None, m=1.0, c=0.0)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, m=1.0, c=0.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    inputCol = Param(Params._dummy(), "inputCol", "the input column name (your X). (string)",
                     typeConverter=TypeConverters.toString)

    def setInputCol(self, value):
        return self._set(inputCol=value)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    outputCol = Param(Params._dummy(), "outputCol", "the output column name (your Y). (string)",
                      typeConverter=TypeConverters.toString)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    m = Param(Params._dummy(), "m", "the slope of the line. (float)",
              typeConverter=TypeConverters.toFloat)

    def setM(self, value):
        return self._set(m=value)

    def getM(self):
        return self.getOrDefault(self.m)

    c = Param(Params._dummy(), "c", "the y offset when x = 0. (float)",
              typeConverter=TypeConverters.toFloat)

    def setC(self, value):
        return self._set(c=value)

    def getC(self):
        return self.getOrDefault(self.c)

    def _transform(self, dataset):
        input_col = self.getInputCol()
        if not input_col:
            raise Exception("inputCol not supplied")
        output_col = self.getOutputCol()
        if not output_col:
            raise Exception("outputCol not supplied")
        return dataset.selectExpr(
            "*",
            str(self.getM()) + " * " + input_col + " + " + str(self.getC()) + " AS " + output_col)
```
12. Example Custom Transformer Showing Verbose Param Code (continued)
Straight line: y = m x + c; the current code is the same ~50 lines shown on the previous slide.
• Currently, for each parameter, the default value is set 3 times and the parameter name is entered 9 times!
• We propose adding a method that generates all of this boilerplate code in one call: specify the param name, description, datatype, default value and required flag
• Ideally explainParams() should also show the data types
Proposed code (~10 lines, clearer & easier to maintain; addParam is the proposed API, not part of Spark today):

```python
from pyspark import keyword_only
from pyspark.ml.param.shared import Param, Params, TypeConverters, addParam
from pyspark.ml import Transformer

class StraightLine(Transformer):
    addParam("inputCol", "specify the input column name (your X).", String, None)
    addParam("outputCol", "specify the output column name (your Y).", String, None)
    addParam("m", "specify m - the slope of the line.", Float, 1.0)
    addParam("c", "specify c - the y offset when x = 0.", Float, 0.0)

    def _transform(self, dataset):
        return dataset.selectExpr(
            "*",
            str(self.getM()) + " * " + self.getInputCol() + " + " +
            str(self.getC()) + " AS " + self.getOutputCol())
```
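For completeness, a brief usage sketch (not from the slides) applying the verbose StraightLine transformer from slide 11 to a toy DataFrame; an active SparkSession is assumed.

```python
# Sketch: exercising the StraightLine custom transformer defined above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["x"])

line = StraightLine(inputCol="x", outputCol="y", m=2.0, c=1.0)
line.transform(df).show()    # adds column y = 2.0 * x + 1.0
print(line.explainParams())  # lists inputCol, outputCol, m and c with their docs
```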
13. Display Pipeline
• We created code to plot any Spark ML pipeline using bokeh, showing Params on hover
• Available on GitHub at: https://github.com/GeneralElectric/SparkMLPipelineDisplay
• The following example is based on the example pipeline in the Spark documentation: https://spark.apache.org/docs/2.2.0/ml-pipeline.html (see the sketch below)
• The slide shows two plots: the pipeline before training and after training
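For reference, a minimal sketch of the Spark documentation's example pipeline that the display utility is demonstrated on (Tokenizer, HashingTF, LogisticRegression); the training DataFrame is omitted here.

```python
# Sketch: the ml-pipeline example from the Spark docs referenced above.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# `pipeline` is the "before training" object; pipeline.fit(training) would give
# the PipelineModel plotted as "after training".
```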
14. We can now do the whole data science workflow in Spark, in a notebook environment
1. Start from an existing configuration file
2. Explore the data, e.g. with an interactive bokeh plot
3. Generate the ML Pipeline from the config & display it
4. Build the model(s) and display
5. Calculate metrics, e.g. an ROC curve (see the evaluation sketch below)
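A minimal sketch of step 5, assuming a fitted PipelineModel `model` and a held-out DataFrame `test` with a binary label column (both hypothetical here).

```python
# Sketch: computing the area under the ROC curve for a fitted pipeline model.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
print("AUC:", evaluator.evaluate(predictions))
```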
15. Conclusions
• GE Aviation uses analytics across the entire commercial engine operational lifecycle
• Over the past few decades we have seen an explosion in data; the trend is set to continue, and today we have discussed solutions to manage it
• Overcame the real-life challenges of implementing custom Python ML Pipeline analytics
• Provided feedback on ways to make adding custom Python ML libraries easier
• Developed an ML Pipeline display utility and shared it with the community
• Completed the entire analytic development and deployment lifecycle in Spark
• Still working to port all analytics to Spark
• Production deployment environment not yet in Spark
16. Questions?