1. 程式人生 > >應用實戰: 如何利用Spark叢集計算物品相似度

應用實戰: 如何利用Spark叢集計算物品相似度

本文是Spark調研筆記的最後一篇,以程式碼例項說明如何藉助Spark平臺高效地實現推薦系統CF演算法中的物品相似度計算。

在推薦系統中,最經典的推薦演算法無疑是協同過濾(Collaborative Filtering, CF),而item-cf又是CF演算法中一個實現簡單且效果不錯的演算法。
在item-cf演算法中,最關鍵的步驟是計算物品之間的相似度。本文以程式碼例項來說明如何利用Spark平臺快速計算物品間的餘弦相似度。
Cosine Similarity是相似度的一種常用度量,根據《推薦系統實踐》一書第2.4.2節關於Item-CF演算法部分的說明,其計算公式如下:

舉個例子,若對item1有過行為的使用者集合為{u1, u2, u3},對item2有過行為的使用者集合為{u1, u3, u4, u5},則根據上面的式子,item1和item2間的相似度為2/(3*4),其中分子的2是因為item1的user_list與item2的user_list的交集長度為2,即item1和item2的共現(co-occurence)次數是2。

在工程實現上,根據論文"Empirical Analysis of Predictive Algorithms for Collaborative Filtering"的分析,為對活躍使用者做懲罰,引入了IUF (Inverse User Frequency)的概念(與TF-IDF演算法引入IDF的思路類似:活躍使用者對物品相似度的貢獻應該小於不活躍的使用者),因此,對餘弦相似度做改進後相似度計算公式如下:

可以看到,上式分子部分的1/log(1 + N(u))體現了對活躍使用者的懲罰。

此外,通常認為使用者在相隔很短的時間內喜歡的物品具有更高相似度。因此,工程實現上,還會考慮時間衰減效應。一種典型的時間衰減函式如下所示:

最終,時間上下文相關的Item-CF演算法中的相似度計算公式如下:

上式中,分母部分與標準的相似度公式分母保持一致;分子部分參與運算的是item_i和item_j的共現使用者集合,其中,f(t)是時間衰減效應的體現,N(u)對活躍使用者做了懲罰。

下面的Python程式碼是計算物品相似度的Spark任務的程式碼片段(從HDFS載入使用者歷史行為日誌,計算物品相似度,相似列表取TopN,將相似度計算結果寫會HDFS),供大家參考:

#!/bin/env/python


import pyspark as ps
import math
import datetime as dt
import util


def generate_item_pair(x):
    """
        Find co-occurence items of every given user 
        Return a tuple in the format of ((item_0, item_1), cooccurrence_factor).
    """
    items = x[1]
    item_cnt = len(items)
    alpha = 1
    for i in items:
        item1 = i[0]
        ts1   = i[1]
        for j in items:
            item2 = j[0]
            ts2   = j[1]
            if item1 != item2:
                ## introduce time decay and penalize active users
                ft = 1.0 / (1 + alpha * abs(ts1 - ts2))
                yield ((item1, item2), (ft / math.log(1 + item_cnt)))


def compute_item_similarity(x):
    items = x[0]
    cooccurrence = float(x[1])
    item_dict = g_item_freq_d 
    norm_factor = 5
    if items[0] in item_dict and items[1] in item_dict:
        freq_0 = item_dict[items[0]]
        freq_1 = item_dict[items[1]]
        ## calculate similarity between the item pair
        sim = cooccurrence / math.sqrt(freq_0 * freq_1)
        ## normalize similarity
        norm_sim = (cooccurrence / (cooccurrence + norm_factor)) * sim
        yield (items[0], (items[1], norm_sim))


def sort_items(x):
    """
        For a given item, sort all items similar to it as descent (using similarity scores), take topN similar items, and return as the following format:
        given_item \t sorted_item_0$sorted_score_0,sorted_item_1$sorted_score_1,...
    """
    similar_items = list(x[1])
    if len(similar_items) > 0:
        ## sort list of (item, score) tuple by score from high to low
        similar_items.sort(key=lambda x: x[1], reverse=True)
        ## format the list of sorted items as a string
        similar_items_str = ",".join(["$".join(map(str,item)) for item in similar_items[0:50]])
        yield "\t".join([str(x[0]), similar_items_str])


def main():
    base_hdfs_uri = "hdfs://to/user/behavior/log"
    today = dt.date.today()
    knn_similarity_file = '%s/%s/knn_sim' % (base_hdfs_uri, today.strftime('%Y%m%d'))

    sc = ps.SparkContext()

    ## load user behavior from hdfs log
    ## each element in user_item is a tuple: (user, (item, timestamp))
    history_s = (today - dt.timedelta(8)).strftime('%Y%m%d')
    history_e = (today - dt.timedelta(2)).strftime('%Y%m%d')
    input_files = util.get_input_files(action='play', start=history_s, end=history_e)
    user_item = sc.textFile(",".join(input_files))\
        .mapPartitions(util.parse_user_item) \
        .map(lambda x: (x[0], (x[1], x[2]))) \
        .distinct() \
        .cache()

    ## compute item frequency and store as a global dict
    item_freq = user_item.map(lambda x: (x[1][0], 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .collect()
    global g_item_freq_d
    g_item_freq_d = dict()
    for x in item_freq:
        g_item_freq_d[x[0]] = x[1]
   
    ## compute item similarity and find top n most similar items  
    item_pair_sim = user_item.groupByKey() \
        .flatMap(generate_item_pair) \
        .reduceByKey(lambda x, y: x + y) \
        .flatMap(compute_item_similarity) \
        .groupByKey() \
        .flatMap(sort_items) \
        .cache()

    ## dump to hdfs
    item_pair_sim.repartition(1).saveAsTextFile(knn_similarity_file)


if __name__ == '__main__':
    main()

上面的程式碼中,import util中引入的util只是負責從HDFS獲取使用者歷史日誌的檔名列表,非常簡單,實現細節這裡不贅述。

5. Spark Programming Guide

========================== EOF ===========================