1. 程式人生 > >PythonI/O進階學習筆記_11.python的多程序

PythonI/O進階學習筆記_11.python的多程序

content: 1. 為什麼要多程序程式設計?和多執行緒有什麼區別? 2. python 多程序程式設計 3. 程序間通訊 =======================================   一. 為什麼要多程序程式設計?和多執行緒有什麼區別? 由於GIL的存在,所以對於某一些多執行緒任務來說,無法利用多核的優勢,對這些耗cpu的任務,用多程序反而能利用多cpu。 所以多cpu的操作用多程序程式設計。 對io操作較多的任務來說,瓶頸不在於cpu,更多的在於io的切換中的消耗和時間等待。用多執行緒反而能在io掛起的時候,進行執行緒切換。 雖然io操作多的時候,也可以用多程序程式設計,但是因為程序的切換系統的代價是十分大的,所以能使用多執行緒的情況下,儘量用多執行緒。   所以,對於耗費cpu的操作,比如計算、挖礦等,多程序優於多執行緒。 例:同計算一組斐波拉契數列的時間比較(耗cpu的操作)
#多執行緒
from concurrent.futures import  ThreadPoolExecutor,as_completed
from concurrent.futures import  ProcessPoolExecutor
import time
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1)+fib(n-2)
 
with ThreadPoolExecutor(3) as excutor:
    all_task=[excutor.submit(fib,(num)) for num in range(25,35)]
    start_time=time.time()
    for future in as_completed(all_task):
        data=future.result()
        print("result:{}".format(data))
    end_time=time.time()
    print("last time : {}".format(end_time-start_time))
 
#output:
result:75025
result:121393
result:196418
result:317811
result:514229
result:832040
result:1346269
result:2178309
result:3524578
result:5702887
last time : 98.66604399681091

 

