1. 程式人生 > >使用者推薦Slope One演算法與mapreduce&hive實現

使用者推薦Slope One演算法與mapreduce&hive實現

使用者推薦越來越熱, Google使用MinHash, PLSI, LDA, SVD, SVM等演算法,分析使用者的喜好, 實現新聞的自動分類;新浪也用Slope One以及一些Item-based的演算法對音樂進行推薦; 淘寶定期會啟動MapReduce作業分析前一天或者一個月使用者收藏的寶貝,給相同喜好的買家提供推薦服務。

本文要描述的Slope One 演算法是一種對評分進行預測的演算法, 它相對於SVD, PLSI, LDA這一類model-based演算法來說有以下特點:

1. 簡單, 容易實現

2. 訓練得到的模型可以增量更新

3. 預測速度很快

4. 使用者可以只做過一兩次評分,就可以獲得推薦.

5. 準確度比較理想

okay,  找到一篇介紹演算法的:http://www.fuchaoqun.com/2008/09/slope_one/

講的不錯,就不再重複了。

英文wiki上也有介紹http://en.wikipedia.org/wiki/Slope_One

其中python的實現比較簡潔

# Copyright 2006 Bryan O'Sullivan <[email protected]>.
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, version 2 or later, which is
# incorporated herein by reference.

class SlopeOne(object):
    def __init__(self):
        self.diffs = {}
        self.freqs = {}

    def predict(self, userprefs):
        preds, freqs = {}, {}
        for item, rating in userprefs.iteritems():
            for diffitem, diffratings in self.diffs.iteritems():
                try:
                    freq = self.freqs[diffitem][item]
                except KeyError:
                    continue
                preds.setdefault(diffitem, 0.0)
                freqs.setdefault(diffitem, 0)
                preds[diffitem] += freq * (diffratings[item] + rating)
                freqs[diffitem] += freq
        return dict([(item, value / freqs[item])
                     for item, value in preds.iteritems()
                     if item not in userprefs and freqs[item] > 0])

    def update(self, userdata):
        for ratings in userdata.itervalues():
            for item1, rating1 in ratings.iteritems():
                self.freqs.setdefault(item1, {})
                self.diffs.setdefault(item1, {})
                for item2, rating2 in ratings.iteritems():
                    self.freqs[item1].setdefault(item2, 0)
                    self.diffs[item1].setdefault(item2, 0.0)
                    self.freqs[item1][item2] += 1
                    self.diffs[item1][item2] += rating1 - rating2
                    print self.diffs[item1][item2]
        for item1, ratings in self.diffs.iteritems():
            for item2 in ratings:
                ratings[item2] /= self.freqs[item1][item2]

