關聯分析——Apriori演算法以及Python實現
阿新 • • 發佈:2019-01-22
Aprior演算法是比較經典的關聯規則挖掘演算法。
核心思想
核心就是先驗原理,即頻繁項集的子集必定是頻繁項集。反之,若子集非頻繁,則超集必定非頻繁。
演算法簡介
基本概念
購物籃事務(transaction):一位顧客一次購買商品的記錄就是一條事務。
項集(itemset):商品的集合。
頻繁項集:集合內商品數量大於閾值的項集。
關聯規則:形如,均為頻繁項集。
支援度:同時出現的事務數與總事務數的比值。
置信度:同時出現的事務數與出現的事務數的比值。其意義就是出現後,多大的機率會出現。
獲取候選K頻繁項集
通常採取合併K-1頻繁項集的方法來獲得K頻繁項集。
1.合併:由於頻繁項集是有序的,所以若兩個K-1頻繁項集的前K-2個相同,則考慮合併。
2.剪枝:合併後。只能確定原來的兩個K-1項集是頻繁的。還需要根據先驗原理,檢驗K項集是否頻繁。即逐個判定該K項集的子K-1項集是否頻繁。
產生規則
一個K頻繁項可以產生多個規則。同樣,需要根據先驗原理,去除置信度低於閾值的規則。(頻繁項集產生的規則必定滿足支援度,故無需考慮支援度)
1. 初始化候選規則,即規則前件含有K-1個項
2. 依次計算候選規則的置信度,大於閾值則儲存規則,反之拋棄。
3. 根據上一輪規則集合,合併前件產生新一輪的候選規則, 轉步驟2。直到候選規則為空。
演算法流程
-Input:購物籃事務資料集
-Output: 頻繁項集及支援度,規則及置信度
-Step1:初始化頻繁1項集
-Step2: 根據頻繁K-1項集產生候選頻繁K項集,逐個計算支援度,產生頻繁K項集。轉步驟2,直到出現候選項集為空
-Step3:依次根據每個頻繁項集產生規則
程式碼
"""
Apriori關聯分析演算法
核心思想:先驗原理
"""
from collections import Counter, defaultdict
class Apriori:
def __init__(self, minsup, minconf):
self.minsup = minsup
self.minconf = minconf
self.data = None
self.N = None # 購物籃資料的總數
self.D = None # 頻繁項集的最大項個數
self.fre_list = [] # 頻繁項集,[[[],[]],[[],[]]]
self.sup_list = [] # 儲存每個頻繁項的支援度
self.fre_dict = defaultdict(lambda: 0) # 鍵為頻繁項集的tuple,值為支援度
self.rules_dict = defaultdict(lambda: 0) # 規則,鍵為規則前件和規則後件的tuple, 值為置信度
def init_param(self, data):
# 根據傳入的資料初始化引數
self.data = sorted(data)
self.N = len(data)
self.D = 0
item_counter = Counter()
for itemset in data:
if len(itemset) > self.D:
self.D = len(itemset)
item_counter += Counter(itemset)
itemset = sorted(item_counter) # 保證有序
c1 = []
sup_c1 = []
for item in itemset:
sup = item_counter[item] / self.N
if sup > self.minsup:
c1.append([item])
sup_c1.append(sup)
self.fre_list.append(c1)
self.sup_list.append(sup_c1)
return
def apriori_fre_itemset(self):
# 使用Apriori演算法獲取頻繁項集
for i in range(1, self.D): # 逐漸增加頻繁項大小
ck_1 = self.fre_list[i - 1]
if len(ck_1) < 2: # 若k-1頻繁項集不足兩個,則跳出迴圈
break
cand_ck_set = self.ck_itemset(i, ck_1)
sup_ck = []
ck = []
for item in cand_ck_set: # 計算ck的支援度
sup = self.cal_sup(item)
if sup > self.minsup:
ck.append(item)
sup_ck.append(sup)
if len(ck) > 0:
self.fre_list.append(ck)
self.sup_list.append(sup_ck)
for ck, sup_ck in zip(self.fre_list, self.sup_list):
for itemset, sup in zip(ck, sup_ck):
self.fre_dict[tuple(itemset)] = sup
return
def ck_itemset(self, ind, ck_1):
# 根據k-1頻繁項集產生k頻繁項集, 產生候選然後減枝, 返回頻繁項的list
cand_ck_set = []
for i in range(len(ck_1)): # 合併兩個k-1頻繁項集
cand_ck = ck_1[i]
for j in range(i + 1, len(ck_1)):
if ck_1[i][:ind - 1] == ck_1[j][:ind - 1]: # 若前k-2項相同則合併
cand_ck.append(ck_1[j][-1]) # 合併形成頻繁k項
if self.prune(cand_ck, ck_1): # 檢查其他k-1項集是否為頻繁項集,進而減枝
cand_ck_set.append(cand_ck.copy())
cand_ck.pop()
return cand_ck_set
def prune(self, cand_ck_item, ck_1):
# 根據k-1頻繁項集來對k頻繁項是否頻繁
for item in cand_ck_item[:-2]:
sub_item = cand_ck_item.copy()
sub_item.remove(item)
if sub_item not in ck_1:
return False
return True
def cal_sup(self, item):
# 支援度計數
s = set(item)
sup = 0
for t in self.data:
if s.issubset(t):
sup += 1
return sup / self.N
def cal_conf(self, sxy, X):
# 計算置信度, sxy為產生規則的頻繁項集的支援度, X為規則前件
return sxy / self.fre_dict[tuple(X)]
def gen_rules(self):
# 從頻繁項集中提取規則
for i in range(1, len(self.fre_list)):
for ind, itemset in enumerate(self.fre_list[i]):
cand_rules = [] # 由該頻繁項集產生的規則的list, 記錄規則前件
sxy = self.sup_list[i][ind]
for item in itemset: # 初始化後件為1個項的規則
X = itemset.copy()
X.remove(item)
cand_rules.append(X)
while len(cand_rules) > 0:
itemset_rules = []
for X in cand_rules:
conf = self.cal_conf(sxy, X)
if conf > self.minconf:
itemset_rules.append(X)
Y = list(set(itemset) - set(X))
Y = sorted(Y)
self.rules_dict[(tuple(X), tuple(Y))] = conf
cand_rules = self.apriori_rules(itemset_rules)
return
def apriori_rules(self, itemset_rules):
# 根據先驗原理產生候選規則
cand_rules = []
for i in range(len(itemset_rules)):
for j in range(i + 1, len(itemset_rules)):
X = list(set(itemset_rules[i]) & set(itemset_rules[j])) # 合併生成新的規則前件
X = sorted(X)
if X in cand_rules or len(X) < 1: # 若該規則前件已經產生或者為空則跳過
continue
cand_rules.append(X)
return cand_rules
def fit(self, data):
self.init_param(data)
self.apriori_fre_itemset()
self.gen_rules()
return
if __name__ == '__main__':
data = [['l1', 'l2', 'l5'], ['l2', 'l4'], ['l2', 'l3'],
['l1', 'l2', 'l4'], ['l1', 'l3'], ['l2', 'l3'],
['l1', 'l3'], ['l1', 'l2', 'l3', 'l5'], ['l1', 'l2', 'l3']]
AP = Apriori(minsup=0.2, minconf=0.6)
AP.fit(data)
print(AP.rules_dict)
我的GitHub
注:如有不當之處,請指正。