1. 程式人生 > >大資料:Spark mlib(一) KMeans聚類演算法原始碼分析

大資料:Spark mlib(一) KMeans聚類演算法原始碼分析

1. 聚類

1.1 什麼是聚類?

所謂聚類問題,就是給定一個元素集合D,其中每個元素具有n個可觀察屬性,使用演算法將集合D劃分成k個子集,要求每個子集內部的元素之間相異度儘可能低,而不同子集的元素相異度儘可能高,其中每個子集叫做一個簇。

1.2 KMeans 聚類演算法

K-Means聚類演算法主要分為如下幾個步驟:
  1. 從D中隨機取k個元素,作為k個簇的各自的中心
  2. 分別計算剩下的元素到k個簇中心的相異度,將這些元素分別劃歸到相異度最低的簇
  3. 根據聚類結果,重新計算k個簇各自的中心,計算方法是取簇中所有元素各自維度的算術平均數
  4. 將D中全部元素按照新的中心重新聚類。 
  5. 重複第4步,直到聚類結果不再變化。 

1.2.1 什麼是相異度

設 X={x1,x2.....,xn},Y={y1,y2......yn}其中X,Y是兩個元素項,各自具有n個可度量特徵屬性X和Y的相異度定義為: d(X,Y)=f(X,Y)->R,其中R為實數域,也就是兩個元素的相異度。

1.2.2 相異度的演算法

因為每個緯度的數字都是無方向意義的標度變數,可以通過距離來標示相異度常見的幾個距離計算公式:歐幾里得距離: 曼哈頓距離:閔可夫斯基距離:

1.2.3 資料的規格化

在計算距離的時候,會發現取值範圍大的屬性對距離的影響高於取值範圍小的屬性,為了解決這個問題,一般要對屬性值進行規格化。規格化就是將各個屬性值按比例對映到相同的取值區間,這樣是為了平衡各個屬性對距離的影響。最典型的規格化就是資料的歸一化:
將各個屬性均對映到[0,1]區間對映公式為:其中max(ai)和min(ai)表示所有元素項中第i個屬性的最大值和最小值

2. Spark Kmeans的實現

2.1 Kmeans 初始化的幾個引數

class KMeans private (
    private var k: Int,
    private var maxIterations: Int,
    private var initializationMode: String,
    private var initializationSteps: Int,
    private var epsilon: Double,
    private var seed: Long) extends Serializable 

引數定義
K聚的總類
maxIterations迭代的次數
initializationMode有 random 和 k-means||兩種
initializationSteps初始化的步長
epsilon最小中心距離的筏值
seed隨機數的種子

2.2 步驟1:Kmeans 的初始化中心的選擇

Kmeans 在資料集初始化的時候中選K箇中心點有兩種演算法
  • 隨機選擇:依據給的種子seed,隨機生成K個隨機中心點
  • k-means||:預設的演算法
  1.  隨機生成一箇中心點,基於這個中心點,找出一批距離這個中心點較遠的點作為集合(分散式查詢)
  2.  以這些找到的點的集合為新的中心點,依據initializationSteps作為重複查詢步驟1,2的次數(分散式查詢)
  3.  如果找到的這些點的數量小於k,那麼就以這些點為中心點
  4.  不如2步驟找到的這些點大於k,那麼將基於這些點作為樣本進行k-means++的中心點查詢,找到K箇中心點。k-means++的查詢是在有限的點上查詢(driver端的本地權重查詢)
if (initializationMode == KMeans.RANDOM) {
          initRandom(data)
        } else {
          initKMeansParallel(data)
        }

2.3  步驟2: 計算每個點的特徵向量的norm

// Compute squared norms and cache them.
    val norms = data.map(Vectors.norm(_, 2.0))
    norms.persist()

我們來看一下norm的演算法
else if (p == 2) {
      var sum = 0.0
      var i = 0
      while (i < size) {
        sum += values(i) * values(i)
        i += 1
      }
      math.sqrt(sum)
    

假如:一個點的A(a1,b1) 那麼norm的計算就是 sqrt(a1^2+b1^2),這也是向量的L2範數

2.4 步驟3:計算每個點距離其他點的距離

在Spark使用的距離演算法是歐式距離演算法,我們先來看這個距離演算法:對兩個點 x(x1,x2....xn)和y(y1,y2....yn)
將方程式解開sqrt(x1^2+x2^2+x3^2+.....+xn^2 + y1^2+y2^2+...+yn^2 -2(x1y1+x2*y2.....+xn*yn))x1^2+x2^2+x3^2+.....+xn^2 這部分可以提前算,但是-2(x1y1+x2*y2.....+xn*yn))這部分的計算是需要時時計算的

