1. 程式人生 > >如何構建一個生產環境的推薦系統

如何構建一個生產環境的推薦系統

1.概述

前面介紹過什麼是推薦系統,以及推薦系統中的用例,比如基於使用者的協同過濾來構建推薦系統。今天給大家介紹如何構建一個生產環境的推薦系統。

2.內容

現在網際網路上的內容很多,我們可能每天都會接受來自不同訊息。例如,電商網站、閱讀部落格、各類新聞文章等。但是,這些訊息並不是所有的內容你都感興趣,可能你只對技術部落格感興趣,或者某些新聞感興趣等等。而這麼內容如何去滿足使用者的需求呢?我們需要一個精準的解決方案來簡化使用者的發現過程。

2.1 推薦系統的作用是啥?

簡而言之,推薦系統就是一個發現使用者喜好的系統。系統從資料中學習並向用戶提供有效的建議。如果使用者沒有特意搜尋某項物品,則系統會自動將該項帶出。這樣看起很神奇,比如,你在電商網站上瀏覽過某個品牌的鞋子,當你在用一些社交軟體、短視訊軟體、視訊軟體時,你會驚奇的發現在你所使用的這些軟體中,會給你推薦你剛剛在電商網站上瀏覽的過的鞋子。

其實,這得益於推薦系統的過濾功能。我們來看看一張簡圖,如下圖所示:

 

 從上圖中,我們可以簡單的總結出,整個資料流程如下:

  • 資料來源:負責提供資料來源,比如使用者在電商網站、新聞、視訊等上的使用者行為,作為推薦訓練的資料來源;
  • 資料採集:使用者產生了資料,我們需要將這些資料進行收集,比如SDK埋點採集、Nginx上報、爬蟲等方式來獲取資料;
  • 資料儲存:獲取這些資料後,需要對這些資料進行分類儲存、清洗等,比如大資料裡面用的最多的HDFS,或者構建資料倉庫Hive表等;
  • 推薦系統:資料分類、清洗後好,有了推薦系統需要的資料,然後使用推薦系統中的各種模型、比如協同過濾、內容過濾、相似過濾、使用者矩陣等,來訓練這些使用者資料,得到訓練結果;
  • 目標使用者:通過推薦系統,對使用者資料進行訓練後,得出訓練結果,將這些結果,推薦給目標使用者。

2.2 依賴準備

我們使用Python來夠構建推薦系統模型,需要依賴如下的Python依賴包:

pip install numpy
pip install scipy
pip install pandas
pip install jupyter
pip install requests

這裡為簡化Python的依賴環境,推薦使用Anaconda3。這裡面集成了很多Python的依賴庫,不用我們在額外去關注Python的環境準備。

接著,我們載入資料來源,程式碼如下:

import pandas as pd
import numpy as np

df = pd.read_csv('resource/events.csv')
df.shape
print(df.head())

結果如下:

 

使用df.head()會列印資料前5行資料:

  • timestamp:時間戳
  • visitorid:使用者ID
  • event:事件型別
  • itemid:物品ID
  • transactionid:事務ID

使用如下程式碼,檢視事件型別有哪些:

print(df.event.unique())

結果如下:

 

從上圖可知,型別有三種,分別是:view、addtocart、transaction。

為了簡化起見,以transaction型別為例子。程式碼如下所示:

trans = df[df['event'] == 'transaction']
trans.shape
print(trans.head())

結果如下圖所示:

 

接著,我們來看看使用者和物品的相關資料,程式碼如下:

visitors = trans['visitorid'].unique()
items = trans['itemid'].unique()
print(visitors.shape)
print(items.shape)

 

 我們可以獲得11719個去重使用者和12025個去重物品。

構建一個簡單而有效的推薦系統的經驗法則是在不損失精準度的情況下減少資料的樣本。這意味著,你只能為每個使用者獲取大約50個最新的事務樣本,並且我們仍然可以得到期望中的結果。

程式碼如下所示:

trans2 = trans.groupby(['visitorid']).head(50)
print(trans2.shape)

 

 真實場景中,使用者ID和物品ID是一個海量數字,人為很難記住,比如如下程式碼:

trans2['visitors'] = trans2['visitorid'].apply(lambda x : np.argwhere(visitors == x)[0][0])
trans2['items'] = trans2['itemid'].apply(lambda x : np.argwhere(items == x)[0][0])

print(trans2)

結果如下圖所示:

 

2.3 構建矩陣

2.3.1 構建使用者-物品矩陣 

