1. 程式人生 > >協同過濾(ALS)的原理及Python實現

協同過濾(ALS)的原理及Python實現

提到ALS相信大家應該都不會覺得陌生(不陌生你點進來幹嘛[捂臉]),它是協同過濾的一種,並被整合到Spark的Mllib庫中。本文就ALS的基本原理進行講解,並手把手、肩並肩地帶您實現這一演算法。

  1. 原理篇

我們用人話而不是大段的數學公式來講講ALS是怎麼一回事。

1.1 你聽說過推薦演算法麼

假如我是豆瓣的CEO,很多豆瓣的使用者在豆瓣電影上都會對電影進行評分。那麼根據這個評分資料,我們有可能知道這些使用者除了自己評過分的電影之外還喜歡或討厭哪些電影嗎?這就是一個典型的推薦問題,解決這一類問題的演算法被稱為推薦演算法。

1.2 什麼是協同過濾

協同過濾的英文全稱是Collaborative Filtering,簡稱CF。注意,這不是一款遊戲!從字面上分析,協同就是尋找共同點,過濾就是篩選出優質的內容。

1.3 協同過濾的分類

一般來說,協同過濾推薦分為三種類型:

  1. 基於使用者(user-based)的協同過濾,通過計算使用者和使用者的相似度找到跟使用者A相似的使用者B, C, D...再把這些使用者喜歡的內容推薦給A;
  2. 基於物品(item-based)的協同過濾,通過計算物品和物品的相似度找到跟物品1相似的物品2, 3, 4...再把這些物品推薦給看過物品1的使用者們;
  3. 基於模型(model based)的協同過濾。主流的方法可以分為:矩陣分解,關聯演算法,聚類演算法,分類演算法,迴歸演算法,神經網路。

1.4 矩陣分解

矩陣分解 (decomposition, factorization)是將矩陣拆解為數個矩陣的乘積。比如豆瓣電影有m個使用者,n個電影。那麼使用者對電影的評分可以形成一個m行n列的矩陣R,我們可以找到一個m行k列的矩陣U,和一個k行n列的矩陣I,通過U * I來得到矩陣R。

1.5 ALS

如果想通過矩陣分解的方法實現基於模型的協同過濾,ALS是一個不錯的選擇,其英文全稱是Alternating Least Square,翻譯過來是交替最小二乘法。假設使用者為a,物品為b,評分矩陣為R(m, n),可分解為使用者矩陣U(k, m)和物品矩陣I(k, n),其中m, n, k代表矩陣的維度。前方小段數學公式低能預警:

根據矩陣分解的定義,有
image
用MSE作為損失函式,為了方便化簡,加法符號左側的常數改為-1/2
image

  1. 對損失函式求U_a的一階偏導數,那
    image
  2. 令一階偏導數等於0
    image
  3. 同理,可證
    image

1.6 求解使用者矩陣U和物品矩陣I

矩陣R是已知的,我們隨機生成使用者矩陣U,

  1. 利用1.5中的式5、R和U求出I
  2. 利用1.5中的式6、R和I求出U

如此交替地執行步驟1和步驟2,直到演算法收斂或者迭代次數超過了最大限制,最終我們用RMSE來評價模型的好壞。

  1. 實現篇

本人用全宇宙最簡單的程式語言——Python實現了ALS演算法,沒有依賴任何第三方庫,便於學習和使用。簡單說明一下實現過程,更詳細的註釋請參考本人github上的程式碼。

注:程式碼中用到的Matrix類是我寫的一個矩陣類,可以取出矩陣的行或列,計算矩陣的乘法、轉置和逆。

2.1 建立ALS類

初始化,儲存使用者ID、物品ID、使用者ID與使用者矩陣列號的對應關係、物品ID與物品矩陣列號的對應關係、使用者已經看過哪些物品、評分矩陣的Shape以及RMSE。

class ALS(object):
    def __init__(self):
        self.user_ids = None
        self.item_ids = None
        self.user_ids_dict = None
        self.item_ids_dict = None
        self.user_matrix = None
        self.item_matrix = None
        self.user_items = None
        self.shape = None
        self.rmse = None

2.2 資料預處理

對訓練資料進行處理,得到使用者ID、物品ID、使用者ID與使用者矩陣列號的對應關係、物品ID與物品矩陣列號的對應關係、評分矩陣的Shape、評分矩陣及評分矩陣的轉置。

def _process_data(self, X):
    self.user_ids = tuple((set(map(lambda x: x[0], X))))
    self.user_ids_dict = dict(map(lambda x: x[::-1],
                                    enumerate(self.user_ids)))

    self.item_ids = tuple((set(map(lambda x: x[1], X))))
    self.item_ids_dict = dict(map(lambda x: x[::-1],
                                    enumerate(self.item_ids)))

    self.shape = (len(self.user_ids), len(self.item_ids))

    ratings = defaultdict(lambda: defaultdict(int))
    ratings_T = defaultdict(lambda: defaultdict(int))
    for row in X:
        user_id, item_id, rating = row
        ratings[user_id][item_id] = rating
        ratings_T[item_id][user_id] = rating

    err_msg = "Length of user_ids %d and ratings %d not match!" % (
        len(self.user_ids), len(ratings))
    assert len(self.user_ids) == len(ratings), err_msg

    err_msg = "Length of item_ids %d and ratings_T %d not match!" % (
        len(self.item_ids), len(ratings_T))
    assert len(self.item_ids) == len(ratings_T), err_msg
    return ratings, ratings_T

2.3 使用者矩陣乘以評分矩陣

