1. 程式人生 > >自定義 spark transformer 和 estimator 的範例

自定義 spark transformer 和 estimator 的範例

https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types

要了解有關Spark ML所基於的資料集API的未來的更多資訊,請檢視Holden Karau和Seth Hendrickson的會話Spark Structured Streaming,以便在2017年3月14日至16日在Strata + Hadoop World San Jose 進行機器學習。是Python更多你的事情?2017年2月9日,請參閱Karau在2017年東部Spark Summit的除錯 PySpark的演講。

您還可以在Holden Karau和Rachel Warren的“ 高效能Spark:擴充套件和優化Apache Spark的最佳實踐 ”中瞭解更多資訊。

雖然Spark ML管道具有多種演算法,但您可能會發現自己需要其他功能而無需離開管道模型。在Spark MLlib中,這不是什麼大問題 - 您可以使用RDD轉換手動實現演算法並繼續從那裡開始。對於Spark ML管道,同樣的方法可以工作,但是我們失去了一些很好的整合管道屬性,包括自動執行元演算法的能力,例如交叉驗證引數搜尋。在本文中,您將學習如何使用標準wordcount示例作為起點擴充套件Spark ML管道模型(人們永遠無法逃避大資料wordcount示例的介紹)。

要將自己的演算法新增到Spark管道,您需要實現Estimator或Transformer實現PipelineStage介面。對於不需要培訓的演算法,您可以實現Transformer介面,對於經過培訓的演算法,您可以實現Estimator介面org.apache.spark.ml(兩者都實現基礎PipelineStage)。請注意,培訓不僅限於複雜的機器學習模型; 甚至MinMaxScaler也需要培訓來確定範圍。如果他們需要培訓,他們必須建造Estimator而不是Transformer。

STRATA資料會議

2014年3月25日至28日在舊金山舉行的Strata資料會議
最優價格將於1月11日結束
注意
PipelineStage直接使用不起作用,因為在管道內使用了適合的反射,假設所有階段都是a Estimator或a Transformer。
除了顯而易見的transform或fit函式之外,所有管道階段都需要提供transformSchema,copy建構函式或實現一個類,它為您提供這些 - 用於copy製作當前階段的副本,任何新指定的引數合併在一起,並且可以簡單地呼叫defaultCopy(除非你的類有特殊的建構函式注意事項)。

顯示了流水線​​階段的開始以及複製委託 - transformSchema必須根據任何引數集和輸入模式生成管道階段的預期輸出。大多數管道階段只需新增新欄位; 如果需要,很少刪除以前的欄位,但這有時會導致記錄包含的資料多於下游所需的資料,從而對效能產生負面影響。如果您發現這是管道中的問題,您可以建立自己的階段以刪除不必要的欄位。

class HardCodedWordCountStage(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID("hardcodedwordcount"))

def copy(extra: ParamMap): HardCodedWordCountStage = {
defaultCopy(extra)
}
除了生成輸出模式之外,該transformSchema函式還應驗證輸入模式是否適合該階段(例如,輸入列是預期型別)。

這也是您應該對階段引數執行驗證的地方。

transformSchema具有硬編碼輸入和輸出列的字串輸入和向量輸出的簡單說明如下。

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex("happy_pandas")
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField("happy_panda_counts", IntegerType, false))
}
使用該Transformer介面可以非常簡單地實現不需要訓練的演算法。由於這是最簡單的流水線階段,因此您可以從實現一個簡單的變換器開始,該變換器計算輸入列上的字數。

def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(" ").size }
df.select(col("*"),
wordcount(df.col("happy_pandas")).as("happy_panda_counts"))
}
要充分利用管道介面,您需要使用params介面配置管道階段。

O'Reilly資料通訊
獲取O'Reilly資料通訊
每週接收業內人士的見解 - 以及有關資料主題的獨家內容,優惠等。

你的郵件

國家

訂閱
請閱讀我們的隱私政策。
雖然params介面是公共的,但遺憾的是,Spark中常用的常見預設引數是私有的,因此最終會有一些程式碼重複。除了允許使用者指定值之外,引數還可以包含一些基本驗證邏輯(例如,正則化引數必須設定為非負值)。兩個最常見的引數是輸入列和輸出列,您可以相對簡單地新增到模型中。