從上面的程式碼執行的結果來看,目前樣本資料中有11719個去重使用者和12025個去重物品,因此,我們接下來構建一個稀疏矩陣。需要用到如下Python依賴:

from scipy.sparse import csr_matrix

實現程式碼如下所示:

occurences = csr_matrix((visitors.shape[0], items.shape[0]), dtype='int8')
def set_occurences(visitor, item):
    occurences[visitor, item] += 1
trans2.apply(lambda row: set_occurences(row['visitors'], row['items']), axis=1)
print(occurences)

結果如下所示:

(0, 0)        1
  (1, 1)        1
  (1, 37)       1
  (1, 72)       1
  (1, 108)      1
  (1, 130)      1
  (1, 131)      1
  (1, 132)      1
  (1, 133)      1
  (1, 162)      1
  (1, 163)      1
  (1, 164)      1
  (2, 2)        1
  (3, 3)        1
  (3, 161)      1
  (4, 4)        1
  (4, 40)       1
  (5, 5)        1
  (5, 6)        1
  (5, 18)       1
  (5, 19)       1
  (5, 54)       1
  (5, 101)      1
  (5, 111)      1
  (5, 113)      1
  :     :
  (11695, 383)  1
  (11696, 12007)        1
  (11696, 12021)        1
  (11697, 12008)        1
  (11698, 12011)        1
  (11699, 1190) 1
  (11700, 506)  1
  (11701, 11936)        1
  (11702, 10796)        1
  (11703, 12013)        1
  (11704, 12016)        1
  (11705, 12017)        1
  (11706, 674)  1
  (11707, 3653) 1
  (11708, 12018)        1
  (11709, 12019)        1
  (11710, 1330) 1
  (11711, 4184) 1
  (11712, 3595) 1
  (11713, 12023)        1
  (11714, 3693) 1
  (11715, 5690) 1
  (11716, 6280) 1
  (11717, 3246) 1
  (11718, 2419) 1
View Code

2.3.2 構建物品-物品共生矩陣

構建一個物品與物品矩陣,其中每個元素表示一個使用者購買兩個物品的次數,可以認為是一個共生矩陣。要構建一個共生矩陣,需要將發生矩陣的轉置與自身進行點乘。

cooc = occurences.transpose().dot(occurences)
cooc.setdiag(0)
print(cooc)

結果如下所示:

(0, 0)        0
  (164, 1)      1
  (163, 1)      1
  (162, 1)      1
  (133, 1)      1
  (132, 1)      1
  (131, 1)      1
  (130, 1)      1
  (108, 1)      1
  (72, 1)       1
  (37, 1)       1
  (1, 1)        0
  (2, 2)        0
  (161, 3)      1
  (3, 3)        0
  (40, 4)       1
  (4, 4)        0
  (8228, 5)     1
  (8197, 5)     1
  (8041, 5)     1
  (8019, 5)     1
  (8014, 5)     1
  (8009, 5)     1
  (8008, 5)     1
  (7985, 5)     1
  :     :
  (11997, 12022)        1
  (2891, 12022) 1
  (12023, 12023)        0
  (12024, 12024)        0
  (11971, 12024)        1
  (11880, 12024)        1
  (10726, 12024)        1
  (8694, 12024) 1
  (4984, 12024) 1
  (4770, 12024) 1
  (4767, 12024) 1
  (4765, 12024) 1
  (4739, 12024) 1
  (4720, 12024) 1
  (4716, 12024) 1
  (4715, 12024) 1
  (4306, 12024) 1
  (2630, 12024) 1
  (2133, 12024) 1
  (978, 12024)  1
  (887, 12024)  1
  (851, 12024)  1
  (768, 12024)  1
  (734, 12024)  1
  (220, 12024)  1
View Code

這樣一個稀疏矩陣就構建好了,並使用setdiag函式將對角線設定為0(即忽略第一項的值)。

接下來會用到一個和餘弦相似度的演算法類似的演算法LLR(Log-Likelihood Ratio)。LLR演算法的核心是分析事件的計數,特別是事件同時發生的計數。而我們需要的技術一般包括:

  • 兩個事件同時發生的次數(K_11)
  • 一個事件發生而另外一個事件沒有發生的次數(K_12、K_21)
  • 兩個事件都沒有發生(K_22)

表格表示如下:

  事件A 事件B
事件B A和B同時發生(K_11) B發生,單A不發生(K_12)
任何事件但不包含B A發生,但是B不發生(K_21) A和B都不發生(K_22)

