1. 程式人生 > >Python--線程隊列(queue)、multiprocessing模塊(進程對列Queue、管道(pipe)、進程池)、協程

Python--線程隊列(queue)、multiprocessing模塊(進程對列Queue、管道(pipe)、進程池)、協程

有一種 啟動進程 fin 機制 內部 優先級隊列 queue類 解決 producing

隊列(queue)

隊列只在多線程裏有意義,是一種線程安全的數據結構。

get與put方法

‘‘‘

創建一個“隊列”對象

import queue
q = queue.Queue(maxsize = 10)
queue.Queue類即是一個隊列的同步實現。
隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。 將一個值放入隊列中: q.put()
調用隊列對象的put()方法在隊尾插入一個項目。
put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為True。如果隊列當前為空且block為True
,put()方法就使調用線程暫停,直到空出一個數據單元
如果block為False,put方法將引發Full異常。


import queue

q=queue.Queue(3)

q.put(11)
q.put(22)
q.put(33)
q.put(44,False)  #queue.Full      ==q.put_nowait()
 
將一個值從隊列中取出
q.get()

調用隊列對象的get()方法從隊頭刪除並返回一個項目。
可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
import queue

q=queue.Queue(3)

q.put(11)
q.put(22)
q.put(33)

q.get()
q.get()
q.get()
q.get(False) # queue.Empty        == q.get_nowait() 
‘‘‘

join與task_done方法

join() 阻塞進程,直到所有任務完成,需要配合另一個方法task_done。

task_done() 表示某個任務完成。每一條get語句後需要一條task_done。



import queue,threading

q=queue.Queue()

def foo():
    q.put(11)
    q.put(22)
    q.put(33)
    q.join()
    print(‘ok‘)

def bar():
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()

t1=threading.Thread(target=foo)
t1.start()

t2=threading.Thread(target=bar)
t2.start()
運行效果:
11
22
33
ok

‘‘‘

此包中的常用方法(q = Queue.Queue()):

q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味著等到隊列為空,再執行別的操作

‘‘‘

其他模式

‘‘‘

Python Queue模塊有三種隊列及構造函數: 

1、Python Queue模塊的FIFO隊列先進先出。  class queue.Queue(maxsize) 
2、LIFO類似於堆,即先進後出。           class queue.LifoQueue(maxsize) 
3、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 


import queue

#先進後出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#優先級
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

‘‘‘

生產者消費者模型

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

生產者:生產數據的模型

消費者:獲取數據的模型

生產者消費者模型優點:

  1.解耦合

  2.實現並發

import threading,queue,random,time

def producer():
    count=1
    while count<10:
        baozi=random.randint(1,100)
        q.put(baozi)
        print(‘baozi %s 做好了‘%baozi)
        time.sleep(1)
        count+=1

def consumer(id):
    while 1:
        baozi=q.get()
        time.sleep(2)
        print(‘顧客%s吃了包子%s‘%(id,baozi))

if __name__ ==‘__main__‘:
    q=queue.Queue()
    t1=threading.Thread(target=producer)
    t1.start()

    for i in range(3):
        t=threading.Thread(target=consumer,args=(i,))
        t.start()
運行效果:(部分)
baozi 4 做好了
baozi 79 做好了
顧客0吃了包子4
baozi 58 做好了
顧客1吃了包子79
baozi 28 做好了

multiprocessing模塊

由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。

multiprocessing包是Python中的多進程管理包。

它可以利用multiprocessing.Process對象來創建一個進程。也有start(), run(), join()的方法。

多進程優缺點:

  優點:可以利用多核,實現並行運算

  缺點:

    1.開銷太大

    2.通信困難

需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

import multiprocessing 
import time

def foo():
    print(‘ok‘)
    time.sleep(2)

if __name__ ==‘__main__‘:
    p=multiprocessing.Process(target=foo)
    p.start()
    print(‘ending‘)
運行結果:
ending
ok

python的進程調用


# Process類調用

from multiprocessing import Process import os import time def info(name): print(‘name:‘,name) print(‘parent process:‘,os.getppid()) print(‘process id:‘,os.getpid()) print(‘-------------‘) time.sleep(1) def foo(name): info(name) if __name__ ==‘__main__‘: info(‘main process line‘) p1=Process(target= info,args=(‘alex‘,)) p2=Process(target=foo,args=(‘egon‘,)) p1.start() p2.start() p1.join() p2.join() print(‘ending‘) 運行效果: name: main process line parent process: 3012 process id: 6836 ------------- name: alex parent process: 6836 process id: 8028 ------------- name: egon parent process: 6836 process id: 1540 ------------- ending

# 繼承Process類調用



from multiprocessing import Process
import time

class Myprocess(Process):
def __init__(self):
super(Myprocess,self).__init__()

def run(self):
print(‘hello‘,self.name,time.ctime())
time.sleep(1)

