1. 程式人生 > >python全棧開發day33-進程間的通信、進程間的數據共享,進程池

python全棧開發day33-進程間的通信、進程間的數據共享,進程池

NPU art 標誌位 應該 自然 fin roc 錯亂 app

一、昨日內容回顧:

    1. 守護進程

        1)、p.saemon,

        2 )、p.terminate

        3 )、p.join

    2. 同步控制

      1)、鎖,Lock

        互斥鎖,解決數據安全、進程之間資源搶占問題。

      2)、信號量,Semaphore

        鎖+計數器

      3)、事件,Event

        通過一個標誌位flag來控制進程的阻塞和執行。

    3. 多進程實現tcp協議的socket的sever端

        1)子進程中不能使用input

        2)允許端口的重用設置

        3)妥善處理sk的close確保操作系統的資源能夠被及時回收。

        

技術分享圖片
import socket
from multiprocessing import Process


def func(conn):
    conn.send(bhello)
    data = conn.recv(1024)
    print(data.decode(utf-8))
    conn.close()


if __name__ == __main__:
    sk = socket.socket()
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 
1) sk.bind((127.0.0.1, 9000)) sk.listen(5) try: while True: con, addr = sk.accept() p = Process(target=func, args=(con,)) p.start() finally: sk.close()
server 技術分享圖片
import socket

sk = socket.socket()
sk.connect((127.0.0.1, 9000))
data = sk.recv(1024)
print(data) msg = input(>>>).encode(utf-8) sk.send(msg)
client

二、今日內容總結:

    1、進程間的通信:

        1)、隊列 Queue:隊列是加鎖的,在多進程之間對數據的管理是安全的

            維護了一個先進先出的順序,且保證了數據在進程之間是安全的。

            put,get,full,empty,get_nowait,put_nowait

          生產者和消費者模型:

            (1)、解決生產消費供需關系,生產的東西不夠吃,就再開啟一個進程生產。。。

             (2)、解決消費者不能結束消費完物品的循環和阻塞問題,隊列中引入None,

                讓消費者再取的時候判斷是否遇到None,遇到則結束。有幾個消費者就隊列中就put幾個None

             (3)、解決生產者生產完成後主程序才結束問題,對生產者的進程進程join阻塞

          JoinableQueue:

            join和task_done方法:

            join會阻塞隊列,直至隊列中的數據被取完,且執行了一個task_done,程序才會繼續執行。

技術分享圖片
from multiprocessing import Process, Queue
import time, random


def consumer(name, q):
    while True:
        time.sleep(random.randint(1, 3))
        food = q.get()
        if food is None: break
        print(%s吃了%s % (name, food))


def producer(name, food, q):
    for i in range(10):
        time.sleep(random.randint(1, 5))
        q.put(%s生產了%s%s % (name, food, i))
        print(%s生產了數據%s%s % (name, food, i))



if __name__ == __main__:
    q = Queue()
    p1 = Process(target=producer, args=(egon, 面包, q))
    p2 = Process(target=producer, args=(taibai, 骨頭, q))
    p1.start()
    p2.start()
    c1 = Process(target=consumer, args=(alex, q))
    c2 = Process(target=consumer, args=(firedragon, q))
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
生產者和消費者模型例子None 技術分享圖片
from multiprocessing import JoinableQueue,Process
import time


def consumer(name,q):
    while True:
        obj = q.get()
        time.sleep(0.3)
        print(%s吃了一個%s % (name, obj))
        q.task_done()


if __name__ == __main__:
    q = JoinableQueue()
    for i in range(10):
        q.put(food%s % i)
    p1 = Process(target=consumer, args=(alex, q))
    p1.daemon = True
    p1.start()
    q.join()    # 阻塞隊列,直至隊列的數據被取完,且執行了一個task_done()
                # p1為守護進程,主程序代碼執行完畢後,守護進程隨之結束,裏邊的循環自然也結束了。
生產者和消費者模型JoinableQueue

          技術分享圖片

        2)、管道 Pipe:底層實現是pickle,對數據的管理是不安全的,隊列的實現機制就是管道+鎖

            雙向通信:利用pickle實現的

            收不到,就阻塞

          # 管道的EOFError是怎麽報出來的(同時關閉主進程的lp和子進程的lp就會報出EOFError)
          # 管道在數據管理上是不安全的
          # 隊列的實現機制 就是 管道+鎖

            技術分享圖片

技術分享圖片
from multiprocessing import Pipe,Process
# lp,rp = Pipe()
# lp.send(‘hello‘)
# print(rp.recv())
# # print(rp.recv())    # 沒有數據在此阻塞進程
# # rp.send()   # 不能發送空數據
# lp.send([1,2,3])
# print(rp.recv())


