1. 程式人生 > >Python爬蟲進階六之多進程的用法

Python爬蟲進階六之多進程的用法

maxsize clas 生產 依然 queue consumer mac 裏的 filesize

前言

在上一節中介紹了thread多線程庫。python中的多線程其實並不是真正的多線程,並不能做到充分利用多核CPU資源。

如果想要充分利用,在python中大部分情況需要使用多進程,那麽這個包就叫做 multiprocessing。

借助它,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

那麽本節要介紹的內容有:

Process
Lock
Semaphore
Queue
Pipe
Pool

Process

基本使用

在multiprocessing中,每一個進程都用一個Process類來表示。首先看下它的API

Process([group [, target [, name [, args [, kwargs]]]]])

target表示調用對象,你可以傳入方法的名字
args表示被調用對象的位置參數元組,比如target是函數a,他有兩個參數m,n,那麽args就傳入(m, n)即可
kwargs表示調用對象的字典
name是別名,相當於給這個進程取一個名字
group分組,實際上不使用
我們先用一個實例來感受一下:

import multiprocessing

def process(num):
    print Process:, num

if __name__ == __main__
: for i in range(5): p = multiprocessing.Process(target=process, args=(i,)) p.start()

最簡單的創建Process的過程如上所示,target傳入函數名,args是函數的參數,是元組的形式,如果只有一個參數,那就是長度為1的元組。

然後調用start()方法即可啟動多個進程了。

另外你還可以通過 cpu_count() 方法還有 active_children() 方法獲取當前機器的 CPU 核心數量以及得到目前所有的運行的進程。

通過一個實例來感受一下:

