1. 程式人生 > >關聯分析——Apriori演算法以及Python實現

關聯分析——Apriori演算法以及Python實現

Aprior演算法是比較經典的關聯規則挖掘演算法。

核心思想

核心就是先驗原理,即頻繁項集的子集必定是頻繁項集。反之,若子集非頻繁,則超集必定非頻繁。

演算法簡介

基本概念

購物籃事務(transaction):一位顧客一次購買商品的記錄就是一條事務。
項集(itemset):商品的集合。
頻繁項集:集合內商品數量大於閾值的項集。
關聯規則:形如XYX,Y均為頻繁項集。
支援度X,Y同時出現的事務數與總事務數的比值。
置信度X,Y同時出現的事務數與X出現的事務數的比值。其意義就是X出現後,多大的機率Y會出現。

獲取候選K頻繁項集

通常採取合併K-1頻繁項集的方法來獲得K頻繁項集。
1.合併:由於頻繁項集是有序的,所以若兩個K-1頻繁項集的前K-2個相同,則考慮合併。
2.剪枝:合併後。只能確定原來的兩個K-1項集是頻繁的。還需要根據先驗原理,檢驗K項集是否頻繁。即逐個判定該K項集的子K-1項集是否頻繁。

產生規則

一個K頻繁項可以產生多個規則。同樣,需要根據先驗原理,去除置信度低於閾值的規則。(頻繁項集產生的規則必定滿足支援度,故無需考慮支援度)
1. 初始化候選規則,即規則前件含有K-1個項
2. 依次計算候選規則的置信度,大於閾值則儲存規則,反之拋棄。
3. 根據上一輪規則集合,合併前件產生新一輪的候選規則, 轉步驟2。直到候選規則為空。

演算法流程

-Input:購物籃事務資料集
-Output: 頻繁項集及支援度,規則及置信度
-Step1:初始化頻繁1項集
-Step2: 根據頻繁K-1項集產生候選頻繁K項集,逐個計算支援度,產生頻繁K項集。轉步驟2,直到出現候選項集為空
-Step3:依次根據每個頻繁項集產生規則

程式碼

"""
Apriori關聯分析演算法
核心思想:先驗原理
"""
from collections import Counter, defaultdict


class Apriori:
    def __init__(self, minsup, minconf):
        self.minsup = minsup
        self.minconf = minconf
        self.data = None
self.N = None # 購物籃資料的總數 self.D = None # 頻繁項集的最大項個數 self.fre_list = [] # 頻繁項集,[[[],[]],[[],[]]] self.sup_list = [] # 儲存每個頻繁項的支援度 self.fre_dict = defaultdict(lambda: 0) # 鍵為頻繁項集的tuple,值為支援度 self.rules_dict = defaultdict(lambda: 0) # 規則,鍵為規則前件和規則後件的tuple, 值為置信度 def init_param(self, data): # 根據傳入的資料初始化引數 self.data = sorted(data) self.N = len(data) self.D = 0 item_counter = Counter() for itemset in data: if len(itemset) > self.D: self.D = len(itemset) item_counter += Counter(itemset) itemset = sorted(item_counter) # 保證有序 c1 = [] sup_c1 = [] for item in itemset: sup = item_counter[item] / self.N if sup > self.minsup: c1.append([item]) sup_c1.append(sup) self.fre_list.append(c1) self.sup_list.append(sup_c1) return def apriori_fre_itemset(self): # 使用Apriori演算法獲取頻繁項集 for i in range(1, self.D): # 逐漸增加頻繁項大小 ck_1 = self.fre_list[i - 1] if len(ck_1) < 2: # 若k-1頻繁項集不足兩個,則跳出迴圈 break cand_ck_set = self.ck_itemset(i, ck_1) sup_ck = [] ck = [] for item in cand_ck_set: # 計算ck的支援度 sup = self.cal_sup(item) if sup > self.minsup: ck.append(item) sup_ck.append(sup) if len(ck) > 0: self.fre_list.append(ck) self.sup_list.append(sup_ck) for ck, sup_ck in zip(self.fre_list, self.sup_list): for itemset, sup in zip(ck, sup_ck): self.fre_dict[tuple(itemset)] = sup return def ck_itemset(self, ind, ck_1): # 根據k-1頻繁項集產生k頻繁項集, 產生候選然後減枝, 返回頻繁項的list cand_ck_set = [] for i in range(len(ck_1)): # 合併兩個k-1頻繁項集 cand_ck = ck_1[i] for j in range(i + 1, len(ck_1)): if ck_1[i][:ind - 1] == ck_1[j][:ind - 1]: # 若前k-2項相同則合併 cand_ck.append(ck_1[j][-1]) # 合併形成頻繁k項 if self.prune(cand_ck, ck_1): # 檢查其他k-1項集是否為頻繁項集,進而減枝 cand_ck_set.append(cand_ck.copy()) cand_ck.pop() return cand_ck_set def prune(self, cand_ck_item, ck_1): # 根據k-1頻繁項集來對k頻繁項是否頻繁 for item in cand_ck_item[:-2]: sub_item = cand_ck_item.copy() sub_item.remove(item) if sub_item not in ck_1: return False return True def cal_sup(self, item): # 支援度計數 s = set(item) sup = 0 for t in self.data: if s.issubset(t): sup += 1 return sup / self.N def cal_conf(self, sxy, X): # 計算置信度, sxy為產生規則的頻繁項集的支援度, X為規則前件 return sxy / self.fre_dict[tuple(X)] def gen_rules(self): # 從頻繁項集中提取規則 for i in range(1, len(self.fre_list)): for ind, itemset in enumerate(self.fre_list[i]): cand_rules = [] # 由該頻繁項集產生的規則的list, 記錄規則前件 sxy = self.sup_list[i][ind] for item in itemset: # 初始化後件為1個項的規則 X = itemset.copy() X.remove(item) cand_rules.append(X) while len(cand_rules) > 0: itemset_rules = [] for X in cand_rules: conf = self.cal_conf(sxy, X) if conf > self.minconf: itemset_rules.append(X) Y = list(set(itemset) - set(X)) Y = sorted(Y) self.rules_dict[(tuple(X), tuple(Y))] = conf cand_rules = self.apriori_rules(itemset_rules) return def apriori_rules(self, itemset_rules): # 根據先驗原理產生候選規則 cand_rules = [] for i in range(len(itemset_rules)): for j in range(i + 1, len(itemset_rules)): X = list(set(itemset_rules[i]) & set(itemset_rules[j])) # 合併生成新的規則前件 X = sorted(X) if X in cand_rules or len(X) < 1: # 若該規則前件已經產生或者為空則跳過 continue cand_rules.append(X) return cand_rules def fit(self, data): self.init_param(data) self.apriori_fre_itemset() self.gen_rules() return if __name__ == '__main__': data = [['l1', 'l2', 'l5'], ['l2', 'l4'], ['l2', 'l3'], ['l1', 'l2', 'l4'], ['l1', 'l3'], ['l2', 'l3'], ['l1', 'l3'], ['l1', 'l2', 'l3', 'l5'], ['l1', 'l2', 'l3']] AP = Apriori(minsup=0.2, minconf=0.6) AP.fit(data) print(AP.rules_dict)

我的GitHub
注:如有不當之處,請指正。