通過上述表格描述,我們可以較為簡單的計算LLR的分數,公式如下所示:

LLR=2 sum(k)(H(k)-H(rowSums(k))-H(colSums(k)))

那回到本案例來,實現程式碼如下所示:

def xLogX(x):
    return x * np.log(x) if x != 0 else 0.0
def entropy(x1, x2=0, x3=0, x4=0):
    return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4)
def LLR(k11, k12, k21, k22):
    rowEntropy = entropy(k11 + k12, k21 + k22)
    columnEntropy = entropy(k11 + k21, k12 + k22)
    matrixEntropy = entropy(k11, k12, k21, k22)
    if rowEntropy + columnEntropy < matrixEntropy:
        return 0.0
    return 2.0 * (rowEntropy + columnEntropy - matrixEntropy)
def rootLLR(k11, k12, k21, k22):
    llr = LLR(k11, k12, k21, k22)
    sqrt = np.sqrt(llr)
    if k11 * 1.0 / (k11 + k12) < k21 * 1.0 / (k21 + k22):
        sqrt = -sqrt
    return sqrt

程式碼中的K11、K12、K21、K22分別代表的含義如下:

  • K11:兩個事件都發送
  • K12:事件B傳送,而事件A不發生
  • K21:事件A傳送,而事件B不發生
  • K22:事件A和B都不發生

那我們計算的公式,實現的程式碼如下所示:

row_sum = np.sum(cooc, axis=0).A.flatten()
column_sum = np.sum(cooc, axis=1).A.flatten()
total = np.sum(row_sum, axis=0)
pp_score = csr_matrix((cooc.shape[0], cooc.shape[1]), dtype='double')
cx = cooc.tocoo()
for i,j,v in zip(cx.row, cx.col, cx.data):
    if v != 0:
        k11 = v
        k12 = row_sum[i] - k11
        k21 = column_sum[j] - k11
        k22 = total - k11 - k12 - k21
        pp_score[i,j] = rootLLR(k11, k12, k21, k22)

然後,我們對結果進行排序,讓每一項的最高LLR分數位於每行的第一列,實現程式碼如下所示:

result = np.flip(np.sort(pp_score.A, axis=1), axis=1)
result_indices = np.flip(np.argsort(pp_score.A, axis=1), axis=1)

例如我們來看看其中一項結果,程式碼如下:

print(result[8456])
print(result_indices[8456])

結果如下所示:

 

實際情況中,我們會根據經驗對LLR分數進行一些限制,因此將不重要的指標會進行刪除。

minLLR = 5
indicators = result[:, :50]
indicators[indicators < minLLR] = 0.0
indicators_indices = result_indices[:, :50]
max_indicator_indices = (indicators==0).argmax(axis=1)
max = max_indicator_indices.max()
indicators = indicators[:, :max+1]
indicators_indices = indicators_indices[:, :max+1]

訓練出結果後,我們可以將其放入到ElasticSearch中進行實時檢索。使用到的Python依賴庫如下:

import requests
import json

這裡使用ElasticSearch的批量更新API,建立一個新的索引,實現程式碼如下:

actions = []
for i in range(indicators.shape[0]):
    length = indicators[i].nonzero()[0].shape[0]
    real_indicators = items[indicators_indices[i, :length]].astype("int").tolist()
    id = items[i]
    
    action = { "index" : { "_index" : "items2", "_id" : str(id) } }
    
    data = {
        "id": int(id),
        "indicators": real_indicators
    }
    
    actions.append(json.dumps(action))
    actions.append(json.dumps(data))
    
    if len(actions) == 200:
        actions_string = "\n".join(actions) + "\n"
        actions = []
        
        url = "http://127.0.0.1:9200/_bulk/"
        headers = {
            "Content-Type" : "application/x-ndjson"
        }
        requests.post(url, headers=headers, data=actions_string)
if len(actions) > 0:
    actions_string = "\n".join(actions) + "\n"
    actions = []
    url = "http://127.0.0.1:9200/_bulk/"
    headers = {
        "Content-Type" : "application/x-ndjson"
    }
    requests.post(url, headers=headers, data=actions_string)

在瀏覽器中訪問地址http://127.0.0.1:9200/items2/_count,結果如下所示:

 

接下來,我們可以嘗試將訪問地址切換為這個http://127.0.0.1:9200/items2/240708,結果如下所示:

 

3.總結