2.4.1 解開歐式距離計算需要考慮精度

Spark中精度的計算公式:
 val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
    val normDiff = norm1 - norm2
 val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
if (precisionBound1 < precision) {
      sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
    }
如果在精度(precision: Double = 1e-6)滿足條件的情況下,歐式距離sqDist = sumSquaredNorm - 2.0 * v1.dot(v2),sumSquaredNorm即為,2.0 * v1.dot(v2)即為如果精度不滿足要求,則進行原始的距離計算公式了即呼叫Vectors.sqdist(v1, v2)。

2.4.2  快速演算法lowerBoundOfSqDist

在這種情況下,Spark實現了一個快速演算法我們以兩個緯度作為例子,假如兩個點  x(a1,b1)  y(a2,b2)演算法lowerBoundOfSqDist
我們分別展開歐式距離和這種距離演算法
可以簡單的證明演算法lowerBoundOfSqDist小於歐式距離,
  • lowerBoundOfSqDist大於bestdistance,那麼可以推導歐式距離也大於bestdistance,不需要計算歐式距離
  • lowerBoundOfSqDist小於bestdistance,需要繼續計算歐式距離來保證正確性
lowerBoundOfSqDist演算法的優勢比較明顯, sqrt(a1^2+a2^2) 就是前面計算的每個點的norm值
lowerBoundOfSqDist=(norm1-norm2)*(norm1-norm2)

private[mllib] def findClosest(
      centers: TraversableOnce[VectorWithNorm],
      point: VectorWithNorm): (Int, Double) = {
    var bestDistance = Double.PositiveInfinity
    var bestIndex = 0
    var i = 0
    centers.foreach { center =>
      // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary
      // distance computation.
      var lowerBoundOfSqDist = center.norm - point.norm
      lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
      if (lowerBoundOfSqDist < bestDistance) {
        val distance: Double = fastSquaredDistance(center, point)
        if (distance < bestDistance) {
          bestDistance = distance
          bestIndex = i
        }
      }
      i += 1
    }
    (bestIndex, bestDistance)
  }

2.4.3 加權歐式距離和lowerBoundOfSqDist

在有些應用場景可能會存在加權的情況,加權歐式距離:
w1,w2....wp 就是每個屬性的權重同樣的lowerBoundOfSqDist演算法也需要加權:(sqrt(W1Xi1^2+W2Xi2^2+....+WpXip^2)-sqrt(W1Xj1^2+W2Xj2^2+....+WpXjp^2))^2同樣也能證明加權的lowerBoundOfSqDist也是小於加權歐式距離

2.5 步驟4: 在聚過的簇中重新定義中心點

在已經聚過的簇中,使用所有點的平均值作為新的聚類中心
  totalContribs.foreach { case (j, (sum, count)) =>
        scal(1.0 / count, sum)
        val newCenter = new VectorWithNorm(sum)
        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
          converged = false
        }
        centers(j) = newCenter
      }

重複步驟1-步驟4,直到迭代次數達到maxIterations初始化的引數為止注意:在我們前面的文章中,Spark做了一些演算法的優化而這些優化是基於歐式距離的,Spark mllib裡提供的Kmeans演算法不支援其它的距離演算法。

3. Kmeans的訓練模型

Kmeans本身也提供了訓練模型,模型的目的為了對新輸入的向量進行判定到哪個類別,聚類的模型最終的目的是為了分類。
@Since("0.8.0")
class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vector])
  extends Saveable with Serializable with PMMLExportable {

  /**
   * A Java-friendly constructor that takes an Iterable of Vectors.
   */
  @Since("1.4.0")
  def this(centers: java.lang.Iterable[Vector]) = this(centers.asScala.toArray)

  /**
   * Total number of clusters.
   */
  @Since("0.8.0")
  def k: Int = clusterCenters.length

  /**
   * Returns the cluster index that a given point belongs to.
   */
  @Since("0.8.0")
  def predict(point: Vector): Int = {
    KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
  }

  /**
   * Maps given points to their cluster indices.
   */
  @Since("1.0.0")
  def predict(points: RDD[Vector]): RDD[Int] = {
    val centersWithNorm = clusterCentersWithNorm
    val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
    points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
  }

  /**
   * Maps given points to their cluster indices.
   */
  @Since("1.0.0")
  def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] =
    predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]]

  /**
   * Return the K-means cost (sum of squared distances of points to their nearest center) for this
   * model on the given data.
   */
  @Since("0.8.0")
  def computeCost(data: RDD[Vector]): Double = {
    val centersWithNorm = clusterCentersWithNorm
    val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
  }

  private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
    clusterCenters.map(new VectorWithNorm(_))

  @Since("1.4.0")
  override def save(sc: SparkContext, path: String): Unit = {
    KMeansModel.SaveLoadV1_0.save(sc, this, path)
  }

  override protected def formatVersion: String = "1.0"
}

