Spark 排序算法系列之(MLLib、ML)LR 使用方式介紹
“
【Spark排序算法系列】主要介紹的是目前推薦系統或者廣告點選方面用的比較廣的幾種演算法,和他們在Spark中的應用實現,本篇文章主要介紹LR演算法。
”
【Spark排序算法系列】主要介紹的是目前推薦系統或者廣告點選方面用的比較廣的幾種演算法,和他們在Spark中的應用實現,本篇文章主要介紹LR演算法。
本系列還包括(持續更新):
-
Spark排序算法系列之GBDT(梯度提升決策樹)
-
Spark排序算法系列之模型融合(GBDT+LR)
-
Spark排序算法系列之XGBoost
-
Spark排序算法系列之FTRL(Follow-the-regularized-Leader)
-
Spark排序算法系列之FM與FFM
背景
邏輯迴歸(Logistic Regression,LR)是較早應用在推薦排序上的,其屬於線性模型,模型簡單,可以引入海量離散特徵,這樣的好處就是模型可以考慮更加細節或者說針對具體個體的因素。如果想要引入非線性因素需要做特徵交叉,這樣很容易產生百億特徵,在很早之前ctr就主要靠堆人力搞特徵工程工作來持續優化效果。
雖然目前在工業界LR應用的並不多,但是對於初學者,一些中小企業或者應用場景不需要負責排序模型的時候,LR扔不失為一個不錯的選擇。
關於LR的演算法原理,這裡不做過多說明,可參考:
-
迴歸分析之邏輯迴歸-Logistic Regression
-
線性模型篇之Logistic Regression數學公式推導
LR介紹
LR的數學表示式可以簡寫為:
對於二分類模型,LR是一個分類演算法,模型計算得到預測值後會通過以下函式進轉化。
如果L(w,x,y) > 0.5 則是1 否則為0。當然在實際應用過程中,並不是一定取0.5作為界限值,而是根據實際情況進行調整。
二進位制迴歸可以轉化為多分類迴歸問題。關於多分類介紹和基於Spark實現多分類可參考多分類實現方式介紹和在Spark上實現多分類邏輯迴歸(Multinomial Logistic Regression)
在Spark.mllib包中提供了兩種LR分類模型,分別是:
-
mini-batch gradient descent(LogisticRegressionWithLBFGS)
-
L-BFGS(LogisticRegressionWithSGD)
但官方給出的建議是:推薦使用LBFGS,因為基於LBFGS的LR比基於SGD的能更快的收斂。其原話如下:
We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.
而且LRWithLBFGS不僅支援二分類還支援多分類,但LRWithSGD只支援二分類。所以後續只介紹下Spark mllib中的LogisticRegressionWithLBFGS相關操作。
mllib中的LRWithLBFGS
設定變數和建立spark物件
val file = "data/sample_libsvm_data.txt" val model_path = "model/lr/" val model_param = "numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient" val spark = SparkSession.builder() .master("local[5]") .appName("LogisticRegression_Model_Train") .getOrCreate() Logger.getRootLogger.setLevel(Level.WARN)
拆分資料集
// 記載資料集 並拆分成訓練集和測試集 val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3)) val (train, test) = (data(0), data(1))
LRWithLBFGS模型設定引數
// 定義分類的數目,預設為2,是logisticregression的引數 private var numClass: Int = 2 // 定義是否新增截距,預設值為false,是logisticregression的引數 private var isAddIntercept: Option[Boolean] = None // 定義是否在訓練模型前進行驗證,是logisticregression的引數 private var isValidateData: Option[Boolean] = None // 定義迭代的次數,預設值是100,LBFGS的引數 private var numInterations: Option[Int] = None // 定義正則化係數值,預設值是0.0,LBFGS的引數 private var regParam: Option[Double] = None // 定義正則化引數,支援:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[沒有正則項],LBFGS的引數 private var updater: Option[String] = None // 定義計算梯度的方式,支援:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的引數 private var gradient: Option[String] = None // 人工定義的收斂閾值 private var threshold:Option[Double]=None // 定義模型收斂閾值,預設為 10^-6 private var convergenceTol: Double= 1.0e-6
建立模型
def createLRModel(model_param: String): LogisticRegressionWithLBFGS={ // 設定模型引數 val optimizer = new LROptimizer() optimizer.parseString(model_param) println(s"模型訓練引數為:${optimizer.toString}") // 建立模型並指定相關引數 val LRModel = new LogisticRegressionWithLBFGS() // 設定分類數目 LRModel.setNumClasses(optimizer.getNumClass) // 設定是否新增截距 if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)} // 設定是否進行驗證模型 if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)} // 設定迭代次數 if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))} // 設定正則項引數 if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) } // 設定正則化引數 if(optimizer.getUpdater.nonEmpty){ optimizer.getUpdater match { case Some("L1Updater") => LRModel.optimizer.setUpdater( new L1Updater()) case Some("SquaredL2Updater") => LRModel.optimizer.setUpdater(new SquaredL2Updater()) case Some("SimpleUpdater") => LRModel.optimizer.setUpdater(new SimpleUpdater()) case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater()) } } // 設定梯度計算方式 if(optimizer.getGradient.nonEmpty){ optimizer.getGradient match { case Some("LogisticGradient") => LRModel.optimizer.setGradient(new LogisticGradient()) case Some("LeastSquaresGradient") => LRModel.optimizer.setGradient(new LeastSquaresGradient()) case Some("HingeGradient") => LRModel.optimizer.setGradient(new HingeGradient()) case _ => LRModel.optimizer.setGradient(new LogisticGradient()) } } // 設定收斂閾值 if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)} else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)} LRModel }
模型效果評估
def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = { // MSE val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean() println(s"Test Mean Squared Error = $testMSE") // AUC val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2) println(s"0-1 label AUC is = ${metrics.areaUnderROC}") val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2) println(s"score-label AUC is = ${metrics1.areaUnderROC}") // 錯誤率 val error = result.filter(x => x._1!=x._2).count().toDouble / result.count() println(s"error is = $error") // 準確率 val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count() println(s"accuracy is = $accuracy") }
儲存模型
def saveModel(model: LogisticRegressionModel, model_path: String): Unit = { // 儲存模型檔案 obj val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+"model.obj")) out_obj.writeObject(model) // 儲存模型資訊 val model_info=new BufferedWriter(new FileWriter(model_path+"model_info.txt")) model_info.write(model.toString()) model_info.flush() model_info.close() // 儲存模型權重 val model_weights=new BufferedWriter(new FileWriter(model_path+"model_weights.txt")) model_weights.write(model.weights.toString) model_weights.flush() model_weights.close() println(s"模型資訊寫入檔案完成,路徑為:$model_path") }
載入模型
def loadModel(model_path: String): Option[LogisticRegressionModel] = { try{ val in = new ObjectInputStream( new FileInputStream(model_path) ) val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] ) in.close() println("Model Load Success") model } catch { case ex: ClassNotFoundException => { println(ex.printStackTrace()) None } case ex: IOException => { println(ex.printStackTrace()) println(ex) None } case _: Throwable => throw new Exception } }
使用載入的模型進行分值計算
// 載入obj檔案進行預測 val model_new = loadModel(s"$model_path/model.obj") // 使用載入的模型進行樣例預測 val result_new = test.map(line =>{ val pre_label = model_new.get.predict(line.features) // blas.ddot(x.length, x,1,y,1) (向量x的長度,向量x,向量x的索引遞增間隔,向量y,向量y的索引遞增間隔) val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1) val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1) (line.label, pre_label,score) } ) result_new.take(2).foreach(println)
ml中的二分類LR
ml包中的LR既可以用來做二分類,也可以用來做多分類。
-
二分類對應:Binomial logistic regression
-
多分類對應:multinomial logistic regression
其中二分類可以通過Binomial logistic regression 和 multinomial logistic regression實現。
基於Binomial logistic regression的LR實現:
def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 建立模型 val LRModel = new LogisticRegression() .setMaxIter(20) .setRegParam(0.3) .setElasticNetParam(0.8) // 訓練評估模型 val model = LRModel.fit(train) evalute(model, train, spark) } def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = { // 列印模型引數 println(s"模型引數資訊如下:\n ${model.parent.explainParams()} \n") println(s"Coefficients(係數): ${model.coefficients}") println(s"Intercept(截距): ${model.intercept}") // 檢視訓練集的預測結果 rawPrediction:row 計算的分值,probability:經過sigmoid轉換後的概率 val result = model.evaluate(train) result.predictions.show(10) // 將 label,0 值概率,predict label提取出來 result.predictions.select("label","probability","prediction").rdd .map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2))) .take(10).foreach(println) // 模型評估 val trainSummary = model.summary val objectiveHistory = trainSummary.objectiveHistory println("objectiveHistoryLoss:") objectiveHistory.foreach(loss => println(loss)) val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary] val roc = binarySummary.roc roc.show() println(s"areaUnderROC: ${binarySummary.areaUnderROC}") // Set the model threshold to maximize F-Measure val fMeasure = binarySummary.fMeasureByThreshold fMeasure.show(10) val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0) import spark.implicits ._ val bestThreshold = fMeasure.where($"F-Measure"===maxFMeasure).select("threshold").head().getDouble(0) model.setThreshold(bestThreshold) }
基於Multimial logistic regression的LR實現:
def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 建立模型 val LRModel = new LogisticRegression() .setMaxIter(10) .setRegParam(0.3) .setElasticNetParam(0.8) .setFamily("multinomial") // 訓練模型 val model = LRModel.fit(train) // 列印模型引數 println(s"模型引數資訊如下:\n ${model.parent.explainParams()} \n") println(s"Coefficients(係數): ${model.coefficientMatrix}") println(s"Intercept(截距): ${model.interceptVector}") }
ml中的多分類LR
某條樣本屬於類別k的概率計算為:
其中K表示類別,J表示特徵個數
權重最小化使用的是最大似然函式,其更新公式如下:
使用的資料集形式為:
1 1:-0.222222 2:0.5 3:-0.762712 4:-0.833333 1 1:-0.555556 2:0.25 3:-0.864407 4:-0.916667 1 1:-0.722222 2:-0.166667 3:-0.864407 4:-0.833333 1 1:-0.722222 2:0.166667 3:-0.694915 4:-0.916667 0 1:0.166667 2:-0.416667 3:0.457627 4:0.5 1 1:-0.833333 3:-0.864407 4:-0.916667 2 1:-1.32455e-07 2:-0.166667 3:0.220339 4:0.0833333 2 1:-1.32455e-07 2:-0.333333 3:0.0169491 4:-4.03573e-08
多分類LR模型實現為:
def MultiModel(file_multi: String, spark: SparkSession, model_path: String): Unit = { val training = spark.read.format("libsvm").load(file_multi) val lr = new LogisticRegression() .setMaxIter(10) .setRegParam(0.3) .setElasticNetParam(0.8) // Fit the model val lrModel = lr.fit(training) // Print the coefficients and intercept for multinomial logistic regression println(s"Coefficients: \n${lrModel.coefficientMatrix}") println(s"Intercepts: ${lrModel.interceptVector}") }
參考資料
-
https://spark.apache.org/docs/2.1.0/mllib-linear-methods.html#classification
-
https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#logistic-regression
-
https://blog.csdn.net/pupilxmk/article/details/80735599
在這浮躁的社會沉靜,用心記錄,用心學習!

關於【資料與演算法聯盟】
專注於推薦系統,深度學習,機器學習,資料探勘,雲端計算,人工智慧,架構和程式設計等技術乾貨的分享和探討,偶爾會推送一些福利,文字,攝影和遊記,掃碼關注,不再孤單。
更多幹貨,掃碼關注
熱
相關文章
多分類邏輯迴歸(Multinomial Logistic Regression)
歡迎投稿,凡是投稿一經錄用者,贈送技術圖書和相關學習資料
國內各大網際網路公司,可內推
關注公眾號,加小編微信,拉你進
【 資料與演算法交流群 】
你點的每個 “在 看” ,我都認真當成了喜歡