1. 程式人生 > >pyspark實現Apriori演算法、迴圈迭代、並行處理

pyspark實現Apriori演算法、迴圈迭代、並行處理

from pyspark import  SparkContext
myDat=[ [ 1, 3, 4,5 ], [ 2, 3, 5 ], [ 1, 2, 3,4, 5 ], [ 2,3,4, 5 ] ]
sc = SparkContext( 'local', 'pyspark')
myDat=sc.parallelize(myDat) #得到輸入資料RDD #myDat.collect(): [[1, 3, 4, 5], [2, 3, 5], [1, 2, 3, 4, 5], [2, 3, 4, 5]]
C1=myDat.flatMap(lambda x: set(x)).distinct().collect() #distinct()是去重操作,對應C1=createC1(myDat) #得到1項集 #[1, 2, 3, 4, 5],
C1=[frozenset([var]) for var in C1] #需要這樣做,因為python的程式碼裡需要處理集合操作
D=myDat.map(lambda x: set(x)).collect() #將輸入資料RDD轉化為set的列表 #[{1, 3, 4, 5}, {2, 3, 5}, {1, 2, 3, 4, 5}, {2, 3, 4, 5}]
D_bc=sc.broadcast(D)
length=len(myDat.collect())
suppData=sc.parallelize(C1).map(lambda x: (x,len([var for var in D_bc.value if x.issubset(var)])/length) if len([var for var in D_bc.value \
        if x.issubset(var)])/length >=0.75 else ()).filter(lambda x: x).collect()
L=[]
L1=[frozenset(var) for var in map(lambda x:x[0],suppData)] #篩選出大於最小支援度
L.append(L1)
k=2
#D_bc=sc.broadcast(D)
while (len(L[k-2])>0):
    Ck=[var1|var2 for index,var1 in enumerate(L[k-2]) for var2 in L[k-2][index+1:] if list(var1)[:k-2]==list(var2)[:k-2]]
    #count_each_ele=myDat.flatMap(lambda x:x).map(lambda x: (x,1)).countByKey()
    #count_each_ele=sc.parallelize(Ck).map(lambda x: filter(lambda y: x.issubset(y),D_bc.value))
    suppData_temp=sc.parallelize(Ck).map(lambda x: (x,len([var for var in D_bc.value if x.issubset(var)])/length) if len([var for var in D_bc.value \
        if x.issubset(var)])/length >=0.75 else ()).filter(lambda x: x).collect()
    #Ck中的多個子集會分佈到多個分佈的機器的任務中執行,D_bc是D的分發共享變數,在每個任務中,都可以使用D_bc來統計本任務中包含某子集的個數
    suppData+=suppData_temp
    L.append([var[0] for var in suppData_temp]) #使用這行程式碼,最後跳出while後再過濾一下空的項
    k+=1
L=[var for var in L if var]
print(L)
print(suppData)
def calcConf(freqSet, H, supportData, brl, minConf=0.7 ):
    prunedH=[]
    #sc.parallelize(H).map(lambda x: ...) #這裡也無法並行,因為,freqSet是區域性的,如果弄成廣播,那得好多副本
    for conseq in H:
        conf = supportData[ freqSet ] / supportData[ freqSet - conseq ]
        if conf >= minConf:
            print(freqSet - conseq, '-->', conseq, 'conf:', conf)
            brl.append( ( freqSet - conseq, conseq, conf ) )
            prunedH.append( conseq )
    return prunedH
def rulesFromConseq(freqSet,H,supportData,brl,minConf=0.7):
    m=len(H[0])
    if len(freqSet)>m+1:
        Hmp1=[var1|var2 for index,var1 in enumerate(H) for var2 in H[index+1:] if list(var1)[:m+1-2]==list(var2)[:m+1-2]]
        Hmp1 = calcConf( freqSet, Hmp1, supportData, brl, minConf )
        if len( Hmp1 ) > 1:
            rulesFromConseq( freqSet, Hmp1, supportData, brl, minConf )
def generateRules( L, supportData, minConf=0.7 ):
    bigRuleList = []
    for i in range( 1, len( L ) ):
        for freqSet in L[ i ]:
            H1 = [ frozenset( [ item ] ) for item in freqSet ]
            if i > 1:
                rulesFromConseq( freqSet, H1, supportData, bigRuleList, minConf )
            else:
                calcConf( freqSet, H1, supportData, bigRuleList, minConf )
    return bigRuleList
