1. 程式人生 > >spark mllib原始碼分析之二分類邏輯迴歸evaluation

spark mllib原始碼分析之二分類邏輯迴歸evaluation

在邏輯迴歸分類中,我們評價分類器好壞的主要指標有精準率(precision),召回率(recall),F-measure,AUC等,其中最常用的是AUC,它可以綜合評價分類器效能,其他的指標主要偏重一些方面。我們介紹下spark中實現的這些評價指標,便於使用spark訓練模型後,對訓練結果進行評估。

1. 評價指標

1.1. 混淆矩陣

混淆矩陣(confusion matrix)用一張簡單的表格,反應分類器對樣本分類的情況

實際\預測 1 0
1 TP(True Positive) FN(Flase Negtive)
0 FP(False Positive) TN(True Negtive)

0/1代表兩類樣本,下面解釋下表格中的含義

  • TP:真陽性,預測是1,實際也是1
  • FP:假陽性,預測是1,實際是0
  • TN:真陰性,預測是0,實際也是0
  • FN:假陰性,預測是0,實際是1

不難看出,這個矩陣一條對角線上帶T的是預測正確的樣本(數量),另外一條對角線上帶F的是預測錯誤的樣本。

1.2. 基礎指標

由這個矩陣,我們可以計算一系列衡量分類器效能的指標

  • 準確率(Accuracy Rate)
(TP+TN)/(TP+FP+TN+FN)
分類器分對的樣本在總樣本中的比例
  • 精準度(Precision)
TP/(TP
+FP)(1)

真正的正樣本在分類器分出的正樣本中的比例
  • 召回率(Recall)
TP/(TP+FN)(2)
樣本中正例被正確分類的比例
  • TPR(True Positive Rate),同召回率
  • FPR(False Positive Rate)
FP/(FP+TN)(3)
被錯誤分成正例的樣本在實際負例樣本中的比例

1.3. F-measure

也稱F-score,綜合考慮precision和recall,經常用在資訊檢索中

Fβ=(β2+1)PRβ2P+R(4)
β=1時,就是F1-score

1.4. ROC

樣本經過分類器後,我們可以得到樣本的預測值,以這些預測值為閾值,就可以得到這些預測值對應的的混淆矩陣,每個混淆矩陣都可以計算(FPR, TPR)這樣的點對,將這些點對繪製在二維座標系中,然後連起來就得到了ROC曲線
這裡寫圖片描述


顯然座標(1, 0)是所有正例全部分錯,是最壞的情況,座標(0, 1)是正例全部分對,是最好的情況,而y=x這條線代表了隨機猜測的情況,因此正常的分類器的ROC曲線應該是高於這條直線的。

1.5. AUC

ROC是條曲線,不方便我們對比分類器的好壞,因此我們用ROC覆蓋的面積這樣一個數值來衡量分類器,AUC的計算方法主要有兩種,一種用相鄰兩點構成的等腰梯形近似計算,另外一種利用與Wilcoxon-Mann-Witney Test等價關係計算。

1.5.1. 直角梯形法

如1.3中的圖所示,ROC曲線上的兩個相鄰點(x1,x2),(y1,y2),以及它們在x軸上的投影構成了一個直角梯形,當兩個點足夠接近時,可以近似為兩點之間曲線下的面積

s=(y1x1)(x2+y2)/2(5)
將ROC曲線上的點依次組成這種對,連續計算相鄰兩點形成的直角梯形並累加即可得到近似的AUC值。

1.5.2. Wilcoxon-Mann-Witney Test

AUC和Wilcoxon-Mann-Witney Test是等價的,而Wilcoxon-Mann-Witney Test就是從樣本中任意抽取一個正例本和一個負例,正例大於負例score的概率。具體計算這個概率可以通過統計所有的 正負樣本對(M*N,M為正樣本數量,N為負樣本數量)中,正樣本score大於負樣本score的數量除以M*N來近似。如果這個pair的正負樣本 score相等,則按0.5計算,這個方法的複雜度為O((M+N)2)。在此基礎上,還有種改進方法,具體做法是將所有樣本按score從大到小逆序排序,然後取所有正樣本的排序次序r相加,

auc=positiveriM(M+1)/2MN(6)
這種方法下,如果某正例s的次序是rk,則算上這個樣本,比它score小的樣本數量就是rk,s與這些樣本組成的pair對中,再去掉小於等於它的正樣本就是需要計算的負樣本的個數,而這些需要去掉的正樣本數量則是M(對應最大score的正例),M1(對應score第二大的正例),依次類推,score最小的樣本則對應1,也就是對應數列M,M1,...,1,其和是M(M+1)/2,分母上再除去MN即可。

2. 實現

2.1. BinaryLabelCounter

記錄樣本label的分佈情況

private[evaluation] class BinaryLabelCounter(
    var numPositives: Long = 0L,
    var numNegatives: Long = 0L)

包含了正/負樣本的數量
值得注意的是其運算中相容了負例label為0/-1這兩種情況,只要label小於等於0.5就認為是負例

