1. 程式人生 > >如何在Java應用裡整合Spark MLlib訓練好的模型做預測

如何在Java應用裡整合Spark MLlib訓練好的模型做預測

前言

昨天媛媛說,你是不是很久沒寫部落格了。我說上一篇1.26號,昨天3.26號,剛好兩個月,心中也略微有些愧疚。今天正好有個好朋友問,怎麼在Java應用裡整合Spark MLlib訓練好的模型。在StreamingPro裡其實都有實際的使用例子,但是如果有一篇文章講述下,我覺得應該能讓更多人獲得幫助

追本溯源

記得我之前吐槽過Spark MLlib的設計,也是因為一個朋友使用了spark MLlib的pipeline做訓練,然後他把這個pipeline放到了spring boot裡,結果做預測的時候奇慢無比,一條記錄inference需要30多秒。為什麼會這麼慢呢?原因是Spark MLlib 是以批處理為核心設計理念的。比如上面朋友遇到的坑是有一部分原因來源於word2vec的transform方法:

@Since("2.0.0")
  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    val vectors = wordVectors.getVectors
      .mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
      .map(identity) // mapValues doesn't return a serializable map (SI-7005)
    val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
    val d = $(vectorSize)

來一條資料(通常API應用都是如此),他需要先獲得vectors(詞到vector的對映)物件,假設你有十萬個詞,

def getVectors: Map[String, Array[Float]] = {
    wordIndex.map { case (word, ind) =>
      (word, wordVectors.slice(vectorSize * ind, vectorSize * ind + vectorSize))
    }
  }

每次請求他都要做如上呼叫和計算。接著還需要把這些東西(這個可能就比較大了,幾百M或者幾個G都有可能)廣播出去。

所以註定快不了。

把model整合到Java 服務裡實例

假設你使用貝葉斯訓練了一個模型,你需要儲存下這個模型,儲存的方式如下:

val nb = new NaiveBayes()
//做些引數配置和訓練過程
.....
//儲存模型
nb.write.overwrite().save(path + "/" + modelIndex)

接著,在你的Java/scala程式裡,引入spark core,spark mllib等包。載入模型:

val model = NaiveBayesModel.load(tempPath)

這個時候因為要做預測,我們為了效能,不能直接呼叫model的transform方法,你仔細觀察發現,我們需要通過反射呼叫兩個方法,就能實現分類。第一個是predictRaw方法,該方法輸入一個向量,輸出也為一個向量。我們其實不需要向量,我們需要的是一個分類的id。predictRaw 方法在model裡,但是沒辦法直接呼叫,因為是私有的:

  override protected def predictRaw(features: Vector): Vector = {
    $(modelType) match {
      case Multinomial =>
        multinomialCalculation(features)
      case Bernoulli =>
        bernoulliCalculation(features)
      case _ =>
        // This should never happen.
        throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
    }
  }

所以我們需要通過反射來完成:

val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]

現在我們已經得到了predctRaw的結果,接著我們要用raw2probability 把向量轉化為一個概率分佈,因為spark 版本不同,該方法的簽名也略有變化,所以可能要做下版本適配:

val raw2probabilityMethod = if (sparkSession.version.startsWith("2.3")) "raw2probabilityInPlace" else "raw2probability"
val raw2probability = model.getClass.getMethod(raw2probabilityMethod, classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]

raw2probability 其實也還是一個向量,這個向量的長度是分類的數目,每個位置的值是概率。所以所以我們只要拿到最大的那個概率值所在的位置就行:

val categoryId = raw2probability.argmax

這個時候categoryId 就是我們預測的分類了。

截止到目前我們已經完成了作為一個普通java/scala 方法的呼叫流程。如果我不想用在應用程式裡,而是放到spark 流式計算裡呢?或者批處理也行,那麼這個時候你只需要封裝一個UDF函式即可:

val models = sparkSession.sparkContext.broadcast(_model.asInstanceOf[ArrayBuffer[NaiveBayesModel]])
val f2 = (vec: Vector) => {
      models.value.map { model =>
        val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]
        val raw2probability = model.getClass.getMethod(raw2probabilityMethod, classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]
        //model.getClass.getMethod("probability2prediction", classOf[Vector]).invoke(model, raw2probability).asInstanceOf[Vector]
        raw2probability
      }
    }

    sparkSession.udf.register(name , f2)

上面的例子可以參考StreamingPro 中streaming.dsl.mmlib.algs.SQLNaiveBayes的程式碼。不同的演算法因為內部實現不同,我們使用起來也會略微有些區別。

總結

Spark MLlib學習了SKLearn裡的transform和fit的概念,但是因為設計上還是遵循批處理的方式,實際部署後會有很大的效能瓶頸,不適合那種資料一條一條過來需要快速響應的預測流程,所以需要呼叫一些內部的API來完成最後的預測。



作者:祝威廉
連結:https://www.jianshu.com/p/3c038027ff61
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。