def consumer(lp,rp):
    lp.close()   #1.這個發關閉
    while True:
        print(rp.recv())


if __name__ == __main__:
    lp, rp = Pipe()
    Process(target=consumer, args=(lp, rp)).start()
    Process(target=consumer, args=(lp, rp)).start()
    Process(target=consumer, args=(lp, rp)).start()
    Process(target=consumer, args=(lp, rp)).start()
    #rp.close()
    for i in range(100):
        lp.send(food%i % i)
    lp.close()  #2.這個關閉 這兩關閉才會報錯EOFError
管道例子

    2、進程之間的數據共享,Manager

         Manager創建的數據(如字典等)可以在進程之間共享,涉及數據操作要加上鎖,不然會出現數據錯亂。

         m = Manager() dic = m.dict({‘count’:100})

         with Manager() as m: dic = m.dict({‘count’:100}) 但涉及dic的操作代碼必須在with的縮進執行

        

技術分享圖片
from multiprocessing import Lock,Manager,Process


def func(dic_tmp, lock_tmp):
    with lock_tmp:
        dic_tmp[count] -= 1


if __name__ == __main__:
    lock = Lock()
    with Manager() as m:
        dic = m.dict({count: 50})
        p_lst = []
        for i in range(50):
            p = Process(target=func, args=(dic, lock))
            p.start()
            p_lst.append(p)
        for i in p_lst:
            i.join()
        print(dic)
50個進程同時操作一個字典的例子

    3、進程池

       進程池使用場景PK多進程: 技術分享圖片

        1.對於純計算的代碼,使用進程池更好(個人理解,高效利用cpu沒有了節省了進程的開啟和回收時間,也節省操作系統調度進程切換的時間)

        2.對於高IO的代碼,沒有更好選擇的情況下使用多進程。

        總結:使用進程池比起多進程,節省了開啟進程回收進程資源的時間,給操作系統調度進程降低了難度。

       進程池apply(同步)添加入池方法和apply_async(異步)

       使用進程池提交任務方法:

          p = Pool(5)

          p.appy(func=***,args=(,)))   #同步提交任務 沒有多進程的優勢

          p.apply_async(func=***,args=(,))  #異步提交任務

          p.close()   #關閉進程池,阻止向進程池添加新的任務

          p.join()  #依賴close,進程池必須先close後join(個人理解應該是要阻塞執行完進程池的任務,才進入非阻塞狀態)

         ------代碼-------

        

技術分享圖片
from multiprocessing import Pool,Process
import time,random


def wahaha(num):
    time.sleep(random.randint(1,3))
    print(num:%s % num**num)


if __name__ == __main__:
    
    # -------------------------適合高計算--------------------------------------------
    p = Pool(5)
    # start = time.time()
    # for i in range(100):
    #     p.apply_async(func=wahaha,args=(i,))
    #
    # p.close()
    # p.join()
    # print(time.time()-start)

    start = time.time()
    p.map(func=wahaha,iterable=range(101))
    print(time.time()-start)
# -------------------------適合高IO--------------------------------------------
    # start = time.time()
    # p_lst = []
    # for i in range(101):
    #     p = Process(target=wahaha,args=(i,))
    #     p.start()
    #     p_lst.append(p)
    # for i in p_lst:
    #     i.join()
    # print(time.time() - start)
View Code

       使用map添加任務的方法以及它和普通(apply_async)方法的區別:

          p.map(func=***,iterable=range(101))

          優點:就是一個任務函數,個一個itetable,節省了for循環和close,join,是一種簡便寫法。

          區別:apply_async和map相比,操作復雜,但是可以通過get方法獲取返回值,而map不行。

        

技術分享圖片
def wahaha(num):
    print(num)
    return num**


if __name__ == __main__:

    p = Pool(5)
    start = time.time()
    result_lst = []
    for i in range(100):
        res = p.apply_async(func=wahaha,args=(i,))
        result_lst.append(res)
    print(result_lst)
    for j in result_lst:print(j.get())
    p.close()
    p.join()
    print(time.time()-start)
apply_async使用get獲取返回值

       回調函數:可以接收func函數的返回值。但callback函數在主進程中運行。

         p.apply_async(func=***,args=(*,),callback=回調函數))

        

技術分享圖片
from multiprocessing import Pool,Process
import os


def wahaha(num):
    print(子進程:,os.getpid())
    return num**num

def callb(argv):
    print(os.getpid())
    print(argv)


if __name__ == __main__:
    print(主進程, os.getpid())
    p = Pool(5)
    p.apply_async(func=wahaha,args=(1,),callback=callb)
    p.close()
    p.join()
callback

三、預習和擴展:

python全棧開發day33-進程間的通信、進程間的數據共享,進程池