def +=(label: Double): BinaryLabelCounter = { 
  if (label > 0.5) numPositives += 1L else numNegatives += 1L
  this
}

2.2. confusion matrix

count是大於當前score的樣本的label分佈,totalCount是所有的label的分佈

private[evaluation] case class BinaryConfusionMatrixImpl(
    count: BinaryLabelCounter,
    totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix {

  /** TP */
  override def numTruePositives: Long = count.numPositives
  /** FP */
  override def numFalsePositives: Long = count.numNegatives
  /** FN */
  override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives
  /** TN */
  override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives
  /** number of positives */
  override def numPositives: Long = totalCount.numPositives
  /** number of negatives */
  override def numNegatives: Long = totalCount.numNegatives
}

2.3. 基礎指標

包括precision,FPR,TPR(Recall),F-score,這些指標都定義成object,繼承自BinaryClassificationMetricComputer基類,然後實現apply函式,可以不顯式使用new,而類似函式形式來計算,好處是用在高階函式的引數列表中,可以根據需要傳入需要計算的指標,非常靈活,參見BinaryClassificationMetrics中createCurve函式的用法,計算邏輯都比較直觀簡單。

2.3.1. precision

private[evaluation] object Precision extends BinaryClassificationMetricComputer {
  override def apply(c: BinaryConfusionMatrix): Double = {
    val totalPositives = c.numTruePositives + c.numFalsePositives
    if (totalPositives == 0) {
      1.0
    } else {
      //式(1)
      c.numTruePositives.toDouble / totalPositives
    }
  }
}

2.3.2. FPR

private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer {
  override def apply(c: BinaryConfusionMatrix): Double = {
    if (c.numNegatives == 0) {
      0.0
    } else {
      //式(3)
      c.numFalsePositives.toDouble / c.numNegatives
    }
  }
}

2.3.3. TPR(Recall)

private[evaluation] object Recall extends BinaryClassificationMetricComputer {
  override def apply(c: BinaryConfusionMatrix): Double = {
    if (c.numPositives == 0) {
      0.0
    } else {
      //式(2)
      c.numTruePositives.toDouble / c.numPositives
    }
  }
}

2.3.4. F-measure

private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetricComputer {
  private val beta2 = beta * beta
  override def apply(c: BinaryConfusionMatrix): Double = {
    val precision = Precision(c)
    val recall = Recall(c)
    if (precision + recall == 0) {
      0.0
    } else {
      //式(4)
      (1.0 + beta2) * (precision * recall) / (beta2 * precision + recall)
    }
  }
}

3. BinaryClassificationMetrics

計算樣本的分佈,構造ROC曲線,計算AUC等二分類評估指標

class BinaryClassificationMetrics @Since("1.3.0") (
    @Since("1.3.0") val scoreAndLabels: RDD[(Double, Double)],
    @Since("1.3.0") val numBins: Int)

類成員為含有預測值(score, label) pair對的樣本rdd,numBins是用於計算ROC時的用的點數,當樣本數遠大於numBins時則抽樣,相當於對樣本score做等頻離散化。

3.1. label分佈與混淆矩陣

計算樣本各score(預測值)的累積label分佈cumulativeCounts與混淆矩陣confusions

private lazy val ( 
  cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
  confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
  // Create a bin for each distinct score value, count positives and negatives within each bin,
  // and then sort by score values in descending order.
  //將具有相同預測值的樣本累計在一起並按降序排序,key是預測值,value是BinaryLabelCounter,累計正樣本和負樣本的個數
  val counts = scoreAndLabels.combineByKey(
    createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
    mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
    mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
  ).sortByKey(ascending = false)
  //抽樣並排序
  val binnedCounts =
    // Only down-sample if bins is > 0
    if (numBins == 0) {
      // Use original directly
      counts
    } else {
      val countsSize = counts.count() 
      // Group the iterator into chunks of about countsSize / numBins points,
      // so that the resulting number of bins is about numBins
      var grouping = countsSize / numBins
      if (grouping < 2) {
        // numBins was more than half of the size; no real point in down-sampling to bins
        logInfo(s"Curve is too small ($countsSize) for $numBins bins to be useful")
        counts
      } else {
      //樣本個數大於2倍numBins,抽樣
        if (grouping >= Int.MaxValue) {
          logWarning(
            s"Curve too large ($countsSize) for $numBins bins; capping at ${Int.MaxValue}")
          grouping = Int.MaxValue
        }
        //grouped是將迭代器每grouping個組成一個新的迭代器,例如[i1, i2, i3,...,i100],如果grouping為4,則[[i1,i2,i3,i4], [i5,i6,i7,i8], ...]
        counts.mapPartitions(_.grouped(grouping.toInt).map { pairs =>
          //取新組中的第一個分數為新的pair分數,相當於等頻離散化
          val firstScore = pairs.head._1
          //累加組內的label計數
          val agg = new BinaryLabelCounter()
          pairs.foreach(pair => agg += pair._2)
          //拼成新的pair,相當於抽樣了
          (firstScore, agg)
        })
      }
    }
  //按partition內累積
  val agg = binnedCounts.values.mapPartitions { iter =>
    val agg = new BinaryLabelCounter()
    iter.foreach(agg += _)
    Iterator(agg)
  }.collect()
  //part間累積
  val partitionwiseCumulativeCounts =
    agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
  val totalCount = partitionwiseCumulativeCounts.last
  logInfo(s"Total counts: $totalCount")
  //part內累積:每個score先整體累加前一個part,在累加part內其他scoreval cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
    (index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
      val cumCount = partitionwiseCumulativeCounts(index)
      iter.map { case (score, c) =>
        cumCount += c
        (score, cumCount.clone())
      }
    }, preservesPartitioning = true)
  cumulativeCounts.persist()
  val confusions = cumulativeCounts.map { case (score, cumCount) =>
    (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
  }
  (cumulativeCounts, confusions)
}

我們做ROC曲線,是一系列score以及在這個score下的混淆矩陣,其實需要的是以那個score為threshold的label分佈。舉例來說,如果有20個樣本,其score集合[0.9, 0.8. 0.7, 0.6, 0.5],對應樣本score:label的情況(key是label 0/1,value是數量,第一項代表score等於0.9的樣本中類0有1個,類1有2個)[[0:1, 1:2], [0:0, 1:3], [0:2, 1:3], [0:4, 1:2], [0:3, 1:0]],總的分佈是[0:10, 1:10],因此我們可以按序累積[[0:1, 1:2], [0:1, 1:5], [0:3, 1:8], [0:7, 1:10], [0:10, 1:10]],每一個都是累加前面的,這樣我們在最小的值就可以得到所有的分佈,當以0.8為threshold時,大於0.8的判定為1,其中label的分佈就是列表中的分佈[0:1, 1:5],判定為0的分佈用總的分佈減掉就是[0:9, 1:5],然後在計算混淆矩陣就非常容易了。
為了達到上面的目的,程式碼首先計算了每個score下的label分佈情況,然後逆序按從大到小排序(當然按順序排序也是可以,得到的就是小於這個score的分佈了),考慮到資料是分散式儲存在不同的機器上的,但因為整體有序(part間有序,part內有序),所有part1中的所有score肯定是大於part0中的,因此可以先按part累積,part內的元素再逐個累積,最後就可以得到每個score下的label分佈,比較巧妙。

3.2. createCurve函式

函式的入參是BinaryClassificationMetricComputer,可以根據需要計算的指標,返回pair指標,參考2.3節

/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { 
  confusions.map { case (s, c) =>
    (s, y(c))
  } 
}

/** Creates a curve of (metricX, metricY). */
private def createCurve(
    x: BinaryClassificationMetricComputer,
    y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = { 
  confusions.map { case (_, c) =>
    (x(c), y(c))
  } 
}

3.3. ROC

產生ROC曲線

def roc(): RDD[(Double, Double)] = { 
  //(FPR, TPR)
  val rocCurve = createCurve(FalsePositiveRate, Recall)
  val sc = confusions.context
  val first = sc.makeRDD(Seq((0.0, 0.0)), 1)
  val last = sc.makeRDD(Seq((1.0, 1.0)), 1)
  new UnionRDD[(Double, Double)](sc, Seq(first, rocCurve, last))
}

3.4. AUC

使用AreaUnderCurve計算AUC

def areaUnderROC(): Double = AreaUnderCurve.of(roc())

AreaUnderCurve使用直角梯形法計算曲線下的面積

//直角梯形的面積,式(5)
private def trapezoid(points: Seq[(Double, Double)]): Double = { 
  require(points.length == 2)
  val x = points.head
  val y = points.last
  (y._1 - x._1) * (y._2 + x._2) / 2.0
}

計算相鄰兩點構成的直角梯形的面積,入參是包含兩點的序列

def of(curve: RDD[(Double, Double)]): Double = { 
  curve.sliding(2).aggregate(0.0)(
    seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points),
    combOp = _ + _ 
  ) 
}

入參是ROC曲線,(FPR, TPR)對的RDD,每次滑動步長為1,視窗大小為2,構造包括相鄰兩點的陣列,計算曲線下面積,然後累加得到整個曲線的面積。

3.5. (precision, recall)曲線

PR曲線

def pr(): RDD[(Double, Double)] = { 
  val prCurve = createCurve(Recall, Precision)
  val sc = confusions.context
  val first = sc.makeRDD(Seq((0.0, 1.0)), 1)
  first.union(prCurve)
}

曲線面積

def areaUnderPR(): Double = AreaUnderCurve.of(pr())

3.6. 其他

score作為閾值(threshold)時,與其他指標構成的曲線,包括(threshold, F-measure),(threshold, precision),(threshold, recall),是要使用createCurve函式。

4. 結語

我們介紹了二分類的一些常用評價指標及在spark中的實現,其中的難點主要是label分佈的分散式統計,以及spark AUC的計算方式。