通過KMeansModel的訓練模型,predict輸入的向量所距離最近的中心點
  @Since("0.8.0")
  def predict(point: Vector): Int = {
    KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
  }
看了熟悉的函式findClosest,那些中心點是在聚類結束建立中心點
new KMeansModel(centers.map(_.vector))

4. Spark Kmeans的評估

如何評估KMeans的聚類K的效果?可以通過computeCost函式來計算cost
  @Since("0.8.0")
  def computeCost(data: RDD[Vector]): Double = {
    val centersWithNorm = clusterCentersWithNorm
    val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
  }
函式的演算法:通過計算所有資料點到其最近的中心點的距離平方和 (a1-c1)^2+(a2-c2)^2 +...... 使用不同的K,相同的迭代次數,理論上值越小,聚類效果越好,但是這是需要可解釋性,如果聚類K等於總資料點,當然聚類效果最好,cost是0,但沒有意義。

相關推薦

資料Spark mlib() KMeans演算法原始碼分析

1. 聚類1.1 什麼是聚類?所謂聚類問題,就是給定一個元素集合D,其中每個元素具有n個可觀察屬性,使用演算法將集合D劃分成k個子集,要求每個子集內部的元素之間相異度儘可能低,而不同子集的元素相異度儘可能高,其中每個子集叫做一個簇。1.2 KMeans 聚類演算法K-Mean

資料Spark mlib(三) GradientDescent梯度下降演算法Spark實現

1. 什麼是梯度下降?梯度下降法(英語:Gradient descent)是一個一階最優化演算法,通常也稱為最速下降法。 要使用梯度下降法找到一個函式的區域性極小值,必須向函式上當前點對應梯度(或者是近似梯度)的反方向的規定步長距離點進行迭代搜尋。先來看兩個函式:1.  擬合

Spark MLlib中KMeans演算法的解析和應用

聚類演算法是機器學習中的一種無監督學習演算法,它在資料科學領域應用場景很廣泛,比如基於使用者購買行為、興趣等來構建推薦系統。 核心思想可以理解為,在給定的資料集中(資料集中的每個元素有可被觀察的n個屬性),使用聚類演算法將資料集劃分為k個子集,並且要求每個子集內部的元素之間的差異度儘可能低,而不同子集元素的差

資料Spark)--- Spark簡介,模組,安裝,使用,一句話實現WorldCount,API,scala程式設計,提交作業到spark叢集,指令碼分析

一、Spark簡介 ---------------------------------------------------------- 1.快如閃電的叢集計算 2.大規模快速通用的計算引擎 3.速度: 比hadoop 100x,磁碟計算快10x 4.使用: java

資料spark叢集搭建

建立spark使用者組,組ID1000 groupadd -g 1000 spark 在spark使用者組下建立使用者ID 2000的spark使用者 獲取視訊中文件資料及完整視訊的夥伴請加QQ群:947967114useradd -u 2000 -g spark spark 設定密碼 passwd

資料Spark Core(二)Driver上的Task的生成、分配、排程

1. 什麼是Task?在前面的章節裡描述過幾個角色,Driver(Client),Master,Worker(Executor),Driver會提交Application到Master進行Worker上的Executor上的排程,顯然這些都不是Task.Spark上的幾個關係

資料Spark Standalone 叢集排程(二)如何建立、分配Executors的資源

Standalone 的整體架構 在Spark叢集中的3個角色Client, Master, Worker, 下面的圖是Client Submit 一個任務的流程圖: 完整的流程:Driver 提交任務給Master, 由Master節點根據任務的引數對進行Worker