import
multiprocessing import time def process(num): time.sleep(num) print Process:, num if __name__ == __main__: for i in range(5): p = multiprocessing.Process(target=process, args=(i,)) p.start() print(CPU number: + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print(Child process name: + p.name + id: + str(p.pid)) print(Process Ended)

運行結果:

Process: 0
CPU number:8
Child process name: Process-2 id: 9641
Child process name: Process-4 id: 9643
Child process name: Process-5 id: 9644
Child process name: Process-3 id: 9642
Process Ended
Process: 1
Process: 2
Process: 3
Process: 4

自定義類

另外你還可以繼承Process類,自定義進程類,實現run方法即可。

用一個實例來感受一下:

from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop

    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(Pid:  + str(self.pid) +  LoopCount:  + str(count))


if __name__ == __main__:
    for i in range(2, 5):
        p = MyProcess(i)
        p.start()

在上面的例子中,我們繼承了 Process 這個類,然後實現了run方法。打印出來了進程號和參數。

運行結果:

Pid: 28116 LoopCount: 0
Pid: 28117 LoopCount: 0
Pid: 28118 LoopCount: 0
Pid: 28116 LoopCount: 1
Pid: 28117 LoopCount: 1
Pid: 28118 LoopCount: 1
Pid: 28117 LoopCount: 2
Pid: 28118 LoopCount: 2
Pid: 28118 LoopCount: 3

可以看到,三個進程分別打印出了2、3、4條結果。

我們可以把一些方法獨立的寫在每個類裏封裝好,等用的時候直接初始化一個類運行即可。

deamon

在這裏介紹一個屬性,叫做deamon。每個線程都可以單獨設置它的屬性,如果設置為True,當父進程結束後,子進程會自動被終止。

用一個實例來感受一下,還是原來的例子,增加了deamon屬性:

from multiprocessing import Process
import time
 
 
class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop
 
    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(Pid:  + str(self.pid) +  LoopCount:  + str(count))
 
 
if __name__ == __main__:
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
 
    print Main process Ended!

在這裏,調用的時候增加了設置deamon,最後的主進程(即父進程)打印輸出了一句話。

運行結果:

Main process Ended!

結果很簡單,因為主進程沒有做任何事情,直接輸出一句話結束,所以在這時也直接終止了子進程的運行。

這樣可以有效防止無控制地生成子進程。如果這樣寫了,你在關閉這個主程序運行時,就無需額外擔心子進程有沒有被關閉了。

不過這樣並不是我們想要達到的效果呀,能不能讓所有子進程都執行完了然後再結束呢?那當然是可以的,只需要加入join()方法即可。

from multiprocessing import Process
import time
 
 
class MyProcess(Process):
    def __init__(self, loop):
        Process.__init__(self)
        self.loop = loop
 
    def run(self):
        for count in range(self.loop):
            time.sleep(1)
            print(Pid:  + str(self.pid) +  LoopCount:  + str(count))
 
 
if __name__ == __main__:
    for i in range(2, 5):
        p = MyProcess(i)
        p.daemon = True
        p.start()
        p.join()
 
 
    print Main process Ended!

在這裏,每個子進程都調用了join()方法,這樣父進程(主進程)就會等待子進程執行完畢。

運行結果:

Pid: 29902 LoopCount: 0
Pid: 29902 LoopCount: 1
Pid: 29905 LoopCount: 0
Pid: 29905 LoopCount: 1
Pid: 29905 LoopCount: 2
Pid: 29912 LoopCount: 0
Pid: 29912 LoopCount: 1
Pid: 29912 LoopCount: 2
Pid: 29912 LoopCount: 3
Main process Ended!

發現所有子進程都執行完畢之後,父進程最後打印出了結束的結果。

Lock

在上面的一些小實例中,你可能會遇到如下的運行結果:

技術分享圖片

什麽問題?有的輸出錯位了。這是由於並行導致的,兩個進程同時進行了輸出,結果第一個進程的換行沒有來得及輸出,第二個進程就輸出了結果。所以導致這種排版的問題。

那這歸根結底是因為線程同時資源(輸出操作)而導致的。

那怎麽來避免這種問題?那自然是在某一時間,只能一個進程輸出,其他進程等待。等剛才那個進程輸出完畢之後,另一個進程再進行輸出。這種現象就叫做“互斥”。

我們可以通過 Lock 來實現,在一個進程輸出時,加鎖,其他進程等待。等此進程執行結束後,釋放鎖,其他進程可以進行輸出。

我們現用一個實例來感受一下:

from multiprocessing import Process, Lock
import time
 
 
class MyProcess(Process):
    def __init__(self, loop, lock):
        Process.__init__(self)
        self.loop = loop
        self.lock = lock
 
    def run(self):
        for count in range(self.loop):
            time.sleep(0.1)
            #self.lock.acquire()
            print(Pid:  + str(self.pid) +  LoopCount:  + str(count))
            #self.lock.release()
 
if __name__ == __main__:
    lock = Lock()
    for i in range(10, 15):
        p = MyProcess(i, lock)
        p.start()

首先看一下不加鎖的輸出結果:

Pid: 45755 LoopCount: 0
Pid: 45756 LoopCount: 0
Pid: 45757 LoopCount: 0
Pid: 45758 LoopCount: 0
Pid: 45759 LoopCount: 0
Pid: 45755 LoopCount: 1
Pid: 45756 LoopCount: 1
Pid: 45757 LoopCount: 1
Pid: 45758 LoopCount: 1
Pid: 45759 LoopCount: 1
Pid: 45755 LoopCount: 2Pid: 45756 LoopCount: 2
 
Pid: 45757 LoopCount: 2
Pid: 45758 LoopCount: 2
Pid: 45759 LoopCount: 2
Pid: 45756 LoopCount: 3
Pid: 45755 LoopCount: 3
Pid: 45757 LoopCount: 3
Pid: 45758 LoopCount: 3
Pid: 45759 LoopCount: 3
Pid: 45755 LoopCount: 4
Pid: 45756 LoopCount: 4
Pid: 45757 LoopCount: 4
Pid: 45759 LoopCount: 4
Pid: 45758 LoopCount: 4
Pid: 45756 LoopCount: 5
Pid: 45755 LoopCount: 5
Pid: 45757 LoopCount: 5
Pid: 45759 LoopCount: 5
Pid: 45758 LoopCount: 5
Pid: 45756 LoopCount: 6Pid: 45755 LoopCount: 6
 
Pid: 45757 LoopCount: 6
Pid: 45759 LoopCount: 6
Pid: 45758 LoopCount: 6
Pid: 45755 LoopCount: 7Pid: 45756 LoopCount: 7
 
Pid: 45757 LoopCount: 7
Pid: 45758 LoopCount: 7
Pid: 45759 LoopCount: 7
Pid: 45756 LoopCount: 8Pid: 45755 LoopCount: 8
 
Pid: 45757 LoopCount: 8
Pid: 45758 LoopCount: 8Pid: 45759 LoopCount: 8
 
Pid: 45755 LoopCount: 9
Pid: 45756 LoopCount: 9
Pid: 45757 LoopCount: 9
Pid: 45758 LoopCount: 9
Pid: 45759 LoopCount: 9
Pid: 45756 LoopCount: 10
Pid: 45757 LoopCount: 10
Pid: 45758 LoopCount: 10
Pid: 45759 LoopCount: 10
Pid: 45757 LoopCount: 11
Pid: 45758 LoopCount: 11
Pid: 45759 LoopCount: 11
Pid: 45758 LoopCount: 12
Pid: 45759 LoopCount: 12
Pid: 45759 LoopCount: 13

可以看到有些輸出已經造成了影響。

然後我們對其加鎖:

from multiprocessing import Process, Lock
import time
 
 
class MyProcess(Process):
    def __init__(self, loop, lock):
        Process.__init__(self)
        self.loop = loop
        self.lock = lock
 
    def run(self):
        for count in range(self.loop):
            time.sleep(0.1)
            self.lock.acquire()
            print(Pid:  + str(self.pid) +  LoopCount:  + str(count))
            self.lock.release()
 
if __name__ == __main__:
    lock = Lock()
    for i in range(10, 15):
        p = MyProcess(i, lock)
        p.start()

我們在print方法的前後分別添加了獲得鎖和釋放鎖的操作。這樣就能保證在同一時間只有一個print操作。

看一下運行結果:

Pid: 45889 LoopCount: 0
Pid: 45890 LoopCount: 0
Pid: 45891 LoopCount: 0
Pid: 45892 LoopCount: 0
Pid: 45893 LoopCount: 0
Pid: 45889 LoopCount: 1
Pid: 45890 LoopCount: 1
Pid: 45891 LoopCount: 1
Pid: 45892 LoopCount: 1
Pid: 45893 LoopCount: 1
Pid: 45889 LoopCount: 2
Pid: 45890 LoopCount: 2
Pid: 45891 LoopCount: 2
Pid: 45892 LoopCount: 2
Pid: 45893 LoopCount: 2
Pid: 45889 LoopCount: 3
Pid: 45890 LoopCount: 3
Pid: 45891 LoopCount: 3
Pid: 45892 LoopCount: 3
Pid: 45893 LoopCount: 3
Pid: 45889 LoopCount: 4
Pid: 45890 LoopCount: 4
Pid: 45891 LoopCount: 4
Pid: 45892 LoopCount: 4
Pid: 45893 LoopCount: 4
Pid: 45889 LoopCount: 5
Pid: 45890 LoopCount: 5
Pid: 45891 LoopCount: 5
Pid: 45892 LoopCount: 5
Pid: 45893 LoopCount: 5
Pid: 45889 LoopCount: 6
Pid: 45890 LoopCount: 6
Pid: 45891 LoopCount: 6
Pid: 45893 LoopCount: 6
Pid: 45892 LoopCount: 6
Pid: 45889 LoopCount: 7
Pid: 45890 LoopCount: 7
Pid: 45891 LoopCount: 7
Pid: 45892 LoopCount: 7
Pid: 45893 LoopCount: 7
Pid: 45889 LoopCount: 8
Pid: 45890 LoopCount: 8
Pid: 45891 LoopCount: 8
Pid: 45892 LoopCount: 8
Pid: 45893 LoopCount: 8
Pid: 45889 LoopCount: 9
Pid: 45890 LoopCount: 9
Pid: 45891 LoopCount: 9
Pid: 45892 LoopCount: 9
Pid: 45893 LoopCount: 9
Pid: 45890 LoopCount: 10
Pid: 45891 LoopCount: 10
Pid: 45892 LoopCount: 10
Pid: 45893 LoopCount: 10
Pid: 45891 LoopCount: 11
Pid: 45892 LoopCount: 11
Pid: 45893 LoopCount: 11
Pid: 45893 LoopCount: 12
Pid: 45892 LoopCount: 12
Pid: 45893 LoopCount: 13

嗯,一切都沒問題了。

所以在訪問臨界資源時,使用Lock就可以避免進程同時占用資源而導致的一些問題。

Semaphore

信號量,是在進程同步過程中一個比較重要的角色。可以控制臨界資源的數量,保證各個進程之間的互斥和同步。

如果你學過操作系統,那麽一定對這方面非常了解,如果你還不了解信號量是什麽,可以參考

信號量解析

來了解一下它是做什麽的。

那麽接下來我們就用一個實例來演示一下進程之間利用Semaphore做到同步和互斥,以及控制臨界資源數量。

from multiprocessing import Process, Semaphore, Lock, Queue
import time
 
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
 
class Consumer(Process):
 
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            buffer.get()
            print(Consumer pop an element)
            time.sleep(1)
            lock.release()
            empty.release()
 
 
class Producer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire()
            lock.acquire()
            buffer.put(1)
            print(Producer append an element)
            time.sleep(1)
            lock.release()
            full.release()
 
 
if __name__ == __main__:
    p = Producer()
    c = Consumer()
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print Ended!

如上代碼實現了註明的生產者和消費者問題,定義了兩個進程類,一個是消費者,一個是生產者。

定義了一個共享隊列,利用了Queue數據結構,然後定義了兩個信號量,一個代表緩沖區空余數,一個表示緩沖區占用數。

生產者Producer使用empty.acquire()方法來占用一個緩沖區位置,然後緩沖區空閑區大小減小1,接下來進行加鎖,對緩沖區進行操作。然後釋放鎖,然後讓代表占用的緩沖區位置數量+1,消費者則相反。

運行結果如下:

Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element

可以發現兩個進程在交替運行,生產者先放入緩沖區物品,然後消費者取出,不停地進行循環。

通過上面的例子來體會一下信號量的用法。

Queue

在上面的例子中我們使用了Queue,可以作為進程通信的共享隊列使用。

在上面的程序中,如果你把Queue換成普通的list,是完全起不到效果的。即使在一個進程中改變了這個list,在另一個進程也不能獲取到它的狀態。

因此進程間的通信,隊列需要用Queue。當然這裏的隊列指的是 multiprocessing.Queue

依然是用上面那個例子,我們一個進程向隊列中放入數據,然後另一個進程取出數據。

from multiprocessing import Process, Semaphore, Lock, Queue
import time
from random import random
 
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
 
class Consumer(Process):
 
    def run(self):
        global buffer, empty, full, lock
        while True:
            full.acquire()
            lock.acquire()
            print Consumer get, buffer.get()
            time.sleep(1)
            lock.release()
            empty.release()
 
 
class Producer(Process):
    def run(self):
        global buffer, empty, full, lock
        while True:
            empty.acquire()
            lock.acquire()
            num = random()
            print Producer put , num
            buffer.put(num)
            time.sleep(1)
            lock.release()
            full.release()
 
 
if __name__ == __main__:
    p = Producer()
    c = Consumer()
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print Ended!

運行結果:

Producer put  0.719213647437
Producer put  0.44287326683
Consumer get 0.719213647437
Consumer get 0.44287326683
Producer put  0.722859424381
Producer put  0.525321338921
Consumer get 0.722859424381
Consumer get 0.525321338921

可以看到生產者放入隊列中數據,然後消費者將數據取出來。

get方法有兩個參數,blocked和timeout,意思為阻塞和超時時間。默認blocked是true,即阻塞式。

當一個隊列為空的時候如果再用get取則會阻塞,所以這時候就需要吧blocked設置為false,即非阻塞式,實際上它就會調用get_nowait()方法,此時還需要設置一個超時時間,在這麽長的時間內還沒有取到隊列元素,那就拋出Queue.Empty異常。

當一個隊列為滿的時候如果再用put放則會阻塞,所以這時候就需要吧blocked設置為false,即非阻塞式,實際上它就會調用put_nowait()方法,此時還需要設置一個超時時間,在這麽長的時間內還沒有放進去元素,那就拋出Queue.Full異常。

另外隊列中常用的方法

Queue.qsize() 返回隊列的大小 ,不過在 Mac OS 上沒法運行。

原因:

def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
return self._maxsize – self._sem._semlock._get_value()

Queue.empty() 如果隊列為空,返回True, 反之False

Queue.full() 如果隊列滿了,返回True,反之False

Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間

Queue.get_nowait() 相當Queue.get(False)

Queue.put(item) 阻塞式寫入隊列,timeout等待時間

Queue.put_nowait(item) 相當Queue.put(item, False)

Pipe

管道,顧名思義,一端發一端收。

Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創建單向管道 (默認為雙向)。一個進程從PIPE一端輸入對象,然後被PIPE另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。

用一個實例來感受一下:

from multiprocessing import Process, Pipe
import time
 
 
class Consumer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe
 
    def run(self):
        self.pipe.send(Consumer Words)
        time.sleep(1)
        print Consumer Received:, self.pipe.recv()
 
 
class Producer(Process):
    def __init__(self, pipe):
        Process.__init__(self)
        self.pipe = pipe
 
    def run(self):
        print Producer Received:, self.pipe.recv()
        self.pipe.send(Producer Words)
 
 
if __name__ == __main__:
    pipe = Pipe()
    p = Producer(pipe[0])
    c = Consumer(pipe[1])
    p.daemon = c.daemon = True
    p.start()
    c.start()
    p.join()
    c.join()
    print Ended!

在這裏聲明了一個默認為雙向的管道,然後將管道的兩端分別傳給兩個進程。兩個進程互相收發。觀察一下結果:

Producer Received: Consumer Words
Consumer Received: Producer Words
Ended!

以上是對pipe的簡單介紹。

Pool

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

在這裏需要了解阻塞和非阻塞的概念。

阻塞和非阻塞關註的是程序在等待調用結果(消息,返回值)時的狀態。

阻塞即要等到回調結果出來,在有結果之前,當前進程會被掛起。

Pool的用法有阻塞和非阻塞兩種方式。非阻塞即為添加進程後,不一定非要等到改進程執行完就添加其他進程運行,阻塞則相反。

現用一個實例感受一下非阻塞的用法:

from multiprocessing import Lock, Pool
import time
 
 
def function(index):
    print Start process: , index
    time.sleep(3)
    print End process, index
 
 
if __name__ == __main__:
    pool = Pool(processes=3)
    for i in xrange(4):
        pool.apply_async(function, (i,))
 
    print "Started processes"
    pool.close()
    pool.join()
    print "Subprocess done."

在這裏利用了apply_async方法,即非阻塞。

運行結果:

Started processes
Start process: Start process:  0
1
Start process:  2
End processEnd process 0
1
Start process:  3
End process 2
End process 3
Subprocess done.

可以發現在這裏添加三個進程進去後,立馬就開始執行,不用非要等到某個進程結束後再添加新的進程進去。

下面再看看阻塞的用法:

from multiprocessing import Lock, Pool
import time
 
 
def function(index):
    print Start process: , index
    time.sleep(3)
    print End process, index
 
 
if __name__ == __main__:
    pool = Pool(processes=3)
    for i in xrange(4):
        pool.apply(function, (i,))
 
    print "Started processes"
    pool.close()
    pool.join()
    print "Subprocess done."

在這裏只需要把apply_async改成apply即可。

運行結果如下:

Start process:  0
End process 0
Start process:  1
End process 1
Start process:  2
End process 2
Start process:  3
End process 3
Started processes
Subprocess done.

這樣一來就好理解了吧?

下面對函數進行解釋:

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的。

close() 關閉pool,使其不在接受新的任務。

terminate() 結束工作進程,不在處理未完成的任務。

join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之後使用。

當然每個進程可以在各自的方法返回一個結果。apply或apply_async方法可以拿到這個結果並進一步進行處理。

from multiprocessing import Lock, Pool
import time
 
 
def function(index):
    print Start process: , index
    time.sleep(3)
    print End process, index
    return index
 
if __name__ == __main__:
    pool = Pool(processes=3)
    for i in xrange(4):
        result = pool.apply_async(function, (i,))
        print result.get()
    print "Started processes"
    pool.close()
    pool.join()
    print "Subprocess done."

運行結果:

Start process:  0
End process 0
0
Start process:  1
End process 1
1
Start process:  2
End process 2
2
Start process:  3
End process 3
3
Started processes
Subprocess done.

另外還有一個非常好用的map方法。

如果你現在有一堆數據要處理,每一項都需要經過一個方法來處理,那麽map非常適合。

比如現在你有一個數組,包含了所有的URL,而現在已經有了一個方法用來抓取每個URL內容並解析,那麽可以直接在map的第一個參數傳入方法名,第二個參數傳入URL數組。

現在我們用一個實例來感受一下:

from multiprocessing import Pool
import requests
from requests.exceptions import ConnectionError
 
 
def scrape(url):
    try:
        print requests.get(url)
    except ConnectionError:
        print Error Occured , url
    finally:
        print URL , url,  Scraped
 
 
if __name__ == __main__:
    pool = Pool(processes=3)
    urls = [
        https://www.baidu.com,
        http://www.meituan.com/,
        http://blog.csdn.net/,
        http://xxxyxxx.net
    ]
    pool.map(scrape, urls)

在這裏初始化一個Pool,指定進程數為3,如果不指定,那麽會自動根據CPU內核來分配進程數。

然後有一個鏈接列表,map函數可以遍歷每個URL,然後對其分別執行scrape方法。

運行結果:

<Response [403]>
URL  http://blog.csdn.net/  Scraped
<Response [200]>
URL  https://www.baidu.com  Scraped
Error Occured  http://xxxyxxx.net
URL  http://xxxyxxx.net  Scraped
<Response [200]>
URL  http://www.meituan.com/  Scraped

可以看到遍歷就這麽輕松地實現了。

結語

多進程multiprocessing相比多線程功能強大太多,而且使用範圍更廣,希望本文對大家有幫助!

Python爬蟲進階六之多進程的用法