if __name__ == ‘__main__‘:
l=[]
for i in range(3):
p=Myprocess()
p.start()
l.append(p)

for p in l:
p.join()

print(‘ending‘)

運行效果:
hello Myprocess-1 Fri Jul 21 16:58:36 2017
hello Myprocess-2 Fri Jul 21 16:58:36 2017
hello Myprocess-3 Fri Jul 21 16:58:36 2017
ending

process類

註意:在windows中Process()必須放到# if __name__ == ‘__main__‘:下

原因:

由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。
如果在導入時調用Process(),那麽這將啟動無限繼承的新進程(或直到機器耗盡資源)。
這是隱藏對Process()內部調用的原因,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。(args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號, kwargs表示調用對象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18})

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():啟動進程,並調用該子進程中的p.run() 

  run():strat()調用run方法,如果實例進程時未制定傳入target,這start默認執行run()方法。

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置

  name:進程名字。

  pid:進程號。

通過tasklist(Win)或者ps -elf |grep(linux)命令檢測每一個進程號(PID)對應的進程名

進程間通訊 

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

進程對列Queue(隊列就是管道加鎖實現的

from multiprocessing import Queue,Process

def foo(q):
    q.put([11,‘hello‘,True])

if __name__ ==‘__main__‘:
    q=Queue()

    p=Process(target=foo,args=(q,))
    p.start()

    print(q.get())
運行效果:
[11, ‘hello‘, True]

管道(pipe)

Pipe()返回的兩個連接對象代表管道的兩端。

每個連接對象都有send()和recv()方法(等等)。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麽recv方法會拋出EOFError。

註意:send()和recv()方法使用pickle模塊對對象進行序列化。

conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法

請註意,如果兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞。

Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道

dumplex:默認管道是全雙工的,如果將duplex設置成False,conn1只能用於接收,conn2只能用於發送。

from multiprocessing import Pipe,Process

def foo(sk):
    sk.send(‘hello world‘)
    print(sk.recv())

if __name__ == ‘__main__‘:
    sock,conn=Pipe()
    p=Process(target=foo,args=(sock,))
    p.start()

    print(conn.recv())
    conn.send(‘hi son‘)
運行效果:
hello world
hi son

manager對象實現數據共享

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據

進程間通信應該盡量避免使用本節所講的共享數據的方式

from multiprocessing import Manager,Process

def foo(l,n):
    l.append(n**2)

if __name__ == ‘__main__‘:
    manager=Manager()
    mlist=manager.list([11,22,33])

    l=[]
    for i in range(5):
        p=Process(target=foo,args=(mlist,i))
        p.start()
        l.append(p)

    for i in l:
        i.join()

    print(mlist)
運行效果:
[11, 22, 33, 1, 0, 9, 4, 16]

from multiprocessing import Manager,Process

def foo(dic,new_dic):
    dic.update(new_dic)

if __name__ == ‘__main__‘:
    manager=Manager()
    mdict=manager.dict({‘a‘:1,‘b‘:2})

    l=[]
    for i in range(3):
        p=Process(target=foo,args=(mdict,dict.fromkeys([‘c‘,‘d‘,‘e‘],i)))
        p.start()
        l.append(p)

    for p in l:
        p.join()

    print(mdict)
運行效果:
{‘a‘: 1, ‘b‘: 2, ‘c‘: 2, ‘d‘: 2, ‘e‘: 2}

進程池

開多進程的目的是為了並發,如果有多核,通常有幾個核就開幾個進程,進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行

但很明顯需要並發執行的任務要遠大於核數,這時我們就可以通過維護一個進程池來控制進程數目

當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

from multiprocessing import Pool
import time

def foo(n,):
    print(n)
    time.sleep(1)

if __name__ == ‘__main__‘:
    pool=Pool(5)
    for i in range(100):
        pool.apply_async(func=foo,args=(i,))

    pool.close()
    pool.join()

    print(‘ending‘)
運行效果:
可以理解該程序為:5人共搬100塊磚,動作同步,5人每次共搬5塊,休息1秒,繼續下一次...共搬20次

協程(重點)

英文名Coroutine。

協程,又稱微線程,是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的

yield與協程

首先回顧yield用法:

def foo():
    print(‘foo‘)
    yield
    print(‘foo2‘)
    
    return

def bar():
    print(bar)
    yield 
    print(‘bar2‘)
    
foo() #創建了一個生成器對象,不會執行代碼
bar() #創建了一個生成器對象,不會執行代碼

def foo():
    print(‘foo‘)
    yield 5
    print(‘foo2‘)
    
    yield 8

gen=foo()

ret=next(gen)
print(ret)
res=next(gen)
print(res)
運行效果:
foo
5
foo2
8

def foo():
    print(‘foo‘)
    n=yield 5
    print(‘n‘,n) #123
    print(‘foo2‘)

    yield 8

gen=foo()

ret=next(gen)
print(ret) #5
res=gen.send(‘123‘)
print(res) #8
運行效果:
foo
5
n 123
foo2
8

基於yield生成器函數實現生產者消費者模型

"""
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。
如果改用協程,生產者生產消息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
"""
import time

def consumer():
    r = ‘‘
    while True:
        n = yield r  # 3、consumer通過yield拿到消息,處理,又通過yield把結果傳回;
        if not n:
            return
        print(‘[CONSUMER] ←← Consuming %s...‘ % n)
        time.sleep(1)
        r = ‘200 OK‘
def produce(c):
    next(c)  #1、首先調用c.next()啟動生成器
    n = 0
    while n < 5:
        n = n + 1
        print(‘[PRODUCER] →→ Producing %s...‘ % n)
        cr = c.send(n)  #2、然後,一旦生產了東西,通過c.send(n)切換到consumer執行;
                        #4、produce拿到consumer處理的結果,繼續生產下一條消息;
        print(‘[PRODUCER] Consumer return: %s‘ % cr)
    c.close()  #5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
if __name__==‘__main__‘:
    # 6、整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。
    c = consumer()
    produce(c)
運行效果:
[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK

greenlet

from greenlet import greenlet

def foo():
    print(‘ok1‘)
    gr2.switch()
    print(‘ok3‘)
    gr2.switch()

def bar():
    print(‘ok2‘)
    gr1.switch()
    print(‘ok4‘)

gr1=greenlet(foo)
gr2=greenlet(bar)
gr1.switch()
運行效果:
ok1
ok2
ok3
ok4

gevent模塊實現協程(yield、greenlet都無法實現遇到IO操作自動切換到其它協程,就用到了gevent模塊(select機制)

由於IO操作非常耗時,經常使程序處於等待狀態,gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成

import gevent

def foo():
    print(‘running in foo‘)
    gevent.sleep(2) #模擬IO操作
    print(‘switch to foo again‘)

def bar():
    print(‘switch to bar‘)
    gevent.sleep(1)
    print(‘switch to bar again‘)

gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
運行效果:
running in foo
switch to bar
switch to bar again
switch to foo again

g=gevent.spawn()創建一個協程對象g

spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的

對比普通函數執行和使用協程後的效果:

from gevent import monkey
monkey.patch_all()
# import gevent
import requests
import time

def f(url):
    response = requests.get(url)
    response_str=response.text
    print(‘get data %s---url[%s]‘ % (len(response_str), url))

start=time.time()

f(‘https://itk.org/‘)
f(‘https://www.github.com/‘)

print(time.time()-start)
運行效果:
get data 12323---url[https://itk.org/]
get data 56751---url[https://www.github.com/]
5.5523176193237305

from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time

def f(url):
    response = requests.get(url)
    response_str=response.text
    print(‘get data %s---url[%s]‘ % (len(response_str), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, ‘https://itk.org/‘),
        gevent.spawn(f, ‘https://www.github.com/‘),
])

print(time.time()-start)
運行效果:
get data 12323---url[https://itk.org/]
get data 56751---url[https://www.github.com/]
3.939225435256958

需要強調的是:

  1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其他線程運行)

  2. 單線程內開啟協程,一旦遇到io,從應用程序級別(而非操作系統)控制切換

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換,優點如下:

  1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級

  2. 單線程內就可以實現並發的效果,最大限度地利用cpu

缺點:

  1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
  2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

作業:

  

第一個作業:使用進程池爬取網頁內容,自己往進程池累加進程,找到可以實現效果的最短時間

import re import requests
from multiprocessing import Pool import time def run(n): url=http://www.budejie.com/pic/ + str(n) response = requests.get(url).text ret = re.compile(<div class="j-r-list-c-desc">.*?<a href=.*?>(?P<article>.*?)</a>.*?</div>, re.S) obj = ret.findall(response) print(obj) if __name__ ==__main__: s=time.time() pool=Pool(4) for i in range(1,11): pool.apply_async(func=run,args=(i,)) pool.close() pool.join() print(cost time:%s%(time.time()-s))






第二個作業:基於協程的爬蟲示例:建議使用requests模塊(urllib模塊不太方便)(基於gevent庫)
import re
import requests
import gevent
import time

def run(url):

    response = requests.get(url).text
    ret = re.compile(‘<div class="j-r-list-c-desc">.*?<a href=.*?>(?P<article>.*?)</a>.*?</div>‘, re.S)
    obj = ret.findall(response)
    print(obj)

if __name__ ==‘__main__‘:
    s=time.time()
    gevent.joinall([gevent.spawn(run,‘http://www.budejie.com/pic/‘+str(i)) for i in range(1,11)])

    print(‘cost time:%s‘%(time.time()-s))


Python--線程隊列(queue)、multiprocessing模塊(進程對列Queue、管道(pipe)、進程池)、協程