spark mllib原始碼分析之二分類邏輯迴歸evaluation
在邏輯迴歸分類中,我們評價分類器好壞的主要指標有精準率(precision),召回率(recall),F-measure,AUC等,其中最常用的是AUC,它可以綜合評價分類器效能,其他的指標主要偏重一些方面。我們介紹下spark中實現的這些評價指標,便於使用spark訓練模型後,對訓練結果進行評估。
1. 評價指標
1.1. 混淆矩陣
混淆矩陣(confusion matrix)用一張簡單的表格,反應分類器對樣本分類的情況
實際\預測 | 1 | 0 |
---|---|---|
1 | TP(True Positive) | FN(Flase Negtive) |
0 | FP(False Positive) | TN(True Negtive) |
0/1代表兩類樣本,下面解釋下表格中的含義
- TP:真陽性,預測是1,實際也是1
- FP:假陽性,預測是1,實際是0
- TN:真陰性,預測是0,實際也是0
- FN:假陰性,預測是0,實際是1
不難看出,這個矩陣一條對角線上帶T的是預測正確的樣本(數量),另外一條對角線上帶F的是預測錯誤的樣本。
1.2. 基礎指標
由這個矩陣,我們可以計算一系列衡量分類器效能的指標
- 準確率(Accuracy Rate)
分類器分對的樣本在總樣本中的比例
- 精準度(Precision)
真正的正樣本在分類器分出的正樣本中的比例
- 召回率(Recall)
樣本中正例被正確分類的比例
- TPR(True Positive Rate),同召回率
- FPR(False Positive Rate)
被錯誤分成正例的樣本在實際負例樣本中的比例
1.3. F-measure
也稱F-score,綜合考慮precision和recall,經常用在資訊檢索中
當
1.4. ROC
樣本經過分類器後,我們可以得到樣本的預測值,以這些預測值為閾值,就可以得到這些預測值對應的的混淆矩陣,每個混淆矩陣都可以計算(FPR, TPR)這樣的點對,將這些點對繪製在二維座標系中,然後連起來就得到了ROC曲線
顯然座標(1, 0)是所有正例全部分錯,是最壞的情況,座標(0, 1)是正例全部分對,是最好的情況,而
1.5. AUC
ROC是條曲線,不方便我們對比分類器的好壞,因此我們用ROC覆蓋的面積這樣一個數值來衡量分類器,AUC的計算方法主要有兩種,一種用相鄰兩點構成的等腰梯形近似計算,另外一種利用與Wilcoxon-Mann-Witney Test等價關係計算。
1.5.1. 直角梯形法
如1.3中的圖所示,ROC曲線上的兩個相鄰點
將ROC曲線上的點依次組成這種對,連續計算相鄰兩點形成的直角梯形並累加即可得到近似的AUC值。
1.5.2. Wilcoxon-Mann-Witney Test
AUC和Wilcoxon-Mann-Witney Test是等價的,而Wilcoxon-Mann-Witney Test就是從樣本中任意抽取一個正例本和一個負例,正例大於負例score的概率。具體計算這個概率可以通過統計所有的 正負樣本對(M*N,M為正樣本數量,N為負樣本數量)中,正樣本score大於負樣本score的數量除以M*N來近似。如果這個pair的正負樣本 score相等,則按0.5計算,這個方法的複雜度為
這種方法下,如果某正例s的次序是
2. 實現
2.1. BinaryLabelCounter
記錄樣本label的分佈情況
private[evaluation] class BinaryLabelCounter(
var numPositives: Long = 0L,
var numNegatives: Long = 0L)
包含了正/負樣本的數量
值得注意的是其運算中相容了負例label為0/-1這兩種情況,只要label小於等於0.5就認為是負例
def +=(label: Double): BinaryLabelCounter = {
if (label > 0.5) numPositives += 1L else numNegatives += 1L
this
}
2.2. confusion matrix
count是大於當前score的樣本的label分佈,totalCount是所有的label的分佈
private[evaluation] case class BinaryConfusionMatrixImpl(
count: BinaryLabelCounter,
totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix {
/** TP */
override def numTruePositives: Long = count.numPositives
/** FP */
override def numFalsePositives: Long = count.numNegatives
/** FN */
override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives
/** TN */
override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives
/** number of positives */
override def numPositives: Long = totalCount.numPositives
/** number of negatives */
override def numNegatives: Long = totalCount.numNegatives
}
2.3. 基礎指標
包括precision,FPR,TPR(Recall),F-score,這些指標都定義成object,繼承自BinaryClassificationMetricComputer基類,然後實現apply函式,可以不顯式使用new,而類似函式形式來計算,好處是用在高階函式的引數列表中,可以根據需要傳入需要計算的指標,非常靈活,參見BinaryClassificationMetrics中createCurve函式的用法,計算邏輯都比較直觀簡單。
2.3.1. precision
private[evaluation] object Precision extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
val totalPositives = c.numTruePositives + c.numFalsePositives
if (totalPositives == 0) {
1.0
} else {
//式(1)
c.numTruePositives.toDouble / totalPositives
}
}
}
2.3.2. FPR
private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
if (c.numNegatives == 0) {
0.0
} else {
//式(3)
c.numFalsePositives.toDouble / c.numNegatives
}
}
}
2.3.3. TPR(Recall)
private[evaluation] object Recall extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
if (c.numPositives == 0) {
0.0
} else {
//式(2)
c.numTruePositives.toDouble / c.numPositives
}
}
}
2.3.4. F-measure
private[evaluation] case class FMeasure(beta: Double) extends BinaryClassificationMetricComputer {
private val beta2 = beta * beta
override def apply(c: BinaryConfusionMatrix): Double = {
val precision = Precision(c)
val recall = Recall(c)
if (precision + recall == 0) {
0.0
} else {
//式(4)
(1.0 + beta2) * (precision * recall) / (beta2 * precision + recall)
}
}
}
3. BinaryClassificationMetrics
計算樣本的分佈,構造ROC曲線,計算AUC等二分類評估指標
class BinaryClassificationMetrics @Since("1.3.0") (
@Since("1.3.0") val scoreAndLabels: RDD[(Double, Double)],
@Since("1.3.0") val numBins: Int)
類成員為含有預測值(score, label) pair對的樣本rdd,numBins是用於計算ROC時的用的點數,當樣本數遠大於numBins時則抽樣,相當於對樣本score做等頻離散化。
3.1. label分佈與混淆矩陣
計算樣本各score(預測值)的累積label分佈cumulativeCounts與混淆矩陣confusions
private lazy val (
cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// Create a bin for each distinct score value, count positives and negatives within each bin,
// and then sort by score values in descending order.
//將具有相同預測值的樣本累計在一起並按降序排序,key是預測值,value是BinaryLabelCounter,累計正樣本和負樣本的個數
val counts = scoreAndLabels.combineByKey(
createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
).sortByKey(ascending = false)
//抽樣並排序
val binnedCounts =
// Only down-sample if bins is > 0
if (numBins == 0) {
// Use original directly
counts
} else {
val countsSize = counts.count()
// Group the iterator into chunks of about countsSize / numBins points,
// so that the resulting number of bins is about numBins
var grouping = countsSize / numBins
if (grouping < 2) {
// numBins was more than half of the size; no real point in down-sampling to bins
logInfo(s"Curve is too small ($countsSize) for $numBins bins to be useful")
counts
} else {
//樣本個數大於2倍numBins,抽樣
if (grouping >= Int.MaxValue) {
logWarning(
s"Curve too large ($countsSize) for $numBins bins; capping at ${Int.MaxValue}")
grouping = Int.MaxValue
}
//grouped是將迭代器每grouping個組成一個新的迭代器,例如[i1, i2, i3,...,i100],如果grouping為4,則[[i1,i2,i3,i4], [i5,i6,i7,i8], ...]
counts.mapPartitions(_.grouped(grouping.toInt).map { pairs =>
//取新組中的第一個分數為新的pair分數,相當於等頻離散化
val firstScore = pairs.head._1
//累加組內的label計數
val agg = new BinaryLabelCounter()
pairs.foreach(pair => agg += pair._2)
//拼成新的pair,相當於抽樣了
(firstScore, agg)
})
}
}
//按partition內累積
val agg = binnedCounts.values.mapPartitions { iter =>
val agg = new BinaryLabelCounter()
iter.foreach(agg += _)
Iterator(agg)
}.collect()
//part間累積
val partitionwiseCumulativeCounts =
agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
val totalCount = partitionwiseCumulativeCounts.last
logInfo(s"Total counts: $totalCount")
//part內累積:每個score先整體累加前一個part,在累加part內其他score的
val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
val cumCount = partitionwiseCumulativeCounts(index)
iter.map { case (score, c) =>
cumCount += c
(score, cumCount.clone())
}
}, preservesPartitioning = true)
cumulativeCounts.persist()
val confusions = cumulativeCounts.map { case (score, cumCount) =>
(score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
}
(cumulativeCounts, confusions)
}
我們做ROC曲線,是一系列score以及在這個score下的混淆矩陣,其實需要的是以那個score為threshold的label分佈。舉例來說,如果有20個樣本,其score集合[0.9, 0.8. 0.7, 0.6, 0.5],對應樣本score:label的情況(key是label 0/1,value是數量,第一項代表score等於0.9的樣本中類0有1個,類1有2個)[[0:1, 1:2], [0:0, 1:3], [0:2, 1:3], [0:4, 1:2], [0:3, 1:0]],總的分佈是[0:10, 1:10],因此我們可以按序累積[[0:1, 1:2], [0:1, 1:5], [0:3, 1:8], [0:7, 1:10], [0:10, 1:10]],每一個都是累加前面的,這樣我們在最小的值就可以得到所有的分佈,當以0.8為threshold時,大於0.8的判定為1,其中label的分佈就是列表中的分佈[0:1, 1:5],判定為0的分佈用總的分佈減掉就是[0:9, 1:5],然後在計算混淆矩陣就非常容易了。
為了達到上面的目的,程式碼首先計算了每個score下的label分佈情況,然後逆序按從大到小排序(當然按順序排序也是可以,得到的就是小於這個score的分佈了),考慮到資料是分散式儲存在不同的機器上的,但因為整體有序(part間有序,part內有序),所有part1中的所有score肯定是大於part0中的,因此可以先按part累積,part內的元素再逐個累積,最後就可以得到每個score下的label分佈,比較巧妙。
3.2. createCurve函式
函式的入參是BinaryClassificationMetricComputer,可以根據需要計算的指標,返回pair指標,參考2.3節
/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (s, c) =>
(s, y(c))
}
}
/** Creates a curve of (metricX, metricY). */
private def createCurve(
x: BinaryClassificationMetricComputer,
y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (_, c) =>
(x(c), y(c))
}
}
3.3. ROC
產生ROC曲線
def roc(): RDD[(Double, Double)] = {
//(FPR, TPR)
val rocCurve = createCurve(FalsePositiveRate, Recall)
val sc = confusions.context
val first = sc.makeRDD(Seq((0.0, 0.0)), 1)
val last = sc.makeRDD(Seq((1.0, 1.0)), 1)
new UnionRDD[(Double, Double)](sc, Seq(first, rocCurve, last))
}
3.4. AUC
使用AreaUnderCurve計算AUC
def areaUnderROC(): Double = AreaUnderCurve.of(roc())
AreaUnderCurve使用直角梯形法計算曲線下的面積
//直角梯形的面積,式(5)
private def trapezoid(points: Seq[(Double, Double)]): Double = {
require(points.length == 2)
val x = points.head
val y = points.last
(y._1 - x._1) * (y._2 + x._2) / 2.0
}
計算相鄰兩點構成的直角梯形的面積,入參是包含兩點的序列
def of(curve: RDD[(Double, Double)]): Double = {
curve.sliding(2).aggregate(0.0)(
seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points),
combOp = _ + _
)
}
入參是ROC曲線,(FPR, TPR)對的RDD,每次滑動步長為1,視窗大小為2,構造包括相鄰兩點的陣列,計算曲線下面積,然後累加得到整個曲線的面積。
3.5. (precision, recall)曲線
PR曲線
def pr(): RDD[(Double, Double)] = {
val prCurve = createCurve(Recall, Precision)
val sc = confusions.context
val first = sc.makeRDD(Seq((0.0, 1.0)), 1)
first.union(prCurve)
}
曲線面積
def areaUnderPR(): Double = AreaUnderCurve.of(pr())
3.6. 其他
score作為閾值(threshold)時,與其他指標構成的曲線,包括(threshold, F-measure),(threshold, precision),(threshold, recall),是要使用createCurve函式。
4. 結語
我們介紹了二分類的一些常用評價指標及在spark中的實現,其中的難點主要是label分佈的分散式統計,以及spark AUC的計算方式。