sparkML之kmeans聚類
1.標準kmeans演算法
kmeans演算法是實際中最常用的聚類演算法,沒有之一。kmeans演算法的原理簡單,實現起來不是很複雜,實際中使用的效果一般也不錯,所以深受廣大人民群眾的喜愛。
kmeans演算法的原理介紹方面的paper多如牛毛,而且理解起來確實也不是很複雜,這裡使用wiki上的版本:
已知觀測集(x1,x2,⋯,xn),其中每個觀測都是一個d維實向量,kmeans聚類要把這n個觀測值劃分到k個集合中(k≤n),使得組內平方和(WCSS
within-cluster sum of squares)最小。換句話說,它的目標是找到使得下式滿足的聚類Si,
其中μi是Si中所有點的均值。
標準kmeans演算法的步驟一般如下:
1.先隨機挑選k個初始聚類中心。
2.計算資料集中每個點到每個聚類中心的距離,然後將這個點分配到離該點最近的聚類中心。
3.重新計算每個類中所有點的座標的平均值,並將得到的這個新的點作為新的聚類中心。
重複上面第2、3步,知道聚類中心點不再大範圍移動(精度自己定義)或者迭代的總次數達到最大。
2.標準kmeans演算法的優缺點
標準的kmeans演算法的優缺點都很突出。這裡挑幾個最重要的點總結一下。
主要優點:
1.原理簡單,易於理解。
2.實現簡單
3.計算速度較快
4.聚類效果還不錯。
主要缺點:
1.需要確定k值。
2.對初始中心點的選擇敏感。
3.對異常值敏感,因為異常值很很大程度影響聚類中心的位置。
4.無法增量計算。這點在資料量大的時候尤為突出。
3.spark中對kmeans的優化
作為經典的聚類演算法,一般的機器學習框架裡都實現由kmeans,spark自然也不例外。前面我們已經講了標準kmeans的流程以及優缺點,那麼針對標準kmeans中的不足,spark裡主要做了如下的優化:
1.選擇合適的K值。
k的選擇是kmeans演算法的關鍵。Spark MLlib在KMeansModel裡實現了computeCost方法,這個方法通過計算資料集中所有的點到最近中心點的平方和來衡量聚類的效果。一般來說,同樣的迭代次數,這個cost值越小,說明聚類的效果越好。但在實際使用過程中,必須還要考慮聚類結果的可解釋性,不能一味地選擇cost值最小的那個k。比如我們如果考慮極限情況,如果資料集有n個點,如果令k=n,每個點都是聚類中心,每個類都只有一個點,此時cost值最小為0。但是這樣的聚類結果顯然是沒有實際意義的。
2.選擇合適的初始中心點
大部分迭代演算法都對初始值很敏感,kmeans也是如此。spark MLlib在初始中心點的選擇上,使用了k-means++的演算法。想要詳細瞭解k-means++的同學們,可以參考k-means++在wiki上的介紹:https://en.wikipedia.org/wiki/K-means%2B%2B。
kmeans++的基本思想是是初始中心店的相互距離儘可能遠。為了實現這個初衷,採取如下步驟:
1.從初始資料集中隨機選擇一個點作為第一個聚類中心點。
2.計算資料集中所有點到最近一箇中心點的距離D(x)並存在一個數組裡,然後將所有這些距離加起來得到Sum(D(x))。
3.然後再取一個隨機值,用權重的方式計算下一個中心點。具體的實現方法:先取一個在Sum(D(x))範圍內的隨機值,然後領Random -= D(x),直至Random <= 0,此時這個D(x)對應的點為下一個中心點。
4.重複2、3步直到k個聚類中心點被找出。
5.利用找出的k個聚類中心點,執行標準的kmeans演算法。
演算法的關鍵是在第三步。有兩個小點需要說明:
1.不能直接取距離最大的那個點當中心店。因為這個點很可能是離群點。
2.這種取隨機值的方法能保證距離最大的那個點被選中的概率最大。給大家舉個很簡單的例子:假設有四個點A、B、C、D,分別離最近中心的距離D(x)為1、2、3、4,那麼Sum(D(x))=10。然後在[0,10]之間取一隨機數假設為random,然後用random與D(x)依次相減,直至random<0為止。應該不難發現,D被選中的概率最大。
以上參考地址:http://blog.csdn.net/bitcarmanlee/article/details/52092288
4.spark實戰kmeans
單機版
import org.apache.spark.SparkContext
import java.io.PrintWriter
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import scala.io.Source
object Kmeans {
def main(args: Array[String]): Unit = {
//本地執行不報hadoop錯誤
System.setProperty("hadoop.home.dir", "E:\\data\\winutils")
val conf = new SparkConf().setAppName("kmeans").setMaster("local")
val sc = new SparkContext(conf)
//rfm
var input = "E:\\資料處理分析\\使用者RFM模型\\kmean_input_20171114_request.csv";
var output = "E:\\資料處理分析\\使用者RFM模型\\kmean_output_20171114_request.csv"
//emui探索分類
input = "E:\\資料處理分析\\emui興趣探索\\kmeans_input.csv"
output = "E:\\資料處理分析\\emui興趣探索\\kmeans_output.csv"
val data = sc.textFile(input)
//val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.trim.toDouble)))
val parsedData = data.map(s => Vectors.dense(s.split(',').map(x => (x.replaceAll("nan", "0"))).map(_.trim.toDouble)))
//如何選擇K
/* val ks = Array(10,11,12,13,14,15,16,17,18,19,
20,21,22,23,24,25,26,27,28,29,
30,31,32,33,34,35,36,37,38,39,
40,41,42,43,44,45,46,47,48,49,
50,51,52,53,54,55,56,57,58,59);
ks.foreach(k=>{
val model = KMeans.train(parsedData, k, 20)
val sse = model.computeCost(parsedData)
println("sum of squared distances of points to their nearest center when k=" + k + " -> "+ sse)
})*/
//設定簇的個數為3
val numClusters = 50
//迭代20次
val numIterations = 20
//設定初始K選取方式為k-means++
val initMode = "k-means||"
val clusters = new KMeans().
setInitializationMode(initMode).
setK(numClusters).
setMaxIterations(numIterations).
run(parsedData)
//打印出測試資料屬於哪個簇
//println(parsedData.map(v => v.toString() + " belong to cluster :" + clusters.predict(v)).collect().mkString("\n"))
val writer = new PrintWriter(new File(output))
writer.println(parsedData.map(v => v.toString() + "," + clusters.predict(v)).collect().mkString("\n"))
writer.close()
// Evaluateclustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("WithinSet Sum of Squared Errors = " + WSSSE)
//打印出中心點
println("Clustercenters:")
for (center <- clusters.clusterCenters) {
println(" " + center)
}
}
}
叢集版:主要是為了找出最合適的K值,單機執行還是太慢了,根據sse值選擇合適的K(凸點)
import org.apache.spark.SparkContext
import java.io.PrintWriter
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Kmeans {
def main(args: Array[String]): Unit = {
//System.setProperty("hadoop.home.dir", "E:\\data\\winutils")
val conf = new SparkConf().setAppName("kmeans")
val sc = new SparkContext(conf)
val data = sc.textFile(args.apply(0))
//由於資料量小 分10個分割槽,這樣才能多個任務並行
val parsedData = data.map(s => Vectors.dense(s.split(',').map(x => (x.replaceAll("nan", "0"))).map(_.trim.toDouble))).repartition(10)
//如何選擇K
val ks = Array(10,11,12,13,14,15,16,17,18,19,
20,21,22,23,24,25,26,27,28,29,
30,31,32,33,34,35,36,37,38,39,
40,41,42,43,44,45,46,47,48,49,
50,51,52,53,54,55,56,57,58,59);
ks.foreach(k=>{
val model = KMeans.train(parsedData, k, 20)
val sse = model.computeCost(parsedData)
println("sum of squared distances of points to their nearest center when k=" + k + " -> "+ sse)
})
}
}
執行指令碼#!/bin/bash
function start_spark()
{
spark-submit --master yarn-cluster \
--name kmeans \
--executor-memory 2G \
--executor-cores 1 \
--num-executors 10 \
--driver-memory 1G \
--conf "spark.reducer.maxSizeInFlight=24m" \
--conf "spark.shuffle.copier.threads=5" \
--conf "spark.storage.memoryFraction=0.5" \
--conf "spark.driver.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Xmn800M -Xms1024M -Xmx1024M -XX:MaxPermSize=128M -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf "spark.executor.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Xss3M" \
--class com.inveno.ml.Kmeans offline-ad-material-spark-0.0.1.jar \
/jiangsf/kmeans/kmeans_input.csv
return 0
}
start_spark