suppData_dict={}
suppData_dict.update(suppData) #查字典型別的update用法
sD_bc=sc.broadcast(suppData_dict)
rules = generateRules( L, sD_bc.value, minConf=0.9 )
print('rules:\n', rules)

進一步優化,將計算rules的部分的程式碼寫成如下形式:

#上述計算rules的程式碼的進一步修剪
newL=[[x,[]] for x in sc.parallelize(L).flatMap(lambda x: x).collect() if len(x)>1]
suppData_dict={}
suppData_dict.update(suppData)
sD_bc=sc.broadcast(suppData_dict) #查字典型別的update用法
def f2(freqSet, H, supportData, minConf=0.7 ):
    prunedH=[]
    for conseq in H:
        conf = supportData[ freqSet[0] ] / supportData[ freqSet[0] - conseq ]
        if conf >= minConf:
            #print(freqSet[0] - conseq, '-->', conseq, 'conf:', conf)
            freqSet[1]=freqSet[1]+[( freqSet[0] - conseq, conseq, conf )]
            prunedH.append( conseq )
    return (prunedH,freqSet)
def f1(freqSet,H,supportData,minConf=0.7): #需要這個H,因為H並不一定都由freqSet面來
    m=len(H[0])
    if len(freqSet[0])>m+1:
        Hmp1=[var1|var2 for index,var1 in enumerate(H) for var2 in H[index+1:] if list(var1)[:m+1-2]==list(var2)[:m+1-2]]
        Hmp1 = f2( freqSet, Hmp1, supportData, minConf )
        if len( Hmp1[0] ) > 1:
            f1( freqSet, Hmp1[0], supportData, minConf )
        return Hmp1[1]
result=sc.parallelize(newL).map(lambda x: f1(x,[frozenset([var]) for var in x[0]],sD_bc.value,0.9) if len(x[0])>2 else f2(x,[frozenset([var]) for var in x[0]],sD_bc.value,0.9)[1]).collect()
rules=[var[1] for var in result]
print(rules)

相關推薦

pyspark實現Apriori演算法迴圈並行處理