實現稠密矩陣與稀疏矩陣的矩陣乘法,得到使用者矩陣與評分矩陣的乘積。

def _users_mul_ratings(self, users, ratings_T):

    def f(users_row, item_id):
        user_ids = iter(ratings_T[item_id].keys())
        scores = iter(ratings_T[item_id].values())
        col_nos = map(lambda x: self.user_ids_dict[x], user_ids)
        _users_row = map(lambda x: users_row[x], col_nos)
        return sum(a * b for a, b in zip(_users_row, scores))

    ret = [[f(users_row, item_id) for item_id in self.item_ids]
            for users_row in users.data]
    return Matrix(ret)

2.4 物品矩陣乘以評分矩陣

實現稠密矩陣與稀疏矩陣的矩陣乘法,得到物品矩陣與評分矩陣的乘積。

def _items_mul_ratings(self, items, ratings):

    def f(items_row, user_id):
        item_ids = iter(ratings[user_id].keys())
        scores = iter(ratings[user_id].values())
        col_nos = map(lambda x: self.item_ids_dict[x], item_ids)
        _items_row = map(lambda x: items_row[x], col_nos)
        return sum(a * b for a, b in zip(_items_row, scores))

    ret = [[f(items_row, user_id) for user_id in self.user_ids]
            for items_row in items.data]
    return Matrix(ret)

2.5 生成隨機矩陣

def _gen_random_matrix(self, n_rows, n_colums):
    data = [[random() for _ in range(n_colums)] for _ in range(n_rows)]
    return Matrix(data)

2.6 計算RMSE

def _get_rmse(self, ratings):
        m, n = self.shape
        mse = 0.0
        n_elements = sum(map(len, ratings.values()))
        for i in range(m):
            for j in range(n):
                user_id = self.user_ids[i]
                item_id = self.item_ids[j]
                rating = ratings[user_id][item_id]
                if rating > 0:
                    user_row = self.user_matrix.col(i).transpose
                    item_col = self.item_matrix.col(j)
                    rating_hat = user_row.mat_mul(item_col).data[0][0]
                    square_error = (rating - rating_hat) ** 2
                    mse += square_error / n_elements
        return mse ** 0.5

2.7 訓練模型

資料預處理

變數k合法性檢查

生成隨機矩陣U

交替計算矩陣U和矩陣I,並列印RMSE資訊,直到迭代次數達到max_iter

儲存最終的RMSE

def fit(self, X, k, max_iter=10):
    ratings, ratings_T = self._process_data(X)
    self.user_items = {k: set(v.keys()) for k, v in ratings.items()}
    m, n = self.shape

    error_msg = "Parameter k must be less than the rank of original matrix"
    assert k < min(m, n), error_msg

    self.user_matrix = self._gen_random_matrix(k, m)

    for i in range(max_iter):
        if i % 2:
            items = self.item_matrix
            self.user_matrix = self._items_mul_ratings(
                items.mat_mul(items.transpose).inverse.mat_mul(items),
                ratings
            )
        else:
            users = self.user_matrix
            self.item_matrix = self._users_mul_ratings(
                users.mat_mul(users.transpose).inverse.mat_mul(users),
                ratings_T
            )
        rmse = self._get_rmse(ratings)
        print("Iterations: %d, RMSE: %.6f" % (i + 1, rmse))

    self.rmse = rmse

2.8 預測一個使用者

預測一個使用者感興趣的內容,剔除使用者已看過的內容。然後按感興趣分值排序,取出前n_items個內容。

def _predict(self, user_id, n_items):
    users_col = self.user_matrix.col(self.user_ids_dict[user_id])
    users_col = users_col.transpose

    items_col = enumerate(users_col.mat_mul(self.item_matrix).data[0])
    items_scores = map(lambda x: (self.item_ids[x[0]], x[1]), items_col)
    viewed_items = self.user_items[user_id]
    items_scores = filter(lambda x: x[0] not in viewed_items, items_scores)

    return sorted(items_scores, key=lambda x: x[1], reverse=True)[:n_items]

2.9 預測多個使用者

迴圈呼叫2.8,預測多個使用者感興趣的內容。

def predict(self, user_ids, n_items=10):
    return [self._predict(user_id, n_items) for user_id in user_ids]

3 效果評估

3.1 main函式

使用電影評分資料集,訓練模型並統計RMSE。

@run_time
def main():
    print("Tesing the accuracy of ALS...")

    X = load_movie_ratings()

    model = ALS()
    model.fit(X, k=3, max_iter=5)
    print()

    print("Showing the predictions of users...")

    user_ids = range(1, 5)
    predictions = model.predict(user_ids, n_items=2)
    for user_id, prediction in zip(user_ids, predictions):
        _prediction = [format_prediction(item_id, score)
                       for item_id, score in prediction]
        print("User id:%d recommedation: %s" % (user_id, _prediction))

3.2 效果展示

設定k=3,迭代5次,並展示了前4個使用者的推薦內容,最終RMSE為0.370,執行時間46.5秒,效果還算不錯~

image

3.3 工具函式

本人自定義了一些工具函式,可以在github上檢視:

https://github.com/tushushu/imylu/tree/master/imylu/utils

  1. run_time - 測試函式執行時間
  2. load_movie_ratings - 載入電影評分資料

總結

ALS的原理:雞生蛋、蛋生雞

ALS的實現:基本上就是矩陣乘法

原文釋出時間為:2019-1-2
本文作者:李小文
本文來自雲棲社群合作伙伴“ Python愛好者社群”,瞭解相關資訊可以關注“python_shequ”微信公眾號