Machine Learning: Offline/Online Pipeline Testing with MLeap
Objective
The goal is an out-of-the-box machine learning platform: with only simple configuration changes, a model whose features are processed and trained offline can be deployed online with one click, while keeping offline and online behavior consistent. Spark was chosen as the offline training platform and MLeap as the online model-serving layer.
Experiment Overview
Using the open lending_club dataset, we build an LR (logistic regression) model on a Spark pipeline, serialize the trained model with MLeap, and then load it in a standalone MLeap process that runs predictions through the same pipeline. This gives us automatic pipeline deployment and consistent feature transformation between offline and online.
Offline
Steps 1 and 2: lightly preprocess the lending_club data, e.g. drop rows containing null values.
Step 3: split the data into train and test sets.
Steps 4-7: build the feature-processing pipeline.
Step 8: train the model.
Step 9: save the model with MLeap.
The full code is shown below:
package com.yeahmobi

// Spark Training Pipeline Libraries
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.SparkSession
import com.databricks.spark.avro._
// MLeap/Bundle.ML Serialization Libraries
import ml.combust.mleap.spark.SparkSupport._
import resource._
import ml.combust.bundle.BundleFile
import org.apache.spark.ml.bundle.SparkBundleContext

object LendingClubDemo {

  def main(args: Array[String]): Unit = {
    val inputFile = "s3://algo.yeahmobi.com/etl/test/lending_club.avro"

    val spark = SparkSession
      .builder()
      .appName("mleapDemo")
      .getOrCreate()

    // Step 1: load data and preprocess
    var dataset = spark.sqlContext.read.format("com.databricks.spark.avro").load(inputFile)
    dataset.createOrReplaceTempView("df")
    println(dataset.count())

    val datasetFnl = spark.sqlContext.sql(f"""
      select
        loan_amount,
        fico_score_group_fnl,
        case when dti >= 10.0 then 10.0 else dti end as dti,
        emp_length,
        case when state in ('CA', 'NY', 'MN', 'IL', 'FL', 'WA', 'MA', 'TX', 'GA', 'OH', 'NJ', 'VA', 'MI')
             then state
             else 'Other'
        end as state,
        loan_title,
        approved
      from df
      where loan_title in ('Debt Consolidation', 'Other', 'Home/Home Improvement', 'Payoff Credit Card',
        'Car Payment/Loan', 'Business Loan', 'Health/Medical', 'Moving', 'Wedding/Engagement',
        'Vacation', 'College', 'Renewable Energy', 'Payoff Bills', 'Personal Loan', 'Motorcycle')
    """)
    println(datasetFnl.count())

    // Step 2: Define continuous and categorical features and filter nulls
    val continuousFeatures = Array("loan_amount", "dti")
    val categoricalFeatures = Array("loan_title", "emp_length", "state", "fico_score_group_fnl")
    val allFeatures = continuousFeatures.union(categoricalFeatures)

    // Filter all null values
    val allCols = allFeatures.union(Seq("approved")).map(datasetFnl.col)
    val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
    val datasetImputedFiltered = datasetFnl.select(allCols: _*).filter(nullFilter).persist()
    println(datasetImputedFiltered.count())

    // Step 3: Split data into training and validation
    val Array(trainingDataset, validationDataset) = datasetImputedFiltered.randomSplit(Array(0.7, 0.3))

    // Step 4: Continuous feature pipeline
    val continuousFeatureAssembler = new VectorAssembler(uid = "continuous_feature_assembler").
      setInputCols(continuousFeatures).
      setOutputCol("unscaled_continuous_features")

    val continuousFeatureScaler = new StandardScaler(uid = "continuous_feature_scaler").
      setInputCol("unscaled_continuous_features").
      setOutputCol("scaled_continuous_features")

    val polyExpansionAssembler = new VectorAssembler(uid = "poly_expansion_feature_assembler").
      setInputCols(Array("loan_amount", "dti")).
      setOutputCol("poly_expansions_features")

    val continuousFeaturePolynomialExpansion = new PolynomialExpansion(uid = "polynomial_expansion_loan_amount").
      setInputCol("poly_expansions_features").
      setOutputCol("loan_amount_polynomial_expansion_features")

    // Step 5: Categorical feature pipeline
    val categoricalFeatureIndexers = categoricalFeatures.map { feature =>
      new StringIndexer(uid = s"string_indexer_$feature").
        setInputCol(feature).
        setOutputCol(s"${feature}_index")
    }

    val categoricalFeatureOneHotEncoders = categoricalFeatureIndexers.map { indexer =>
      new OneHotEncoder(uid = s"oh_encoder_${indexer.getOutputCol}").
        setInputCol(indexer.getOutputCol).
        setOutputCol(s"${indexer.getOutputCol}_oh")
    }

    // Step 6: Assemble our features and feature pipeline
    val featureColsLr = categoricalFeatureOneHotEncoders.map(_.getOutputCol).union(Seq("scaled_continuous_features"))

    // Step 7: Assemble all processed categorical and continuous features into a single feature vector
    val featureAssemblerLr = new VectorAssembler(uid = "feature_assembler_lr").
      setInputCols(featureColsLr).
      setOutputCol("features_lr")

    val estimators: Array[PipelineStage] =
      Array(continuousFeatureAssembler, continuousFeatureScaler, polyExpansionAssembler, continuousFeaturePolynomialExpansion).
        union(categoricalFeatureIndexers).
        union(categoricalFeatureOneHotEncoders).
        union(Seq(featureAssemblerLr))

    val featurePipeline = new Pipeline(uid = "feature_pipeline").
      setStages(estimators)

    val sparkFeaturePipelineModel = featurePipeline.fit(datasetImputedFiltered)

    // Step 8: Train the logistic regression model
    val logisticRegression = new LogisticRegression(uid = "logistic_regression").
      setFeaturesCol("features_lr").
      setLabelCol("approved").
      setPredictionCol("approved_prediction")

    val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, logisticRegression))
    val sparkPipelineLr = sparkPipelineEstimatorLr.fit(datasetImputedFiltered)
    println("Complete: Training Logistic Regression")

    // Step 9 (optional): Serialize the model to bundle.ml
    val sbc = SparkBundleContext().withDataset(sparkPipelineLr.transform(datasetImputedFiltered))
    for (bf <- managed(BundleFile("jar:file:/tmp/lc.model.lr.zip"))) {
      sparkPipelineLr.writeBundle.save(bf)(sbc).get
    }
  }
}
Online
Take any single record from lending_club, construct an MLeap LeapFrame from it, and then call
val mleapPipeline = bundle.root
val frame2 = mleapPipeline.transform(frame)
to complete both the feature transformation and the prediction.
package com.yeahmobi

import ml.combust.bundle.BundleFile
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import resource._

object Online {

  def main(args: Array[String]): Unit = {
    // Load the Spark pipeline we saved in the previous section
    val bundle = (for (bundleFile <- managed(BundleFile("jar:file:/D:\\workspace\\ml\\lc.model.lr.zip"))) yield {
      bundleFile.loadMleapBundle().get
    }).opt.get

    // Create a simple LeapFrame to transform
    // MLeap makes extensive use of monadic types like Try
    val schema = StructType(
      StructField("loan_amount", ScalarType.Double),
      StructField("fico_score_group_fnl", ScalarType.String),
      StructField("dti", ScalarType.Double),
      StructField("emp_length", ScalarType.String),
      StructField("state", ScalarType.String),
      StructField("loan_title", ScalarType.String)).get

    val data = Seq(Row(1000.0, "700 - 800", 0.1, "< 1 year", "MA", "Debt Consolidation"))
    val frame = DefaultLeapFrame(schema, data)

    // Transform the LeapFrame using our pipeline
    val mleapPipeline = bundle.root
    val frame2 = mleapPipeline.transform(frame).get
    frame2.show()

    // Print the raw prediction scores produced by the logistic regression stage
    for (pro <- frame2.select("rawPrediction")) {
      println(pro.dataset.head.getTensor(0))
      println(pro.dataset.head.getTensor(0).toArray.mkString(","))
    }
  }
}
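To turn the transformed frame into an actual scoring result, you typically read the prediction columns back out of it. A minimal sketch, appended inside main and reusing the column names from the offline pipeline ("approved_prediction" is set there, "rawPrediction" is Spark's default raw-score column); error handling via .get is deliberately simplistic:

// Sketch: extract the predicted label and the raw scores from the transformed frame
val scored = frame2.select("approved_prediction", "rawPrediction").get
val row = scored.dataset.head
val predictedLabel = row.getDouble(0)                          // predicted value of the 'approved' label
val rawScores = row.getTensor[Double](1).toArray.mkString(",") // raw LR margins per class
println(s"prediction=$predictedLabel rawPrediction=[$rawScores]")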
Summary
MLeap does not yet support SQL, so SQL-based data preprocessing cannot be kept consistent between offline and online; its support for Spark pipelines, however, is its biggest strength.
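One way to work around the missing SQL support is to pull the SQL-only rules from Step 1 (the dti cap and the state bucketing) into a small piece of Spark-free Scala that both the offline job and the online scorer call, so the two paths cannot drift apart. The sketch below illustrates the idea; the object and method names are illustrative and not part of the original experiment:

// Shared preprocessing rules, usable both offline (e.g. via UDFs) and online
object Preprocess {
  private val keptStates = Set("CA", "NY", "MN", "IL", "FL", "WA", "MA", "TX",
                               "GA", "OH", "NJ", "VA", "MI")

  // Cap dti at 10.0, mirroring the case-when in the offline SQL
  def capDti(dti: Double): Double = math.min(dti, 10.0)

  // Bucket infrequent states into 'Other', mirroring the offline SQL
  def bucketState(state: String): String =
    if (keptStates.contains(state)) state else "Other"
}

// Online usage: apply the same rules before building the LeapFrame row
val row = Row(1000.0, "700 - 800", Preprocess.capDti(0.1),
              "< 1 year", Preprocess.bucketState("MA"), "Debt Consolidation")

Offline, the same functions would be registered as UDFs or applied in a DataFrame transformation in place of the raw SQL, keeping a single source of truth for the preprocessing logic.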