Spark MLlib 之 大規模數據集的相似度計算原理探索
無論是ICF基於物品的協同過濾、UCF基於用戶的協同過濾、基於內容的推薦,最基本的環節都是計算相似度。如果樣本特征維度很高或者<user, item, score>的維度很大,都會導致無法直接計算。設想一下100w*100w的二維矩陣,計算相似度怎麽算?
更多內容參考——我的大數據學習之路——xingoo
在spark中RowMatrix提供了一種並行計算相似度的思路,下面就來看看其中的奧妙吧!
相似度
相似度有很多種,每一種適合的場景都不太一樣。比如:
- 歐氏距離,在幾何中最簡單的計算方法
- 夾角余弦,通過方向計算相似度,通常在用戶對商品評分、NLP等場景使用
- 傑卡德距離,在不考慮每一樣的具體值時使用
- 皮爾森系數,與夾角余弦類似,但是可以去中心化。比如評分時,有人傾向於打高分,有人傾向於打低分,他們的最後效果在皮爾森中是一樣的
- 曼哈頓距離,一般在路徑規劃、地圖類中常用,比如A*算法中使用曼哈頓來作為每一步代價值的一部分(F=G+H, G是從當前點移動到下一個點的距離,H是距離目標點的距離,這個H就可以用曼哈頓距離表示)
在Spark中使用的是夾角余弦,為什麽選這個,道理就在下面!
上面兩個向量
\[
\left( { x }_{ 1 },{ y }_{ 1 } \right)
\]
和
\[
\left( { x }_{ 2 },{ y }_{ 2 } \right)
\]
計算其夾角的余弦值就是兩個向量方向的相似度。
公式為:
\[
cos(\theta )=\frac { a\cdot b }{ ||a||\ast ||b|| } \\ =\quad \frac { { x }_{ 1 }\ast { x }_{ 2 }\quad +\quad { y }_{ 1 }\ast y_{ 2 } }{ \sqrt { { x }_{ 1 }^{ 2 }+{ x }_{ 2 }^{ 2 } } \ast \sqrt { { y }_{ 1 }^{ 2 }+{ y }_{ 2 }^{ 2 } } }
\]
其中,\(||a||\)表示a的模,即每一項的平方和再開方。
公式拆解
那麽如果向量不只是兩維,而是n維呢?比如有兩個向量:
他們的相似度計算方法套用上面的公式為:
\[ cos(\theta )\quad =\quad \frac { \sum _{ i=1 }^{ n }{ ({ x }_{ i }\ast { y }_{ i }) } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 }\ast { y }_{ 1 }+{ x }_{ 2 }\ast { y }_{ 2 }+...+{ x }_{ n }\ast { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 }\ast { y }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +\frac { { x }_{ 2 }\ast { y }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +...+\frac { { x }_{ n }\ast { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } \ast \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \\ =\quad \frac { { x }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ 1 } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +\frac { { x }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ 2 } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } +...+\frac { { x }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } } } \ast \frac { { y }_{ n } }{ \sqrt { \sum _{ i=1 }^{ n }{ y_{ i }^{ 2 } } } } \]
通過上面的公式就可以發現,夾角余弦可以拆解成每一項與另一項對應位置的乘積\({ x }_{ 1 }\ast { y }_{ 1 }\),再除以每個向量自己的
\[
\sqrt { \sum _{ i=1 }^{ n }{ { x }_{ i }^{ 2 } } }
\]
就可以了。
矩陣並行
畫個圖看看,首先創建下面的矩陣:
註意,矩陣裏面都是一列代表一個向量....上面是創建矩陣時的三元組,如果在spark中想要創建matrix,可以這樣:
val df = spark.createDataFrame(Seq(
(0, 0, 1.0),
(1, 0, 1.0),
(2, 0, 1.0),
(3, 0, 1.0),
(0, 1, 2.0),
(1, 1, 2.0),
(2, 1, 1.0),
(3, 1, 1.0),
(0, 2, 3.0),
(1, 2, 3.0),
(2, 2, 3.0),
(0, 3, 1.0),
(1, 3, 1.0),
(3, 3, 4.0)
))
val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)
然後計算每一個向量的normL2,即平方和開根號。
以第一個和第二個向量計算為例,第一個向量為(1,1,1,1),第二個向量為(2,2,1,1),每一項除以對應的normL2,得到後面的兩個向量:
\[
0.5*0.63+0.5*0.63+0.5*0.31+0.5*0.31 \approx 0.94
\]
兩個向量最終的相似度為0.94。
那麽在Spark如何快速並行處理呢?通過上面的例子,可以看到兩個向量的相似度,需要把每一維度乘積後相加,但是一個向量一般都是跨RDD保存的,所以可以先計算所有向量的第一維,得出結果
\[
(向量1的第1維,向量2的第1維,value)\(向量1的第2維,向量2的第2維,value)\...\(向量1的第n維,向量2的第n維,value)\(向量1的第1維,向量3的第1維,value)\..\(向量1的第n維,向量3的第n維,value)\\]
最後對做一次reduceByKey累加結果即可.....
閱讀源碼
首先創建dataframe形成matrix:
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.SparkSession
object MatrixSimTest {
def main(args: Array[String]): Unit = {
// 創建dataframe,轉換成matrix
val spark = SparkSession.builder().master("local[*]").appName("sim").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val df = spark.createDataFrame(Seq(
(0, 0, 1.0),
(1, 0, 1.0),
(2, 0, 1.0),
(3, 0, 1.0),
(0, 1, 2.0),
(1, 1, 2.0),
(2, 1, 1.0),
(3, 1, 1.0),
(0, 2, 3.0),
(1, 2, 3.0),
(2, 2, 3.0),
(0, 3, 1.0),
(1, 3, 1.0),
(3, 3, 4.0)
))
val matrix = new CoordinateMatrix(df.map(row => MatrixEntry(row.getAs[Integer](0).toLong, row.getAs[Integer](1).toLong, row.getAs[Double](2))).toJavaRDD)
// 調用sim方法
val x = matrix.toRowMatrix().columnSimilarities()
// 得到相似度結果
x.entries.collect().foreach(println)
}
}
得到的結果為:
MatrixEntry(0,3,0.7071067811865476)
MatrixEntry(0,2,0.8660254037844386)
MatrixEntry(2,3,0.2721655269759087)
MatrixEntry(0,1,0.9486832980505139)
MatrixEntry(1,2,0.9128709291752768)
MatrixEntry(1,3,0.596284793999944)
直接進入columnSimilarities方法看看是怎麽個流程吧!
def columnSimilarities(): CoordinateMatrix = {
columnSimilarities(0.0)
}
內部調用了帶閾值的相似度方法,這裏的閾值是指相似度小於該值時,輸出結果時,會自動過濾掉。
def columnSimilarities(threshold: Double): CoordinateMatrix = {
//檢查參數...
val gamma = if (threshold < 1e-6) {
Double.PositiveInfinity
} else {
10 * math.log(numCols()) / threshold
}
columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
}
這裏的gamma用於采樣,具體的做法咱們來繼續看源碼。然後看一下computeColumnSummaryStatistics().normL2.toArray
這個方法:
def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
(aggregator, data) => aggregator.add(data),
(aggregator1, aggregator2) => aggregator1.merge(aggregator2))
updateNumRows(summary.count)
summary
}
之前有介紹這個treeAggregate是一種帶“預reduce”的map-reduce,返回的summary,裏面幫我們統計了每一個向量的很多指標,比如
currMean 為 每一個向量的平均值
currM2 為 每個向量的每一維的平方和
currL1 為 每個向量的絕對值的和
currMax 為 每個向量的最大值
currMin 為 每個向量的最小值
nnz 為 每個向量的非0個數
這裏我們只需要currM2,它是每個向量的平方和。summary調用的normL2方法:
override def normL2: Vector = {
require(totalWeightSum > 0, s"Nothing has been added to this summarizer.")
val realMagnitude = Array.ofDim[Double](n)
var i = 0
val len = currM2.length
while (i < len) {
realMagnitude(i) = math.sqrt(currM2(i))
i += 1
}
Vectors.dense(realMagnitude)
}
上面這步就是對平方和開個根號,這樣就求出來了每個向量的分母部分。
下面就是最關鍵的地方了:
private[mllib] def columnSimilaritiesDIMSUM(
colMags: Array[Double],
gamma: Double): CoordinateMatrix = {
// 一些參數校驗
// 對gamma進行開方
val sg = math.sqrt(gamma) // sqrt(gamma) used many times
// 這裏把前面算的平方根的值設置一個默認值,因為如果為0,除0會報異常,所以設置為1
val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)
// 把抽樣概率數組 和 平方根數組進行廣播
val sc = rows.context
val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c))
val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c)))
// 遍歷每一行,計算每個向量該維的乘積,形成三元組
val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
val p = pBV.value
val q = qBV.value
// 獲得隨機值
val rand = new XORShiftRandom(indx)
val scaled = new Array[Double](p.size)
iter.flatMap { row =>
row match {
case SparseVector(size, indices, values) =>
// 如果是稀疏向量,遍歷向量的每一維,除以平方根
val nnz = indices.size
var k = 0
while (k < nnz) {
scaled(k) = values(k) / q(indices(k))
k += 1
}
// 遍歷向量數組,計算每一個數值與其他數值的乘機。
// 比如向量(1, 2, 0 ,1)
// 得到的結果為 (0,1,value)(0,3,value)(2,3,value)
Iterator.tabulate (nnz) { k =>
val buf = new ListBuffer[((Int, Int), Double)]()
val i = indices(k)
val iVal = scaled(k)
// 判斷當前列是否符合采樣範圍,如果小於采樣值,就忽略
if (iVal != 0 && rand.nextDouble() < p(i)) {
var l = k + 1
while (l < nnz) {
val j = indices(l)
val jVal = scaled(l)
if (jVal != 0 && rand.nextDouble() < p(j)) {
// 計算每一維與其他維的值
buf += (((i, j), iVal * jVal))
}
l += 1
}
}
buf
}.flatten
case DenseVector(values) =>
// 跟稀疏同理
val n = values.size
var i = 0
while (i < n) {
scaled(i) = values(i) / q(i)
i += 1
}
Iterator.tabulate (n) { i =>
val buf = new ListBuffer[((Int, Int), Double)]()
val iVal = scaled(i)
if (iVal != 0 && rand.nextDouble() < p(i)) {
var j = i + 1
while (j < n) {
val jVal = scaled(j)
if (jVal != 0 && rand.nextDouble() < p(j)) {
buf += (((i, j), iVal * jVal))
}
j += 1
}
}
buf
}.flatten
}
}
// 最後再執行一個reduceBykey,累加所有的值,就是i和j的相似度
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
MatrixEntry(i.toLong, j.toLong, sim)
}
new CoordinateMatrix(sims, numCols(), numCols())
}
這樣把所有向量的平方和廣播後,每一行都可以在不同的節點並行處理了。
總結來說,Spark提供的這個計算相似度的方法有兩點優勢:
- 通過拆解公式,使得每一行獨立計算,加快速度
- 提供采樣方案,以采樣方式抽樣固定的特征維度計算相似度
不過傑卡德目前並不能使用這種方法來計算,因為傑卡德中間有一項需要對向量求dot,這種方式就不適合了;如果傑卡德想要快速計算,可以去參考LSH局部敏感哈希算法,這裏就不詳細說明了。
Spark MLlib 之 大規模數據集的相似度計算原理探索