1. 程式人生 > >分布式計算--(分布式+多進程+多線程+多協程)

分布式計算--(分布式+多進程+多線程+多協程)

隨機 wait 分布式計算 import 主函數 port 進程管理器 address ssg

先來個最簡單的例子:

把1-10000每個數求平方

服務器server:

用兩個隊列存儲任務、結果

定義兩個函數

要實現分布式得繼承multiprocessing.managers.BaseManager

在主函數裏multiprocessing.freeze_support()開啟分布式支持

註冊兩個函數給客戶端調用

創建管理器,設置ip地址和開啟端口、鏈接密碼。

用兩個隊列加任務、收結果。用剛剛註冊的函數

把1-10000壓入隊列,

把結果壓入隊列

最後完成關閉服務器

客戶端client:

也需要繼承multiprocessing.managers.BaseManager

定義一個協程處理一個數據,同時把結果壓入結果隊列

定義一個線程處理10個數據,開啟10個協程

定義一個進程,進程驅動10個線程

主函數:同客戶端註冊兩個函數

同客戶端創建管理器,設置ip地址和開啟端口、鏈接密碼。

鏈接服務器

同客戶端調用註冊的函數,兩個隊列

套四層循環:10個進程、100個線程、1000個協程

循環進程函數

上代碼:

服務器server:

#coding:utf-8
import multiprocessing  #分布式進程
import multiprocessing.managers #分布式進程管理器
import random,time  #隨機數,時間
import Queue #隊列

task_queue=Queue.Queue() #
任務 result_queue=Queue.Queue() #結果 def return_task(): #返回任務隊列 return task_queue def return_result(): #返回結果隊列 return result_queue class QueueManger(multiprocessing.managers.BaseManager):#繼承,進程管理共享數據 pass if __name__=="__main__": multiprocessing.freeze_support()#開啟分布式支持 QueueManger.register("
get_task",callable=return_task)#註冊函數給客戶端調用 QueueManger.register("get_result", callable=return_result) manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #創建一個管理器,設置地址與密碼 manger.start() #開啟 task,result=manger.get_task(),manger.get_result() #任務,結果 for i in range(10000): print "task add data",i task.put(i) print "waitting for------" for i in range(10000): res=result.get(timeout=100) print "get data",res manger.shutdown()#關閉服務器

客戶端client:

#coding:utf-8
import multiprocessing  #分布式進程
import multiprocessing.managers  # 分布式進程管理器
import random,time  #隨機數,時間
import Queue #隊列
import threading
import gevent
import gevent.monkey


class  QueueManger(multiprocessing.managers.BaseManager):# 繼承,進程管理共享數據
    pass
def  gevetygo(num ,result): #協程處理一個數據
    print num*num
    result.put(num*num)

def  threadgo(datalist,result): # 線程處理10個數據,開啟10個協程
    tasklist=[]
    for  data  in datalist:
        tasklist.append(gevent.spawn(gevetygo, data,result))
    gevent.joinall(tasklist)

def  processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 進程驅動了10個線程
    threadlist=[]
    for  datalist in ddatalist:
        mythread=threading.Thread(target=threadgo,args=(datalist,result))
        mythread.start()
        threadlist.append(mythread)
    for mythread in threadlist:
        mythread.join()

if __name__=="__main__":
    QueueManger.register("get_task")  # 註冊函數調用服務器
    QueueManger.register("get_result")
    manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")
    manger.connect()  # 鏈接服務器
    task= manger.get_task()
    result =manger.get_result()  # 任務,結果

    # 1000
    # 10個進程
    # 100個線程
    # 1000個協程

    for  i  in range(10):
        cubelist = []  # [[[1],[2]]]
        for j in range(10):
            arealist = []
            for k in range(10):
                linelist = []
                for l in range(10):
                    data = task.get()
                    linelist.append(data)
                arealist.append(linelist)
            cubelist.append(arealist)

        processlist = []
        for myarealist in cubelist:
            process = multiprocessing.Process(target=processgo, args=(myarealist, result))
            process.start()
            processlist.append(process)
        for process in processlist:
            process.join()

遇到的坑:一個月之前弄分布式的時候寫ip地址怎麽都開啟不了,後來換了臺電腦就支持了= =。

如果只是在自己電腦上弄的話,寫127.0.0.1也可以運行,如果你也遇到ip地址怎麽都開啟不了的情況

分布式計算--(分布式+多進程+多線程+多協程)