除了字串引數之外,還可以使用任何其他型別,包括停用詞之類的字串列表,以及停用詞之類的字串。

class ConfigurableWordCount(override val uid: String) extends Transformer {
final val inputCol= new Param[String](this, "inputCol", "The input column")
final val outputCol = new Param[String](this, "outputCol", "The output column")

; def setInputCol(value: String): this.type = set(inputCol, value)

def setOutputCol(value: String): this.type = set(outputCol, value)

def this() = this(Identifiable.randomUID("configurablewordcount"))

def copy(extra: ParamMap): HardCodedWordCountStage = {
defaultCopy(extra)
}

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}

def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(" ").size }
df.select(col("*"), wordcount(df.col($(inputCol))).as($(outputCol)))
}
}
需要訓練的演算法可以使用Estimator介面實現- 儘管對於許多演算法,org.apache.spark.ml.Predictor或者org.apache.spark.ml.classificationClassifier輔助類更容易實現。Estimator和Transformer介面之間的主要區別在於,不是直接在輸入上表達轉換,而是首先以train函式的形式進行訓練。字串索引器是您可以實現的最簡單的估算器之一,雖然它已經在Spark中可用,但仍然是如何使用估計器介面的一個很好的例證。

trait SimpleIndexerParams extends Params {
final val inputCol= new Param[String](this, "inputCol", "The input column")
final val outputCol = new Param[String](this, "outputCol", "The output column")
}

class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {

def setInputCol(value: String) = set(inputCol, value)

def setOutputCol(value: String) = set(outputCol, value)

def this() = this(Identifiable.randomUID("simpleindexer"))

override def copy(extra: ParamMap): SimpleIndexer = {
defaultCopy(extra)
}

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}

override def fit(dataset: Dataset[_]): SimpleIndexerModel = {
import dataset.sparkSession.implicits._
val words = dataset.select(dataset($(inputCol)).as[String]).distinct
.collect()
new SimpleIndexerModel(uid, words)
; }
}

class SimpleIndexerModel(
override val uid: String, words: Array[String]) extends Model[SimpleIndexerModel] with SimpleIndexerParams {

override def copy(extra: ParamMap): SimpleIndexerModel = {
defaultCopy(extra)
}

private val labelToIndex: Map[String, Double] = words.zipWithIndex.
map{case (x, y) => (x, y.toDouble)}.toMap

override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}

override def transform(dataset: Dataset[_]): DataFrame = {
val indexer = udf { label: String => labelToIndex(label) }
dataset.select(col("*"),
indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))
}
}
如果要實現迭代演算法,您可能希望在未快取輸入資料時自動快取輸入資料,或者允許使用者指定永續性級別。

O'REILLY線上學習

學得更快。深入挖掘。再看一點。
加入O'Reilly的線上學習平臺。立即免費試用,即時查詢答案,或掌握新的有用的東西。

學到更多
所述Predictor介面將兩個最常用的引數(輸入和輸出列)作為標籤欄,設有柱,和預測列和自動處理的模式轉換為我們。

該Classifier介面做許多事一樣,但它也增加了rawPredictionColumn,並提供工具來檢測(班數getNumClasses),以及輸入轉換DataFrame到一個RDD LabeledPoints(使它更容易包裝舊的MLlib分類演算法)。

如果要實現迴歸或群集介面,則不需要使用公共基本介面集,因此您需要使用通用Estimator介面。