構建一個面向生產環境的推薦系統並不困難,目前現有的技術元件可以滿足我們構建這樣一個生產環境的推薦系統。比如Hadoop、Hive、HBase、Kafka、ElasticSearch等這些成熟的開源元件來構建我們的生產環境推薦系統。本案例的完整程式碼如下所示:

import pandas as pd
import numpy as np
from scipy.sparse import csr_matrix
import requests
import json

df = pd.read_csv('resource/events.csv')
# print(df.shape)
# print(df.head())
# print(df.event.unique())
trans = df[df['event'] == 'transaction']
# print(trans.shape)
# print(trans.head())

visitors = trans['visitorid'].unique()
items = trans['itemid'].unique()
# print(visitors.shape)
# print(items.shape)

trans2 = trans.groupby(['visitorid']).head(50)
# print(trans2.shape)

trans2['visitors'] = trans2['visitorid'].apply(lambda x : np.argwhere(visitors == x)[0][0])
trans2['items'] = trans2['itemid'].apply(lambda x : np.argwhere(items == x)[0][0])

# print(trans2)
occurences = csr_matrix((visitors.shape[0], items.shape[0]), dtype='int8')
def set_occurences(visitor, item):
    occurences[visitor, item] += 1
trans2.apply(lambda row: set_occurences(row['visitors'], row['items']), axis=1)
# print(occurences)

cooc = occurences.transpose().dot(occurences)
cooc.setdiag(0)
# print(cooc)

def xLogX(x):
    return x * np.log(x) if x != 0 else 0.0
def entropy(x1, x2=0, x3=0, x4=0):
    return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4)
def LLR(k11, k12, k21, k22):
    rowEntropy = entropy(k11 + k12, k21 + k22)
    columnEntropy = entropy(k11 + k21, k12 + k22)
    matrixEntropy = entropy(k11, k12, k21, k22)
    if rowEntropy + columnEntropy < matrixEntropy:
        return 0.0
    return 2.0 * (rowEntropy + columnEntropy - matrixEntropy)
def rootLLR(k11, k12, k21, k22):
    llr = LLR(k11, k12, k21, k22)
    sqrt = np.sqrt(llr)
    if k11 * 1.0 / (k11 + k12) < k21 * 1.0 / (k21 + k22):
        sqrt = -sqrt
    return sqrt

row_sum = np.sum(cooc, axis=0).A.flatten()
column_sum = np.sum(cooc, axis=1).A.flatten()
total = np.sum(row_sum, axis=0)
pp_score = csr_matrix((cooc.shape[0], cooc.shape[1]), dtype='double')
cx = cooc.tocoo()
for i,j,v in zip(cx.row, cx.col, cx.data):
    if v != 0:
        k11 = v
        k12 = row_sum[i] - k11
        k21 = column_sum[j] - k11
        k22 = total - k11 - k12 - k21
        pp_score[i,j] = rootLLR(k11, k12, k21, k22)
    
result = np.flip(np.sort(pp_score.A, axis=1), axis=1)
result_indices = np.flip(np.argsort(pp_score.A, axis=1), axis=1)
print(result.shape)

print(result[8456])
print(result_indices[8456])

minLLR = 5
indicators = result[:, :50]
indicators[indicators < minLLR] = 0.0
indicators_indices = result_indices[:, :50]
max_indicator_indices = (indicators==0).argmax(axis=1)
max = max_indicator_indices.max()
indicators = indicators[:, :max+1]
indicators_indices = indicators_indices[:, :max+1]

actions = []
for i in range(indicators.shape[0]):
    length = indicators[i].nonzero()[0].shape[0]
    real_indicators = items[indicators_indices[i, :length]].astype("int").tolist()
    id = items[i]
    
    action = { "index" : { "_index" : "items2", "_id" : str(id) } }
    
    data = {
        "id": int(id),
        "indicators": real_indicators
    }
    
    actions.append(json.dumps(action))
    actions.append(json.dumps(data))
    
    if len(actions) == 200:
        actions_string = "\n".join(actions) + "\n"
        actions = []
        
        url = "http://127.0.0.1:9200/_bulk/"
        headers = {
            "Content-Type" : "application/x-ndjson"
        }
        requests.post(url, headers=headers, data=actions_string)
if len(actions) > 0:
    actions_string = "\n".join(actions) + "\n"
    actions = []
    url = "http://127.0.0.1:9200/_bulk/"
    headers = {
        "Content-Type" : "application/x-ndjson"
    }
    requests.post(url, headers=headers, data=actions_string)
View Code

4.結束語

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大資料探勘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買博主的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學視