from pyspark import SparkContext myDat=[ [ 1, 3, 4,5 ], [ 2, 3, 5 ], [ 1, 2, 3,4, 5 ], [ 2,3,4, 5 ] ] sc = SparkContext( 'local', 'pyspa

生成器

生成器: 將[]改為() G = (i*I for I in range(8)) 生成器是一個物件不能直接列印 通過遍歷得到生成器的資料          For I in g: &nb

Python連載38-協程生產者消費者模型

一、生產者消費者模型 import multiprocessing from time import ctime def consumer(input_q): print("Into consumer:",ctime()) while True: #處理項

實戰c++中的vector系列--vector的遍歷(stl演算法vector器(不要在迴圈中判斷不等於end())operator[])【轉】

(轉自:https://blog.csdn.net/wangshubo1989/article/details/50374914?utm_source=blogxgwz29) 遍歷一個vector容器有很多種方法,使用起來也是仁者見仁。 通過索引遍歷: for (i = 0; i<

演算法】遞迴迴圈

一、遞迴 (一)介紹 1. 遞迴是 一個過程或函式在其定義或說明中有直接或間接呼叫自身的一種方法,它通常把一個大型複雜的問題層層轉化為一個與原問題相似的規模較小的問題來求解,遞迴策略只需少量的程式就可描述出解題過程所需要的多次重複計算,大大地減少了程式的程式碼量。遞迴的能

python學習(十七)——補充內建函式使用器協議實現斐波那契數列描述符pycharm的問題

一、補充內建函式 #--------------------------isinstance/isinbclass-------------- class Foo: pass class Bar(Foo): pass b1=Bar() print(isinstance(b1,

集合Iterator增強for迴圈泛型List介面Set介面

1、集合: 集合是java中提供的一種容器,可以用來儲存多個數據。 集合和陣列的區別:陣列的長度是固定的。集合的長度是可變的。集合中儲存的元素必須是引用型別資料 (1)ArrayList 集合儲存元素並遍歷 練習一:ArrayList集合儲存5個int型別元素 import java.util.Array

(Java)集合框架(一)Collection介面方法Iterator增強for迴圈

【Collection介面】  import java.util.ArrayList; import java.util.Collection; /* * Collection介面中的方法 是集合中所有實現類必須擁有的方法 * 程式演示,使用Collection

迴圈和遍歷的區別

迴圈(loop) - 最基礎的概念, 所有重複的行為 遞迴(recursion) - 在函式內呼叫自身, 將複雜情況逐步轉化成基本情況 (數學)迭代(iterate) - 在多次迴圈中逐步接近結果 (

C++器的兩種實現方式 (Range for和C#Java中的foreach)

一、迭代器概述   這個標題其實有點“標題黨”的含義,因為C++在標準庫中的實現迭代器的方式只有一種,也就是為類定義begin()和end()函式,C++11增加了range for語句,可以用來遍歷迭代器中的元素。實現迭代器的第二種方式,就是用C++模擬C#和Java中的

sqrt函式實現——二分法牛頓

在leetcode練習時,碰到一道經典的面試題,如何實現sqrt()開平方函式。當然,很簡單的是呼叫系統函式,但是難道不能自己實現這個函式的功能嗎?於是一番思索和查閱資料,看到下面的方法。 二分法求解 二分法這個應該很熟悉,在二分查詢演算法中就有具體的體現。應用

JAVA8學習(一)ListMap的迴圈

學習怎樣用java8去迴圈迭代 1、Map Map<Integer, String> items = new HashMap<>(); items.put(1, "wangw

python基礎知識梳理----3基本資料型別,int,bool,str ,for 迴圈,

一:python的基本型別 1.int  -----整數,主要進行數學運算 2.str  -----字串,可以儲存少量資料,並進行相關操作, 3. bool ---布林型別,判斷真假 4.list ----列表.儲存大量的資料 用[ ]表示 5.tuple ----元組,不可以發

MATLAB入門學習-#6-JacobiGauss-SeidelSOR法程式設計練習

MATLAB入門學習-#6-Jacobi、Gauss-Seidel、SOR迭代法程式設計練習 1.Jacobi迭代法 2.Gauss-Seidel迭代法 3.SOR迭代法(鬆弛法) 這三種迭代法是在數值分析課程裡學到的,都是求解線性

搞清楚 Python 的物件生成器

很多夥伴對 Python 的迭代器、可迭代物件、生成器這幾個概念有點搞不清楚,我來說說我的理解,希望對需要的朋友有所幫助。 1 迭代器協議 迭代器協議是核心,搞懂了這個,上面的幾個概念也就很好理解了。 所謂迭代器協議,就是要求一個迭代器必須要實現如下兩個方法 iterator.__iter__(

實現Apriori演算法(python)

1 # coding: utf-8 2 3 # 利用python實現apriori演算法 4 5 # In[1]: 6 7 8 #匯入需要的庫 9 from numpy import * 10 11 12 # In[2]: 13 14 15

PHP實現Apriori演算法——計算置信度

強規則定義 對於一個頻繁集L,找到所有的L的非空子集非空子集f,如果f -> L - f,的概率滿足最小置信度,則這是一個強規則。 如果{A,B,C,D}是一個頻繁集,則它有如下候選規則 ABC -> D, ABD -> C, ACD -> B, BCD -&g

PHP實現Apriori演算法——計算支援度

Apriori演算法是資料探勘中十分經典的演算法,在關聯度的計算中會經常用到,通過實現來更好的理解Apriori演算法。 Apriori定律 1:如果一個集合是頻繁項集,則它的所有子集都是頻繁項集。舉例:假設一個集合{A,B}是頻繁項集,即A、B同時出現在一條記錄的次數大於等於最小支援度

物件生成器

  迭代器與可迭代物件 1、定義: 可迭代物件:大部分容器如 list,truples,str,sets是可迭代物件,但是他們不是迭代器。可迭代物件實現了__iter__方法,返回一個迭代器,或者使用iter(“可迭代物件”)返回一個迭代器。   迭代器:迭代器提供了一

java中Iterable介面的使用,實現一個單鏈表的

單鏈表實現: public class MyLinkedList <T>{ private static class Entry<E>{ private E value; private Entry next;