// Simple Bernouli Naive Bayes classifier - no sanity checks for brevity
// Example only - not for production use.
class SimpleNaiveBayes(val uid: String)
extends Classifier[Vector,SimpleNaiveBayes,SimpleNaiveBayesModel] {

def this() = this(Identifiable.randomUID("simple-naive-bayes"))

override def train(ds: Dataset[_]): SimpleNaiveBayesModel = {
import ds.sparkSession.implicits._
ds.cache()
// Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead
// Using the RDD approach is common when integrating with legacy machine learning code
// or iterative algorithms which can create large query plans.
// Here we use Datasets since neither of those apply.

// Compute the number of documents
val numDocs = ds.count
// Get the number of classes.
// Note this estimator assumes they start at 0 and go to numClasses
val numClasses = getNumClasses(ds)
// Get the number of features by peaking at the first row
val numFeatures: Integer = ds.select(col($(featuresCol))).head
  .get(0).asInstanceOf[Vector].size
// Determine the number of records for each class
val groupedByLabel = ds.select(col($(labelCol)).as[Double]).groupByKey(x => x)
val classCounts = groupedByLabel.agg(count("*").as[Long])
  .sort(col("value")).collect().toMap
// Select the labels and features so we can more easily map over them.
// Note: we do this as a DataFrame using the untyped API because the Vector
// UDT is no longer public.
val df = ds.select(col($(labelCol)).cast(DoubleType), col($(featuresCol)))
// Figure out the non-zero frequency of each feature for each label and
// output label index pairs using a case clas to make it easier to work with.
val labelCounts: Dataset[LabeledToken] = df.flatMap {
  case Row(label: Double, features: Vector) =>
    features.toArray.zip(Stream from 1)
      .filter{vIdx => vIdx._2 == 1.0}
      .map{case (v, idx) => LabeledToken(label, idx)}
}
// Use the typed Dataset aggregation API to count the number of non-zero
// features for each label-feature index.
val aggregatedCounts: Array[((Double,Integer),Long)] = labelCounts
  .groupByKey(x => (x.label, x.index))
  .agg(count("*").as[Long]).collect()

val theta = Array.fill(numClasses)(new Array[Double](numFeatures))

// Compute the denominator for the general prioirs
val piLogDenom = math.log(numDocs + numClasses)
// Compute the priors for each class
val pi = classCounts.map{case(_, cc) =>
  math.log(cc.toDouble) - piLogDenom }.toArray

// For each label/feature update the probabilities
aggregatedCounts.foreach{case ((label, featureIndex), count) =>
  // log of number of documents for this label + 2.0 (smoothing)
  val thetaLogDenom = math.log(
    classCounts.get(label).map(_.toDouble).getOrElse(0.0) + 2.0)
  theta(label.toInt)(featureIndex) = math.log(count + 1.0) - thetaLogDenom
}
// Unpersist now that we are done computing everything
ds.unpersist()
// Construct a model
new SimpleNaiveBayesModel(uid, numClasses, numFeatures, Vectors.dense(pi),
  new DenseMatrix(numClasses, theta(0).length, theta.flatten, true))

}

override def copy(extra: ParamMap) = {
defaultCopy(extra)
}
}

// Simplified Naive Bayes Model
case class SimpleNaiveBayesModel(
override val uid: String,
override val numClasses: Int,
override val numFeatures: Int,
val pi: Vector,
val theta: DenseMatrix) extends
ClassificationModel[Vector,SimpleNaiveBayesModel] {

override def copy(extra: ParamMap) = {
defaultCopy(extra)
}

// We have to do some tricks here because we are using Spark's
// Vector/DenseMatrix calculations - but for your own model don't feel
// limited to Spark's native ones.
val negThetaArray = theta.values.map(v => math.log(1.0 - math.exp(v)))
val negTheta = new DenseMatrix(numClasses, numFeatures, negThetaArray, true)
val thetaMinusNegThetaArray = theta.values.zip(negThetaArray)
.map{case (v, nv) => v - nv}
val thetaMinusNegTheta = new DenseMatrix(
numClasses, numFeatures, thetaMinusNegThetaArray, true)
val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))
val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray

// Here is the prediciton functionality you need to implement - for ClassificationModels
// transform automatically wraps this - but if you might benefit from broadcasting your model or
// other optimizations you can also override transform.
def predictRaw(features: Vector): Vector = {
// Toy implementation - use BLAS or similar instead
// the summing of the three vectors but the functionality isn't exposed.
Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray)
.map{case (x, y) => x + y}.zip(negThetaSum).map{case (x, y) => x + y}
)
}
}