1. 程式人生 > >spark ml 聚類原始碼筆記一

spark ml 聚類原始碼筆記一

首先是引數

k : 聚類數,預設2

initMode : 初始化演算法的引數,可以是RANDOM或K_MEANS_PARALLEL,RANDOM是隨機選擇初始聚類中心,K_MEANS_PARALLEL是使用演算法選擇初始聚類中心,也是預設情況

initSteps : K_MEANS_PARALLEL方法迭代步數,預設5

接下來是一些重要的方法

private[clustering] def predict(features: Vector): Int = parentModel.predict(features)//predict預測新的屬性屬於哪個類

def clusterCenters: Array[Vector] = parentModel.clusterCenters//列出最終的聚類中心

  def computeCost(dataset: DataFrame): Double = {//計算距離平方的總和
    SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
    val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
    parentModel.computeCost(data)//主要是呼叫computeCost方法
  }

下面是聚類演算法預設的引數

  setDefault(//2類,迭代20次,初始化演算法為K_MEANS_PARALLEL,該演算法迭代5次,收斂引數0.0001
    k -> 2,
    maxIter -> 20,
    initMode -> MLlibKMeans.K_MEANS_PARALLEL,
    initSteps -> 5,
    tol -> 1e-4)

呼叫fit,填充引數

  override def fit(dataset: DataFrame): KMeansModel = {
    val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }


    val algo = new MLlibKMeans()
      .setK($(k))
      .setInitializationMode($(initMode))
      .setInitializationSteps($(initSteps))
      .setMaxIterations($(maxIter))
      .setSeed($(seed))
      .setEpsilon($(tol))
    val parentModel = algo.run(rdd)//最重要的就是這裡,algo是MLlibKMeans類,傳入要聚類的rdd,呼叫run方法,得到模型,下面我們就看下這個是如何執行的
    val model = new KMeansModel(uid, parentModel)
    copyValues(model)
  }

run方法

  def run(data: RDD[Vector]): KMeansModel = {


    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data is not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }


    // Compute squared norms and cache them.
    val norms = data.map(Vectors.norm(_, 2.0))//計算每個向量的2範數,即平方和開放
    norms.persist()//快取,因為要多次用這個
    val zippedData = data.zip(norms).map { case (v, norm) =>//把向量和2範數放在一起,VectorWithNorm是新的結構
      new VectorWithNorm(v, norm)
    }
    val model = runAlgorithm(zippedData)//把VectorWithNorm放裡面執行,返回模型
    norms.unpersist()//釋放快取的2範數


    // Warn at the end of the run as well, for increased visibility.
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("The input data was not directly cached, which may hurt performance if its"
        + " parent RDDs are also uncached.")
    }
    model//返回模型
  }

