1. 程式人生 > >spark基於使用者的協同過濾演算法與坑點,提交job

spark基於使用者的協同過濾演算法與坑點,提交job

承接上文:
http://blog.csdn.net/wangqi880/article/details/52875524
對了,每臺機子的防火牆要關閉哈,不然spark叢集啟動不起來
前一次,已經把spark的分散式叢集佈置好了,今天寫一個簡單的案例來執行。會寫一些關於spark的推薦的東西,這裡主要有4點,1基於使用者協同過濾,2基於物品協同過濾,3基於模型的協同過濾,4基於關聯規則的推薦(fp_growth),只寫核心程式碼啊。

基於spark使用者協同過濾演算法的實現

1使用者協同過濾演算法

1.1含義

它是統計計算搜尋目標使用者的相似使用者,並根據相似使用者對物品的打分來預測目標使用者對指定物品的評分,一般選擇topn選擇相似度較高的相似使用者做推薦結果。
從這句話,我們可以看出UserBase推薦演算法主要有3個工作要做:1使用者相似度量,2最近鄰居查詢,3預測評分。
具體百度查

1.2相似性距離

這裡直接使用cos距離了,cos距離是通過向量間的cos夾角來度量相似性,如果是在同一個方向增長,那麼相似性是不會變得。公式如下:
這裡寫圖片描述

1.3樣本資料如下:

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0

2Spark程式碼如下:

package org.wq.scala.ml

