1. 程式人生 > >並發編程 - 線程 - 1.線程queue/2.線程池進程池/3.異步調用與回調機制

並發編程 - 線程 - 1.線程queue/2.線程池進程池/3.異步調用與回調機制

cal 編程 機制 com size ssp .org don 結果

1.線程queue :會有鎖
q=queue.Queue(3)
q.get()
q.put()

先進先出 隊列
後進先出 堆棧
優先級隊列
 1 """先進先出 隊列"""
 2 import queue
 3 q=queue.Queue(3) #先進先出->隊列
 4 
 5 q.put(first)
 6 q.put(2)
 7 # q.put(‘third‘)
 8 # q.put(4)
 9 q.put(4,block=False) #q.put_nowait(4)
10 # q.put_nowait(4)
11 # q.put(4,block=True)  # True 阻塞 False 不阻塞 直接告訴你 隊列滿了
12 # q.put(4,block=True,timeout=3) # 阻塞等待3秒 還沒有拿走數據就拋異常 13 # 14 print(q.get()) 15 print(q.get()) 16 print(q.get()) 17 print(q.get(block=True,timeout=2)) # false 不阻塞沒有數據就拋異常 默認是阻塞 block=True 18 print(q.get_nowait()) # 相當於block=false 19 # def get(self, block=True, timeout=None): 20 21 22 """後進先出 堆棧
""" 23 import queue 24 q=queue.LifoQueue(3) #後進先出->堆棧 25 q.put(first) 26 q.put(2) 27 q.put(third) 28 29 print(q.get()) 30 print(q.get()) 31 print(q.get()) 32 33 """優先級隊列 """ 34 import queue 35 q=queue.PriorityQueue(3) #優先級隊列 36 37 q.put((10,{alice:12})) # 數字越小 優先級越高 優先拿出來 38 q.put((40,two
)) 39 q.put((30,three)) 40 41 print(q.get()) 42 print(q.get()) 43 print(q.get())
2.線程池進程池:
client server 是IO 操作應該用多線程
計算密集型: 用多進程
io密集型:用多線程

池:對數目加以限制,保證機器正常運行
 1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 import os,time,random
 3 
 4 def task(name):
 5     print(name:%s pid:%s run %(name,os.getpid()))
 6     time.sleep(random.randint(1,3))
 7 
 8 
 9 if __name__ == __main__:
10     pool=ProcessPoolExecutor(4)  # 不指定 默認是cpu的核數
11     # pool=ThreadPoolExecutor(5)
12 
13     for i in range(10):
14         pool.submit(task,egon%s %i)  # 異步調用池子收了10個任務,但同一時間只有4個任務在進行
15 
16     pool.shutdown(wait=True)  # 類似join  代表往池子裏面丟任務的入口封死了 計數器-1
17 
18 
19     print()
20 """
21 主                         # # 異步調用池子收了10個任務,但同一時間只有4個任務在進行
22 name:egon0 pid:60056 run     # 只有4個pid
23 name:egon1 pid:64700 run
24 name:egon2 pid:59940 run
25 name:egon3 pid:60888 run
26 
27 name:egon4 pid:60888 run
28 
29 name:egon5 pid:60056 run
30 name:egon6 pid:60888 run
31 
32 name:egon7 pid:60056 run
33 name:egon8 pid:64700 run
34 name:egon9 pid:59940 run 
35 """
36 # pool.shutdown(wait=True) # 代表往池子裏面丟任務的入口封死了 計數器-1
37 """
38 name:egon0 pid:57124 run
39 name:egon1 pid:62252 run
40 name:egon2 pid:55736 run
41 name:egon3 pid:62060 run
42 name:egon4 pid:57124 run
43 name:egon5 pid:62252 run
44 name:egon6 pid:55736 run
45 name:egon7 pid:55736 run
46 name:egon8 pid:62060 run
47 name:egon9 pid:55736 run
48 49 """
50 
51 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
52 from threading import currentThread
53 import os,time,random
54 
55 def task():
56     print(name:%s pid:%s run %(currentThread().getName(),os.getpid()))
57     time.sleep(random.randint(1,3))
58 
59 
60 if __name__ == __main__:
61     pool=ThreadPoolExecutor(5)
62 
63     for i in range(10):
64         pool.submit(task)
65 
66     pool.shutdown(wait=True)
67 
68 
69     print()
70 """
71 name:ThreadPoolExecutor-0_0 pid:61508 run
72 name:ThreadPoolExecutor-0_1 pid:61508 run
73 name:ThreadPoolExecutor-0_2 pid:61508 run
74 name:ThreadPoolExecutor-0_3 pid:61508 run
75 name:ThreadPoolExecutor-0_4 pid:61508 run
76 name:ThreadPoolExecutor-0_2 pid:61508 run
77 name:ThreadPoolExecutor-0_4 pid:61508 run
78 name:ThreadPoolExecutor-0_0 pid:61508 run
79 name:ThreadPoolExecutor-0_3 pid:61508 run
80 name:ThreadPoolExecutor-0_1 pid:61508 run
81 82 """
3.異步調用與回調機制:
提交任務的兩種方式:
同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行,效率低
異步調用:提交完任務後,不等待任務執行完畢。異步調用+回調機制 自動觸發叫回調
 1 """同步調用"""
 2 from concurrent.futures import ThreadPoolExecutor
 3 import time
 4 import random
 5 
 6 def la(name):
 7     print(%s is laing %name)
 8     time.sleep(random.randint(3,5))
 9     res=random.randint(7,13)*#
10     return {name:name,res:res}
11 
12 def weigh(shit):
13     name=shit[name]
14     size=len(shit[res])
15     print(%s 拉了 《%s》kg %(name,size))
16 
17 
18 if __name__ == __main__:
19     pool=ThreadPoolExecutor(13)
20 
21     shit1=pool.submit(la,alex).result()
22     weigh(shit1)
23 
24     shit2=pool.submit(la,wupeiqi).result()
25     weigh(shit2)
26 
27     shit3=pool.submit(la,yuanhao).result()
28     weigh(shit3)
29 
30 
31 """異步調用 + 回調機制  自動觸發叫回調"""
32 from concurrent.futures import ThreadPoolExecutor
33 import time
34 import random
35 
36 def la(name):
37     print(%s is laing %name)
38     time.sleep(random.randint(3,5))
39     res=random.randint(7,13)*#
40     return {name:name,res:res}
41     # weigh({‘name‘:name,‘res‘:res})  # 這樣寫不好  所有功能 寫在一起了
42 
43 
44 def weigh(shit):
45     shit=shit.result()  # 拿到是 對象 需要result()
46     name=shit[name]
47     size=len(shit[res])
48     print(%s 拉了 《%s》kg %(name,size))
49 
50 
51 if __name__ == __main__:
52     pool=ThreadPoolExecutor(13)
53 
54     # pool.submit(la, ‘alex‘)
55     # pool.submit(la, ‘wupeiqi‘)
56     # pool.submit(la, ‘yuanhao‘)
57 
58     pool.submit(la,alex).add_done_callback(weigh) # 實現了程序的解耦合
59     pool.submit(la,wupeiqi).add_done_callback(weigh)
60     pool.submit(la,yuanhao).add_done_callback(weigh)
4.異步調用與回調機制應用:
pip3 install requests
requests

異步調用+回調機制的 應用場景:
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):   # io操作  基於線程 數目有限 用線程池
    print(GET %s %url)
    response=requests.get(url)
    time.sleep(3)
    return {url:url,content:response.text}


def parse(res):
    res=res.result()
    print(%s parse res is %s %(res[url],len(res[content])))


if __name__ == __main__:
    urls=[
        http://www.cnblogs.com/linhaifeng,
        https://www.python.org,
        https://www.openstack.org,
    ]

    pool=ThreadPoolExecutor(2)

    for url in urls:
        pool.submit(get,url).add_done_callback(parse)

並發編程 - 線程 - 1.線程queue/2.線程池進程池/3.異步調用與回調機制