下面看這個模型是怎麼獲得的

  private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {


    val sc = data.sparkContext


    val initStartTime = System.nanoTime()


    // Only one run is allowed when initialModel is given
    val numRuns = if (initialModel.nonEmpty) {//如果指定initialModel,這個initialModel就是初始化聚類中心,runs只能是1,這裡的nunRuns就是之前提到的initSteps,如果不指定聚類中心,預設執行5次
      if (runs > 1) logWarning("Ignoring runs; one run is allowed when initialModel is given.")
      1
    } else {
      runs
    }


    val centers = initialModel match {
      case Some(kMeansCenters) => {//如果指定聚類中心
        Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))//把中心弄成一個帶有2範數的VectorWithNorm的陣列
      }
      case None => {//如果沒指定
        if (initializationMode == KMeans.RANDOM) {//看initializationMode是隨機還是用演算法,不管用哪個,最後返回都是聚類中心的陣列,後續我們分別看這兩種方式
          initRandom(data)
        } else {
          initKMeansParallel(data)
        }
      }
    }
    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
      " seconds.")


    val active = Array.fill(numRuns)(true)//搞兩個陣列,長度為執行次數,一個用true填充,用來表示是否還需要運算,一個用0填充,用來標示成本
    val costs = Array.fill(numRuns)(0.0)


    var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)//弄一個0-4的可變陣列
    var iteration = 0


    val iterationStartTime = System.nanoTime()


    // Execute iterations of Lloyd's algorithm until all runs have converged
    while (iteration < maxIterations && !activeRuns.isEmpty) {//按預設總迭代次數不超過20次,步數還有的迭代,後面會知道,迭代好的會過濾,所以越來越少
      type WeightedPoint = (Vector, Long)//定義一個型別
      def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {//合併貢獻值
        axpy(1.0, x._1, y._1)//把x的向量加到y的向量上
        (y._1, x._2 + y._2)//返回y的向量和x+y的長度
      }//這個後面會用到


      val activeCenters = activeRuns.map(r => centers(r)).toArray//取出每個步驟的中心陣列
      val costAccums = activeRuns.map(_ => sc.accumulator(0.0))//每步弄一個累加器,用來累加代價函式


      val bcActiveCenters = sc.broadcast(activeCenters)//把中心廣播出去,這倆廣播和累加用的很經典


      // Find the sum and count of points mapping to each center
      val totalContribs = data.mapPartitions { points =>//分片算資料
        val thisActiveCenters = bcActiveCenters.value//獲取每步的聚類中心陣列
        val runs = thisActiveCenters.length//獲取步數
        val k = thisActiveCenters(0).length//獲取聚類中心的維度,也就是聚成幾個點
        val dims = thisActiveCenters(0)(0).vector.size//獲取聚類中心點的向量長度,也就是原始資料的屬性個數


        val sums = Array.fill(runs, k)(Vectors.zeros(dims))//搞一個二維陣列,行是執行步數,列是聚類中心個數,每個元素是原始資料維度的向量,也就是一條資料
        val counts = Array.fill(runs, k)(0L)//同理,再搞一哥二維陣列,存放每個類別的資料條數


        points.foreach { point =>//分片的每條資料
          (0 until runs).foreach { i =>//遍歷步數
            val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)//傳入每步的聚類中心和固定資料,計算最短距離,返回最好的中心和代價,我們先看findClosest()如何工作的

  private[mllib] def findClosest(//返回最近中心的索引和平方距離,注意引數都是帶2範數的
      centers: TraversableOnce[VectorWithNorm],
      point: VectorWithNorm): (Int, Double) = {
    var bestDistance = Double.PositiveInfinity//距離弄成正無窮
    var bestIndex = 0//索引弄成0
    var i = 0
    centers.foreach { center =>//每個中心點與給定點的距離
      // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
      // distance computation.//通過不等式簡化距離計算,有魄力
      var lowerBoundOfSqDist = center.norm - point.norm//直接2範數差平方替代距離
      lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
      if (lowerBoundOfSqDist < bestDistance) {//如果比最優距離小
        val distance: Double = fastSquaredDistance(center, point)//又調了一個,下面是具體的實現程式碼

  private[mllib] def fastSquaredDistance(//\|a - b\|_2^2 = \|a\|_2^2 + \|b\|_2^2 - 2 a^T b.用這個公式近似計算距離,我們捋下思路,先是範數計算近似距離,如果有更近的再進一步算,進一步算也是通過近似計算得到的,都是為了提高效能,確實吊
      v1: Vector,
      norm1: Double,
      v2: Vector,
      norm2: Double,
      precision: Double = 1e-6): Double = {//這有個引數是精度
    val n = v1.size//求向量長度
    require(v2.size == n)
    require(norm1 >= 0.0 && norm2 >= 0.0)
    val sumSquaredNorm = norm1 * norm1 + norm2 * norm2//範數平方和
    val normDiff = norm1 - norm2//範數相減,這都是根據公式
    var sqDist = 0.0//距離先設為0
    /*
     * The relative error is
     * <pre>
     * EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),//根據公式弄了一個相對錯誤,EPSILON是一個趨於1但比1大的數
     * </pre>
     * which is bounded by
     * <pre>
     * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).//搞了一個邊界,我不仔細看了
     * </pre>
     * The bound doesn't need the inner product, so we can use it as a sufficient condition to//他說邊界不需要計算點積
     * check quickly whether the inner product approach is accurate.
     */
    val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)//按公式走,這是個邊界
    if (precisionBound1 < precision) {//如果邊界比精度小,再按開始的公式算帶點積的,這個太bug了
      sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
    } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {///如果邊界大於等於精度
      val dotValue = dot(v1, v2)//老老實實算點積
      sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)//按開始的公式算,取與0中較大者
      val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) //算邊界二,其實就是一個上界一個下界
        (sqDist + EPSILON)//距離+常數
      if (precisionBound2 > precision) {//如果大於精度
        sqDist = Vectors.sqdist(v1, v2)//計算向量距離的平方,這裡面也分了很多情況,但沒什麼難度
      }
    } else {
      sqDist = Vectors.sqdist(v1, v2)
    }
    sqDist//最後返回距離的平方,想完全看懂近似計算的應該去看論文了
  }


        if (distance < bestDistance) {//回到findClosest,如果距離不是最優,賦值,記錄索引,這個都明白
          bestDistance = distance
          bestIndex = i
        }
      }
      i += 1//索引自加
    }
    (bestIndex, bestDistance)//返回最優索引,和最優距離
  }


            costAccums(i) += cost//累加距離
            val sum = sums(i)(bestCenter)//拿出最優索引的
            axpy(1.0, point.vector, sum)//把點放到sum,為了更新中心點
            counts(i)(bestCenter) += 1//最優距離對應的中心累加,留個權重
          }
        }


        val contribs = for (i <- 0 until runs; j <- 0 until k) yield {//每步每個中心點,看下貢獻,捋一下,為什麼迴圈runs應為每次拿資料點相當於同時做5個聚類,為什麼做5個聚類,因為初始化聚類中心會有5組,最後要選出5組最優的
          ((i, j), (sums(i)(j), counts(i)(j)))
        }
        contribs.iterator
      }.reduceByKey(mergeContribs).collectAsMap()//這裡呼叫mergeContribs,把步相同中心點相同的距離和數量都累加,最後弄成一個map


      bcActiveCenters.unpersist(blocking = false)//釋放快取,這個東西其實不大,主要是清理廣播變數


      // Update the cluster centers and costs for each active run
      for ((run, i) <- activeRuns.zipWithIndex) {
        var changed = false
        var j = 0
        while (j < k) {//k是要聚幾個類
          val (sum, count) = totalContribs((i, j))//取出對應的貢獻
          if (count != 0) {//如果有貢獻
            scal(1.0 / count, sum)//資料除以數量,這就是新的聚類中心
            val newCenter = new VectorWithNorm(sum)
            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {//看新舊距離變化是否明顯,如果明顯置為true
              changed = true
            }
            centers(run)(j) = newCenter//舊的不去新的不來
          }
          j += 1
        }
        if (!changed) {//如果沒改變
          active(run) = false
          logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")//就說執行的第幾步運行了幾次結束了
        }
        costs(run) = costAccums(i).value//取出代價函式
      }


      activeRuns = activeRuns.filter(active(_))//迭代好的過濾
      iteration += 1//迭代次數自增
    }


    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.")


    if (iteration == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"KMeans converged in $iteration iterations.")
    }


    val (minCost, bestRun) = costs.zipWithIndex.min//取出5步裡面效果最好的,我沒騙你吧


    logInfo(s"The cost for the best run is $minCost.")


    new KMeansModel(centers(bestRun).map(_.vector))//把最好的聚類中心弄成向量返回
  }