import org.apache.spark.mllib.linalg.distributed._
import
org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/10/20. */ object UserBaseTest { def main(args:Array[String]): Unit = { val conf = new SparkConf().setAppName("UserBaseModel").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse"
) val sc = new SparkContext(conf) //test.data是使用者_物品_評分樣本,且使用者為Int,物品為int型 val data = sc.textFile("data/mllib/test.data") val parseData= data.map(_.split(",") match {case Array(user,item,rate)=>MatrixEntry(user.toLong-1,item.toLong-1,rate.toDouble)}) /*parseData.collect().map(x=>{ println(x.i+"->"+x.j+"->"+x.value) })*/ //coordinateMatrix是專門儲存user_item_rating這種資料樣本的 println("ratings:") val ratings = new CoordinateMatrix(parseData) ratings.entries.collect().map(x=>{ println(x.i+","+x.j+","+x.value) }) //把CoordinateMatrix轉換成RowMatrix計算兩個使用者之間的cos相似性,且行表示使用者,列表示物品 //RowMatrix的方法,columnSimilarities是計算,列與列的相似度,現在是user_item_rating,需要轉置(transpose)item_user_rating,這樣才是使用者的相似 //toRowMatrix()之後,物品的順序不是從小到大排序的,但是相似度是Okval matrix= ratings.transpose().toRowMatrix() println("toRowMatrix之後的結果:") matrix.rows.collect().map(x=>{ x.toArray.map(x=>{print(x+",")}) println("") }) val similarities = matrix.columnSimilarities() //相似性是對的 println("相似性") similarities.entries.collect().map(x=>{ println(x.i+"->"+x.j+"->"+x.value) }) /* similarities.entries.filter(_.i==0).sortBy(_.value,false).collect().map(x=>{ println(x.i+"->"+x.j+"->"+x.value) })*/ //計算使用者1對物品1的評分,預測結果為,使用者1的評價分+其他相似使用者對其的加權平均值,相似度為權重 // val ratingOfUser1 = ratings.toRowMatrix().rows.collect()(3).toArray ,這個就是數字不能代表user的下標 // toRowMatrix()好像有點問題 val ratingOfUser1 = ratings.entries.filter(_.i==0).map(x=>{(x.j,x.value)}).sortBy(_._1).collect().map(_._2).toList.toArray val avgRatingOfUser1 = ratingOfUser1.sum/ratingOfUser1.size //println(avgRatingOfUser1) //計算其他使用者對物品1的加權平均值,matrix是物品_使用者_評分 //matrix的一行,就是物品的所有使用者評分,drop(1)表示刪除自己的評分哈 //matrix(n)不能表示使用者的下標啊 val ratingsToItem1=matrix.rows.collect()(0).toArray.drop(1) //ratingsToItem1.map(x=>print(x)) //權重_.i==0選擇第一個使用者,sortBy(_.j)表示根據使用者的下標作為Keyvalue降序(value越大,表示相似度越高),所以,越前相似度越高 val weights =similarities.entries.filter(_.i==0).sortBy(_.j).map(_.value).collect() //val weights =similarities.entries.filter(_.i==0).sortBy(_.value,false).map(_.value).collect() //(0 to 2)表示從0到2,預設步長1,這裡表示,去top2相似的使用者作為預測使用者評分,真實情況,topn太少了哈 //sum(權重*使用者評分)/sum(weights) var weightedR = (0 to 2).map(t=>weights(t) * ratingsToItem1(t)).sum/weights.sum //把平均值+top2相似使用者的加權平均值 println("rating of uses1 to item1 is "+(avgRatingOfUser1)) println("rating of uses1 to item1 is "+(weightedR)) println("rating of uses1 to item1 is "+(avgRatingOfUser1+weightedR)) } }

程式碼有註釋哈,應該都可以看得懂,主要就是計算相似讀,計算使用者1給item1的評分,這裡評分的計算為:使用者均值+topn使用者的加權平均值,權重為相似性。

3坑點

1測試了300w多w記錄,使用者估計20w,物品大500,windows單機環境16g記憶體,配置2g的xxm,跑了1個小時都沒有出來,速度太慢了,當然也跟機子配置有關,直接停了。
2中間的轉成行矩陣的方法噁心,toRowMatrix(),就是這個方法。因為,使用這個方法之後,矩陣的使用者的標號順序都變了,不知道怎麼判斷,標號和使用者號都不一樣了。舉個例子大家就知道了,都可以試試:

//下面程式的結果,這個結果是ok的。
//使用者_物品_打分
val ratings = new CoordinateMatrix(parseData)
ratings.entries.collect().map(x=>{
       println("ratings=>"+x.i+"->"+x.j+"->"+x.value)
     })

執行的結果,和原始的樣本一樣的:

0,0,5.0
0,1,1.0
0,2,5.0
0,3,1.0
1,0,5.0
1,1,1.0
1,2,5.0
1,3,1.0
2,0,1.0
2,1,5.0
2,2,1.0
2,3,5.0
3,0,1.0
3,1,5.0
3,2,1.0

但是做了下面的轉換成行矩陣的做法之後:

下面是做了transpose().toRowRamtrix的結果
ratings.toRowMatrix().rows.collect().map(x=>{
  println()
  x.toArray.map(t=>{
    println(t+",")
  })
})

5.0,5.0,1.0,1.0,
1.0,1.0,5.0,5.0,
1.0,1.0,5.0,0.0,
5.0,5.0,1.0,1.0,

matrix的遍歷方式為map,還不能輸入使用者id查詢,噁心,使用者2的打分使用者3的打分反了,人工對比上下兩個資料就知道了。
但是隻能使用遍歷方式,遍歷matrix,我怎麼知道這條記錄是哪個使用者的。

不過我人工計算了和程式計算的相似度是差不多的,相似度應該是OK的,這裡也跪求大神指點疑問?

//程式計算相似度
2->3->0.7205766921228921
0->1->1.0000000000000002
1->2->0.3846153846153847
0->3->0.4003203845127179
1->3->0.4003203845127179
0->2->0.3846153846153847

4把jar提交到spark叢集執行

4.1打包方式

我使用的是idea,使用ctrl+alt+shift+s,
這裡寫圖片描述
這裡寫圖片描述
然後刪除所有的依賴jar包,除非你想打出幾百M的jar.

4.2執行jar與注意事項

使用rz上傳到centos中,shh工具或者其他工具都是可以,自己喜歡就好,
注意要保證資料檔案在每個節點上都有哈
我的目錄結構為(三臺機器都要一樣哈):
執行jar目錄:/home/jar/
這裡寫圖片描述
執行jar的資料目錄為:/home/jar/data
這裡寫圖片描述

jar與資料都好了之後,保證spark叢集執行哈,然後輸入命令執行我們的jar.

spark-submit  --class org.wq.scala.ml.UserBase --master spark://master:7077 --executor-memory 1g --num-executors 1  /home/jar/UserBaseSpark.jar /home/jar/data/test.data

執行成功如圖:
這裡寫圖片描述
這裡寫圖片描述

4.3注意事項

1保證你的資料檔案在節點中都有,不然彙報錯誤:
這裡寫圖片描述

2保證你提交的job,設定的執行記憶體沒有超過你自己在spark-env.sh中的記憶體,不然要報如下警告,資源不足,程式掛起,不能執行下去:
這裡寫圖片描述

關於toRowMatrix()方法的疑問,求解大神解析。
有時間也會看原始碼研究下,
下一篇文章會寫基於物品的協同過濾。
如果想做真實基於Spark的推薦,個人建議使用基於模型的與預計關聯規則的推薦