資料Spark Storage(二) 叢集下的broadcast

Spark BroadCast Broadcast 簡單來說就是將資料從一個節點複製到其他各個節點,常見用於資料複製到節點本地用於計算,在前面一章中討論過Storage模組中BlockManager,Block既可以儲存在記憶體中,也可以儲存在磁碟中,當Executor節點

基礎演算法(二)Kmeans演算法的基本原理與應用

Kmeans聚類演算法的基本原理與應用       內容說明:主要介紹Kmeans聚類演算法的數學原理,並使用matlab程式設計實現Kmeans的簡單應用,不對之處還望指正。 一、Km

Kmeans演算法在python下的實現--附測試資料

Kmeans演算法 1:隨機初始化一個聚類中心 2:根據距離將資料點劃分到不同的類中 3:計算代價函式 4:重新計算各類資料的中心作為聚類中心 5:重複2-4步直到代價函式不發生變化 測試資料: XY -1.260.46 -1.150.49 -1.190.36 -1.330

Hadoop/MapReduce 及 Spark KMeans演算法實現

package kmeans; import java.io.BufferedReader; import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.

KMeans演算法分析以及實現

KMeans KMeans是一種無監督學習聚類方法, 目的是發現數據中資料物件之間的關係,將資料進行分組,組內的相似性越大,組間的差別越大,則聚類效果越好。 無監督學習,也就是沒有對應的標籤,只有資料記錄.通過KMeans聚類,可以將資料劃分成一個簇,進而發現數據之間的關係.

Python商品資料預處理與K-Means視覺化分析

資料提取 在我之前的文章Scrapy自動爬取商品資料爬蟲裡實現了爬蟲爬取商品網站搜尋關鍵詞為python的書籍商品,爬取到了60多頁網頁的1260本python書籍商品的書名,價格,評論數和商品連結,並將所有商品資料儲存到本地的.json檔案中。資料儲存格式如下:

從零開始實現Kmeans演算法

本系列文章的所有原始碼都將會開源,需要原始碼的小夥伴可以去我的 Github fork! 1. Kmeans聚類演算法簡介 由於具有出色的速度和良好的可擴充套件性,Kmeans聚類演算法算得上是最著名的聚類方法。Kmeans演算法是一個重複移動類中心

影象基本變換---KMeans演算法

本文將詳細介紹K-Means均值聚類的演算法及實現。    聚類是一個將資料集中在某些方面相似的資料成員進行分類組織的過程。K均值聚類是最著名的劃分聚類演算法,由於簡潔和效率使得他成為所有聚類演算法中最廣泛使用的。給定一個數據點集合和需要的聚類數目k,k由使用者指定,k均值

Scala語言實現Kmeans演算法

/** * @author weixu_000 */ import java.util.Random import scala.io.Source import java.io._ object Kmeans { val k = 5 val dim = 41

kmeans演算法及複雜度

kmeans是最簡單的聚類演算法之一,kmeans一般在資料分析前期使用,選取適當的k,將資料分類後,然後分類研究不同聚類下資料的特點。 演算法原理 隨機選取k箇中心點; 遍歷所有資料,將每個資料劃分到最近的中心點中; 計算每個聚類的平均值,並作為新的中心點; 重複

Kmeans演算法及其matlab原始碼

本文介紹了K-means聚類演算法,並註釋了部分matlab實現的原始碼。K-means演算法K-means演算法是一種硬聚類演算法,根據資料到聚類中心的某種距離來作為判別該資料所屬類別。K-means演算法以距離作為相似度測度。假設將物件資料集分為個不同的類,k均值聚類演算

K-means演算法原理分析與實際應用案例分析(案例分析另起篇部落格)

引言 在資料分析中,我們常常想將看上去相似或者行為形似的資料聚合在一起。例如,對一個營銷組織來說,將不同客戶根據他們的特點進行分組,從而有針對性地定製營銷活動,這很重要。又比如,對學校老師來說,將學生分組同樣能夠有所側重的進行教育活動。分類與聚類是資料探勘領域

KMeans演算法

1、什麼是聚類     所謂聚類就是將一組物件按照特徵劃分不為的小組,使得組內的差異性儘可能的小,組間的差異儘可能的大。例如,粗粒度的分類,按照學校實力,分為985、211高校,普通一本高校,二本高校,三本高校。如果再更加細的分類,一個學校裡面會按照所修的課程差異性分為不同