#多程序
from concurrent.futures import  ThreadPoolExecutor,as_completed
from concurrent.futures import  ProcessPoolExecutor
import time
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1)+fib(n-2)
if __name__ == "__main__":
    with ProcessPoolExecutor(3) as excutor:
        all_task = [excutor.submit(fib, (num)) for num in range(25, 35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("result:{}".format(data))
        end_time = time.time()
        print("last time : {}".format(end_time - start_time))
 
#output:
result:75025
result:121393
result:196418
result:317811
result:514229
result:832040
result:1346269
result:2178309
result:3524578
result:5702887
last time : 14.470988988876343

 

程序和執行緒的區別:
  • 程序是資源分配的最小單位,執行緒是程式執行的最小單位。
  • 程序有自己的獨立地址空間,每啟動一個程序,系統就會為它分配地址空間,建立資料表來維護程式碼段、堆疊段和資料段,這種操作非常昂貴。而執行緒是共享程序中的資料的,使用相同的地址空間,因此CPU切換一個執行緒的花費遠比程序要小很多,同時建立一個執行緒的開銷也比程序要小很多。
  • 執行緒之間的通訊更方便,同一程序下的執行緒共享全域性變數、靜態變數等資料,而程序之間的通訊需要以通訊的方式(IPC)進行。不過如何處理好同步與互斥是編寫多執行緒程式的難點。
  • 但是多程序程式更健壯,多執行緒程式只要有一個執行緒死掉,整個程序也死掉了,而一個程序死掉並不會對另外一個程序造成影響,因為程序有自己獨立的地址空間。
  二、python 多程序程式設計 1.from concurrent.futures import  ProcessPoolExecutor ProcessPoolExecutor 和上一章 講到的多執行緒的用法是一樣的。包括其中用到的Futures類。 基本看它的入口函式就明白,這裡不再贅述。   2.更加底層的multiprocessing 其實在ProcessPoolExecutor底層用的其實也是multiprocessing。 在multiprocess裡,有個Progress類。跟Thread用法又是相似的。
#input
from concurrent.futures import  ProcessPoolExecutor
import  multiprocessing
#多程序程式設計
import  time
def get_html(n):
    time.sleep(n)
    print("sub_progress sccess")
if __name__=="__main__":
    progress = multiprocessing.Process(target=get_html,args=(3,))
    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()
 
#output
None
12864
sub_progress sccess

 

3.繼承Progress類(與之前的Thread類一樣)
import  multiprocessing
#多程序程式設計
import  time
 
class progress_get_html(multiprocessing.Process):
    def __init__(self,n):
        self.n=n
        super().__init__()
    def run(self):
        time.sleep(self.n)
        print("sub progress success")
class MyProgress(multiprocessing.Process):
    def __init__(self,n):
        self.n=n
        super().__init__()
    def run(self):
        pro=progress_get_html(self.n)
        pro.start()
        print("progress end")
if __name__=="__main__":
    progress = MyProgress(3)
    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()
 
#output:
None
8744
progress end
sub progress success

 

4.使用程序池 指明程序數,不指明的話,可以直接預設為cpu數(cpu_count() or 1)。
from concurrent.futures import  ProcessPoolExecutor
from multiprocessing import  pool
import  multiprocessing
#多程序程式設計
import  time
def get_html(n):
    time.sleep(n)
    print("sub_progress sccess")
    return n
if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
    result=pool.apply_async(get_html,args=(3,))
    print(result.get())
    #pool在呼叫join之前 需要呼叫close 來讓它不再接收任務。否則會報錯
    pool.close()
    pool.join()
    print(result.get())
 
#output
sub_progress sccess
3
3

 

其他方法: - imap:按照引數輸入順序
if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap(get_html,[1,5,3]):
        print("sleep {} successed ".format(result))
 
#output:
sub_progress sccess
sleep 1 successed
sub_progress sccess
sub_progress sccess
sleep 5 successed
sleep 3 successed
imap_unordered: 按照執行完成順序
if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
 
    #for result in pool.imap(get_html,[1,5,3]):
    #    print("sleep {} successed ".format(result))
    for result in pool.imap_unordered(get_html,[1,5,3]):
        print("sleep {} successed ".format(result))
 
#output:
sub_progress sccess
sleep 1 successed
sub_progress sccess
sleep 3 successed
sub_progress sccess
sleep 5 successed

 

三. 程序間通訊 與執行緒間不同的是,執行緒間同步的類和鎖是不可用的。 1.Queue(注意是multiprocessing而不是thread的)
from multiprocessing import Process,Queue
import  time
def producer(queue):
    queue.put("a")
    time.sleep(2)
def consumer(queue):
    time.sleep(2)
    data=queue.get()
    print(data)
if __name__== "__main__":
    queue=Queue(10)
    my_producer = Process(target=producer,args=(queue,))
    my_consumer = Process(target=consumer,args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
 
#outpu:
a
注意:multprocess中的Queue是不能用於pool程序池的   2.Manager(與程序池共用) Manager中有個Queue,如果像實現pool中的程序間通訊,需要使用Manager中的Queue。
from multiprocessing import Process,pool,Manager,Pool
import  time
def producer(queue):
    queue.put("a")
    time.sleep(2)
def consumer(queue):
    time.sleep(2)
    data=queue.get()
    print(data)
if __name__== "__main__":
    queue=Manager().Queue()
    pool=Pool(3)
    pool.apply_async(producer,args=(queue,))
    pool.apply_async(consumer,args=(queue,))
    pool.close()
    pool.join()
 
#output:
a
  3.管道pipe pipe只能適用於兩個指定的程序。 pipe的效能高於queue的,queue加了很多的鎖操作。
from multiprocessing import Process,pool,Manager,Pool,Pipe
import  time
def producer(pipe):
    pipe.send("hello")
def consumer(pipe):
    print(pipe.recv())
if __name__== "__main__":
    recv_pipe,send_pipe=Pipe()
    my_producer=Process(target=producer,args=(send_pipe,))
    my_consumer=Process(target=consumer,args=(recv_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
 
#output:
hello

 

4.程序間共享記憶體操作 Mnager的dict、list、value等。
from multiprocessing import Process,pool,Manager,Pool,Pipe
import  time
 
def add_data(p_dict,key,value):
    p_dict[key]=value
if __name__ == "__main__":
    progress_dict= Manager().dict()
    first_progress= Process(target=add_data,args=(progress_dict,"name","tangrong"))
    second_progress = Process(target=add_data,args=(progress_dict,"age","18"))
    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()
    print(progress_dict)
 
#output:
{'name': 'tangrong', 'age': '18'}
在使用的時候,可以用Manager中的資料結構,但是注意資料同步(LOCK,RLOCK等)