if __name__ == '__main__':
    userdata = dict(
        alice=dict(squid=1.0,
                   cuttlefish=0.5,
                   octopus=0.2),
        bob=dict(squid=1.0,
                 octopus=0.5,
                 nautilus=0.2),
        carole=dict(squid=0.2,
                    octopus=1.0,
                    cuttlefish=0.4,
                    nautilus=0.4),
        dave=dict(cuttlefish=0.9,
                  octopus=0.4,
                  nautilus=0.5),
        )
    s = SlopeOne()
    s.update(userdata)
    print s.predict(dict(octopus=0.4)
   

現在分析一下Slope One訓練的空間及時間複雜度,

如果有m個使用者,分別對n件物品進行了評分。每個使用者得進行 n 2 次計算,將產生n(n-1)/2級別的資料量(由於diff是個對角矩陣,可以只取下三角)。所以對m個使用者來說, CPU計算時間是m n 2 , 產生的中間資料是mn(n-1)/2,最後合併m個使用者的這些資料,產生的資料量是n(n-1)/2。

這個演算法的計算量對物品資料是呈平方級別地增長,對使用者數量是線性的。比較恐怖的是它產生的中間資料,如果某使用者物品評價資料為1MB左右, 且資料是double型佔8位元組, 則有1MB / 8B = 128K,此使用者將產生的資料是1MB * (128K - 1) / 2 約為64GB資料量, 這部分中間資料是不可能放在記憶體的,只能通過磁碟,然而磁碟讀寫與主存完全不是一個級別,速度上又造成一個瓶頸。

當然也不必這麼悲觀, Slope One是一個可以進行增量的演算法。假設已經對y件物品進行了訓練,則當前訓練的時間複雜度不會超過n 2 +my 2 . 撇開增量演算法不管, 我們可以用MapReduce的優勢分散式地進行訓練(可以同時使用增量和MapReduce)。以Netflix Prize 的資料為例, 它包含480189個使用者對17770部影片的評分資料,訓練資料有17770個檔案,每個檔案代表一部影片, 其中第一行是影片的id, 其餘行是各使用者對此影片的評分記錄。

MovieID:

CustomerID,Rating,Date

這些檔案都比較小,最大的不過4793673位元組,最小的才70位元組,而MapReduce的檔案塊為64MB。小檔案對於mapreduce任務來說是不利的,將會產生太多mapper. 這兒有一種解決辦法,將tar包轉成sequecefile .

省略此步,直接把解壓後的檔案put到HDFS,然後使用一道mapreduce把資料轉成我們需要的格式。

hadoop dfs -put $NETFLIX_HOME/training_set /user/zhoumin/netflix-source
# 將附件中的程式碼成slopeone-0.00.1-dev.jar後執行
hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOnePreproccessor /user/zhoumin/netflix-source/user/zhoumin/netflix

  然後用SlopeOneTrainer進行訓練。

SlopeOneTrainer的原理每個mapper計算一個使用者各item的diff矩陣。瞭解hadoop中mapper執行機制的人就會發現,有的使用者資料量大,很有可能產生上面說的數十GB的中間資料, 遠遠超過io.sort.mb的值。會造成mapper不停地merge資料,致使速度較慢, 使用36臺個slaves的叢集執行netflix的這部分訓練花了4個多小時,絕大部分時間會花在mapper之上,特別是mapper的merge階段.

於是假如把中間資料交給reducer去處理,更為理想,其實此步訓練相當於一個join操作。於是使用hive比較方便。先將原始資料轉成hive所需要的格式.

hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOneHive /user/zhoumin/netflix-source /user/zhoumin/netflix-hive
 

然後再建立兩張表,netflix是處理後的netflix訓練資料, freq_diff是訓練後的模型矩陣

CREATE EXTERNAL TABLE netflix(
  movie_id STRING,
  user_id STRING,
  rate DOUBLE,
  rate_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' 
STORED AS TEXTFILE 
LOCATION '/user/zhoumin/netflix-hive'; 

CREATE TABLE freq_diff (
    movie_id1 STRING,
    movie_id2 STRING,
    freq     DOUBLE,
    diff     DOUBLE
);

  okay,執行訓練SQL

INSERT OVERWRITE TABLE freq_diff
SELECT
  nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
FROM
  netflix nf1 
JOIN 
  netflix nf2 ON nf1.user_id = nf2.user_id 
WHERE nf1.movie_id > nf2.movie_id
GROUP BY nf1.movie_id, nf2.movie_id;

 此SQL將會產生兩道mapreduce job,使用 explain命令即可以看到, 第一道主要做join的工作,在reduce端會輸

出所有的中間資料。Hive自動會調整reducer的數量,但這兒的reducer為3, 跑得比較慢(超過9小時),可以將reducer顯式地設大些,我這兒設為160,再跑上面的訓練SQL.

set mapred.reduce.tasks=160;

 兩道job第一道花了33mins, 35sec,第二道花了1hrs, 29mins, 29sec,訓練時間總共約2小時,可以接受。

訓練完畢,就可以試一試預測功能了。假設某使用者給影片1000評了2分,那麼他將會對其它影片評多少分呢? 他將喜歡哪些影片呢?

okay,先做些準備工作

CREATE TABLE predict(
  movie_id STRING,
  rate FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

echo "1000,2" > predict_data

LOAD DATA LOCAL INPATH './predict_data' OVERWRITE INTO TABLE predict;
 

然後就可以進行預測了:

CREATE TABLE slopeone_result(
    movie_id STRING,
    freq     DOUBLE,
    pref     DOUBLE,
    rate     DOUBLE
);

INSERT OVERWRITE TABLE slopeone_result
SELECT
    /*+ MAPJOIN(p) */
    movie_id1                          as movie_id,
    sum(freq)                          as freq,
    sum(freq*(diff + rate))            as pref,
    sum(freq*(diff + rate))/sum(freq)  as rate
FROM
    predict p 
JOIN freq_diff fd ON fd.movie_id2 = p.movie_id
GROUP BY movie_id1

 注意上面使用了一個Map-Side Join的hint, 因為predict表非常小,只需要跑一個map only的job就可以完成join,無需shuffle資料給reduce. 這一步把使用者自身的movie_id也參與計算,由於hive不支援in,所以結果有些偏差。可以用一道MapReduce作業來做預測這一步。

最後select .. order by一下就知道此使用者喜歡哪些影片了。

結論:

1. 使用mapreduce,將運算移至reduce端, 避免map端的merge可以有效地提高訓練速度

2. Slope One是一種簡單易實現的使用者推薦演算法,而且可以增量訓練

3. 結合以上兩點,加上BigTable, HyperTable, Voldermort, Cassendera這種分散式key-value儲存庫,完全可以做到實時使用者推薦(HBase甭提了)。

-----------------------------------------------------------------------------------------------------

附: hive生成的mr job描述.

hive> explain
    > INSERT OVERWRITE TABLE freq_diff
    > SELECT
    >   nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
    > FROM
    >   netflix nf1
    > JOIN
    >   netflix nf2 ON nf1.user_id = nf2.user_id
    > WHERE nf1.movie_id > nf2.movie_id
    > GROUP BY nf1.movie_id, nf2.movie_id;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF netflix nf1) (TOK_TABREF netflix nf2) (= (. (TOK_TABLE_OR_COL nf1) user_id) (. (TOK_TABLE_OR_COL nf2) user_id)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB freq_diff)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf1) movie_id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf2) movie_id)) (TOK_SELEXPR (TOK_FUNCTION count 1)) (TOK_SELEXPR (/ (TOK_FUNCTION sum (- (. (TOK_TABLE_OR_COL nf1) rate) (. (TOK_TABLE_OR_COL nf2) rate))) (TOK_FUNCTION count 1)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        nf2
          TableScan
            alias: nf2
            Reduce Output Operator
              key expressions:
                    expr: user_id
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: user_id
                    type: string
              tag: 1
              value expressions:
                    expr: movie_id
                    type: string
                    expr: rate
                    type: double
        nf1
          TableScan
            alias: nf1
            Reduce Output Operator
              key expressions:
                    expr: user_id
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: user_id
                    type: string
              tag: 0
              value expressions:
                    expr: movie_id
                    type: string
                    expr: rate
                    type: double
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col2}
            1 {VALUE._col0} {VALUE._col2}
          outputColumnNames: _col0, _col2, _col4, _col6
          Filter Operator
            predicate:
                expr: (_col0 > _col4)
                type: boolean
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col4
                    type: string
                    expr: _col2
                    type: double
                    expr: _col6
                    type: double
              outputColumnNames: _col0, _col4, _col2, _col6
              Group By Operator
                aggregations:
                      expr: count(1)
                      expr: sum((_col2 - _col6))
                keys:
                      expr: _col0
                      type: string
                      expr: _col4
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://xxx:9000/user/zhoumin/hive-tmp/22895032/10002
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              tag: -1
              value expressions:
                    expr: _col2
                    type: bigint
                    expr: _col3
                    type: double
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: sum(VALUE._col1)
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col2
                  type: bigint
                  expr: (_col3 / _col2)
                  type: double
            outputColumnNames: _col0, _col1, _col2, _col3
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
                    expr: UDFToDouble(_col2)
                    type: double
                    expr: _col3
                    type: double
              outputColumnNames: _col0, _col1, _col2, _col3
              File Output Operator
                compressed: true
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: freq_diff

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: freq_diff