1. 程式人生 > >並發編程---線程queue---進程池線程池---異部調用(回調機制)

並發編程---線程queue---進程池線程池---異部調用(回調機制)

priority close name port rand current 結果 耦合 join

線程

  • 隊列:先進先出
  • 堆棧:後進先出
  • 優先級:數字越小優先級越大,越先輸出
技術分享圖片
import queue

q = queue.Queue(3) # 先進先出-->隊列

q.put(first)
q.put(2)
# q.put(‘third‘)
# q.put(4) #由於沒有人取走,就會卡主
q.put(4,block=False)  #等同於q.get_nowait(), Ture 阻塞,Flase不阻塞,報異常滿了
# # q.put(4,block=True,timeout=3)

print(q.get())
print(q.get())
print(q.get())
print(q.get(block=True,timeout=3)) # 阻塞等待3秒 沒有取走數據就報異常 # print(q.get(block=False)) #等同於q.get_nowait() # print(q.get_nowait()) q = queue.LifoQueue(3) #後進先出-->堆棧 q.put(first) q.put(2) q.put(third) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 打印結果: third 2 first ‘‘‘ q = queue.PriorityQueue(3) #
優先級隊列 q.put((10,one)) q.put((40,two)) q.put((30,three)) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 數字越小優先級越高 打印結果 (10, ‘one‘) (30, ‘three‘) (40, ‘two‘) ‘‘‘
線程queue

進程池線程池

  • 池:是用來對進程(線程)的數量加以限制
  • 進程池:計算密集型,用多進程
  • 線程池:IO密集型,用多線程,例如:sockect網絡通信就應該用多線程
技術分享圖片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random ‘‘‘ sockect網絡通信是IO操作,所以用多線程 計算密集型:用多進程 ‘‘‘ def task(name): print(name:%s pid:%s run %(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == __main__: # pool = ProcessPoolExecutor(4) # 進程池最多裝4個進程,不指定的話默認是cpu的核數 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task,yang%s %i) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行 pool.shutdown(wait=True) # 類似join 代表往池子裏面丟任務的入口關掉 計數器-1 print() ‘‘‘ 打印結果: name:yang0 pid:11120 run name:yang1 pid:11120 run name:yang2 pid:11120 run name:yang3 pid:11120 run name:yang4 pid:11120 run name:yang5 pid:11120 run name:yang6 pid:11120 run name:yang7 pid:11120 run name:yang8 pid:11120 run name:yang9 pid:11120 run 主 ‘‘‘ from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread import os,time,random def task(): print(name:%s pid:%s run %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == __main__: # pool = ProcessPoolExecutor(4) # 進程池最多裝4個進程,不指定的話默認是cpu的核數 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行 pool.shutdown(wait=True) # 類似join 代表往池子裏面丟任務的入口關掉 計數器-1 print() ‘‘‘ 打印結果: name:ThreadPoolExecutor-0_0 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_0 pid:14052 run 主 ‘‘‘
進程池|線程池

同步調用和異步調用

提交任務的兩種方式:

  • 同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行
  • 異步調用:提交完任務後,不在原地等待任務執行完。回調機制:自動觸發
技術分享圖片
#1.同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print(%s is laing %name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*#
    return {name:name,res:res}

def weigh(shit):
    name = shit[name]
    size = len(shit[res])
    print(%s 拉了 <%s>kg %(name,size))

if __name__ == __main__:
    pool = ThreadPoolExecutor(10)

    shit1 = pool.submit(la,alex).result()
    weigh(shit1)

    shit2 = pool.submit(la,yang).result()
    weigh(shit2)

    shit3 = pool.submit(la,hang).result()
    weigh(shit3)
‘‘‘
打印結果:
alex is laing
alex 拉了 <8>kg
yang is laing
yang 拉了 <8>kg
hang is laing
hang 拉了 <7>kg
‘‘‘
同步調用 技術分享圖片
#2.異步調用:提交完任務後,不在原地等待任務執行完
from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print(%s is laing %name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*#
    return {name:name,res:res}
    # weigh({‘name‘:name,‘res‘:res})  # 這樣寫,所有功能 不能體現出解耦合

def weigh(shit):
    shit = shit.result() # 拿到是一個對象,需要進行result()
    name = shit[name]
    size = len(shit[res])
    print(%s 拉了 <%s>kg %(name,size))

if __name__ == __main__:
    pool = ThreadPoolExecutor(10)

    shit1 = pool.submit(la,alex).add_done_callback(weigh)

    shit2 = pool.submit(la,yang).add_done_callback(weigh)

    shit3 = pool.submit(la,hang).add_done_callback(weigh)
‘‘‘
打印結果:
alex is laing
yang is laing
hang is laing
hang 拉了 <10>kg
alex 拉了 <7>kg
yang 拉了 <12>kg
‘‘‘
異步調用

異步調用的應用

技術分享圖片
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print(GET %s%url)
    response = requests.get(url)
    time.sleep(3)
    return {url:url,content:response.text}

def parse(res):
    res = res.result()
    print(%s parse res is %s %(res[url],len(res[content])))

if __name__ == __main__:
    urls = [
        http://www.cnblogs.com/linhaifeng,
        https://www.python.org,
        https://www.openstack.org,
    ]

    pool = ThreadPoolExecutor(2)
    for url in urls:
        pool.submit(get,url).add_done_callback(parse)
‘‘‘
打印結果:
GET http://www.cnblogs.com/linhaifeng
GET https://www.python.org
http://www.cnblogs.com/linhaifeng parse res is 16320
GET https://www.openstack.org
https://www.python.org parse res is 49273
https://www.openstack.org parse res is 64040
‘‘‘
應用

並發編程---線程queue---進程池線程池---異部調用(回調機制)