1. 程式人生 > >python自動化運維之多進程

python自動化運維之多進程

python 多進程 multiprocessing

python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
1、Process
創建進程的類:Process([group[,target[,name[,args[,kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。
屬性:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止後自動終止,且自己不能產生新進程,必須在start()之前設置。
1.1 創建函數並將其作為單個進程

import multiprocessing
import time,os
def worker(interval):
    n = 5
    while n > 0:
        print("[%s] The time is %s" %(os.getpid(),time.ctime()))
        time.sleep(interval)
        n -= 1
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("主進程PID:%s" %os.getpid())
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

執行結果:

主進程PID:17476
p.pid: 16476
p.name: Process-1
p.is_alive: True
[16476] The time is Thu Aug 31 16:23:04 2017
[16476] The time is Thu Aug 31 16:23:08 2017
[16476] The time is Thu Aug 31 16:23:11 2017
[16476] The time is Thu Aug 31 16:23:14 2017
[16476] The time is Thu Aug 31 16:23:17 2017

1.2 創建函數並將其作為多個進程

import multiprocessing
import time,os
def worker_1(interval):
    print("[%s] worker_1" % os.getpid())
    time.sleep(interval)
    print("[%s] end worker_1" % os.getpid())
def worker_2(interval):
    print("[%s] worker_2" % os.getpid())
    time.sleep(interval)
    print("[%s] end worker_2" % os.getpid())
def worker_3(interval):
    print("[%s] worker_3" % os.getpid())
    time.sleep(interval)
    print("[%s] end worker_3" % os.getpid())
if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))
    p1.start()
    p2.start()
    p3.start()
    print("The number of CPU is: %s" %(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name:%s\tp.id:%s" %(p.name,p.pid))
    print("END!!!!!!!!!!!!!!!!!")

執行結果:

The number of CPU is: 2
child p.name:Process-2    p.id:15948
child p.name:Process-3    p.id:11792
child p.name:Process-1    p.id:2648
END!!!!!!!!!!!!!!!!!
[11792] worker_3
[2648] worker_1
[15948] worker_2
[2648] end worker_1
[15948] end worker_2
[11792] end worker_3

1.3:將進程定義為類

import multiprocessing
import time,os
class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        n = 5
        while n > 0:
            print("[%s] the time is %s" %(os.getpid(),time.ctime()))
            time.sleep(self.interval)
            n -= 1
if __name__ == ‘__main__‘:
    p = ClockProcess(3)
    p.start()

註:進程p調用start()時,自動調用run()
執行結果:

[2128] the time is Thu Aug 31 16:38:30 2017
[2128] the time is Thu Aug 31 16:38:33 2017
[2128] the time is Thu Aug 31 16:38:36 2017
[2128] the time is Thu Aug 31 16:38:39 2017
[2128] the time is Thu Aug 31 16:38:42 2017

1.4 daemon程序對比結果
(1)不加daemon屬性

import multiprocessing
import time,os
def worker(interval):
    print("[%s] work start:%s " %(os.getpid(),time.ctime()))
    time.sleep(interval)
    print("[%s] work end:%s " % (os.getpid(), time.ctime()))
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("主進程PID:%s" %os.getpid())

執行結果:
主進程PID:7724

[3728] work start:Thu Aug 31 16:44:14 2017 
[3728] work end:Thu Aug 31 16:44:17 2017

(2)加上daemon屬性

import multiprocessing
import time,os
def worker(interval):
    print("[%s] work start:%s " %(os.getpid(),time.ctime()))
    time.sleep(interval)
    print("[%s] work end:%s " % (os.getpid(), time.ctime()))
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print("主進程PID:%s" %os.getpid())

執行結果:

主進程PID:13700

註意:因子進程設置了daemon屬性(守護進程),主進程結束,它們就隨著結束了。
(3)設置daemon執行完結束的方法

import multiprocessing
import time,os
def worker(interval):
    print("[%s] work start:%s " %(os.getpid(),time.ctime()))
    time.sleep(interval)
    print("[%s] work end:%s " % (os.getpid(), time.ctime()))
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print("主進程PID:%s" %os.getpid())

執行結果:

[9600] work start:Thu Aug 31 16:46:10 2017 
[9600] work end:Thu Aug 31 16:46:13 2017 
主進程PID:14184

註意:p.join()為主進程等待p進程結束後再往下執行,下面有詳細說明
2、Lock
當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。

import multiprocessing
import sys, os
def worker_with(lock, f):
    with lock:
        with open(f, ‘a+‘) as fs:
            n = 10
            while n > 1:
                fs.write("[%s] Lockd acquired via with\n" %os.getpid())
                n -= 1
def worker_no_with(lock, f):
    lock.acquire()
    try:
        with open(f, ‘a+‘) as fs:
            n = 10
            while n > 1:
                fs.write("[%s] Lock acquired directly\n" %os.getpid())
                n -= 1
    finally:
        lock.release()
if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("主進程PID:%s" % os.getpid())

執行結果(輸出文件)

[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1872] Lockd acquired via with
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly
[1512] Lock acquired directly

3. Semaphore
Semaphore用來控制對共享資源的訪問數量,例如池的最大連接數。

import multiprocessing
import time,os
def worker(s, i):
    s.acquire()
    print("[%s]\t%s acquire" %(os.getpid(),multiprocessing.current_process().name))
    time.sleep(i)
    print("[%s]\t%s release" %(os.getpid(),multiprocessing.current_process().name))
    s.release()
if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()
    print("主進程PID:%s" % os.getpid())

執行結果:

主進程PID:11428
[12276]   Process-2 acquire
[6352]    Process-4 acquire
[12276]   Process-2 release
[3948]    Process-3 acquire
[6352]    Process-4 release
[9400]    Process-5 acquire
[3948]    Process-3 release
[1392]    Process-1 acquire
[1392]    Process-1 release
[9400]    Process-5 release


4、Event
Event用來實現進程間同步通信。

import multiprocessing
import time,os
def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set() -> %s" %str(e.is_set()))
def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set -> %s" %str(e.is_set()))
if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))
    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()
    time.sleep(3)
    e.set()
    print("主進程PID:%s" % os.getpid())
    print("main: event is set")

執行結果:

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set -> False
wairt_for_event: e.is_set() -> True
主進程PID:9444
main: event is set

5、Queue
Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。Queue的一段示例代碼:
import multiprocessing

def writer_proc(q):
    try:
        q.put(1, block = False)
    except:
        pass
def reader_proc(q):
    try:
        print(q.get(block = False))
    except:
        pass
if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()
    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()
    reader.join()
    writer.join()

執行結果:

1


6、Pipe
Pipe方法返回(conn1,conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那麽這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。
send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麽recv方法會拋出EOFError。

import multiprocessing
import time
def proc1(pipe):
    while True:
        for i in range(10):
            print("send: %s" %(i))
            pipe.send(i)
            time.sleep(1)
def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)
def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)
if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

7、Pool
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
7.1 使用進程池(非阻塞)

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   # 維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # 調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束
    print("Sub-process(es) done.")

執行結果:

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

函數解釋:
apply_async(func[,args[,kwds[,callback]]])它是非阻塞,apply(func[,args[,kwds]])是阻塞的(理解區別,看例1例2結果區別)
close()關閉pool,使其不在接受新的任務。
terminate()結束工作進程,不在處理未完成的任務。
join()主進程阻塞,等待子進程的退出, join方法要在close或terminate之後使用。
執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事後才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"後。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環後直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。
7.2 使用進程池(阻塞)

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   # 維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束
    print("Sub-process(es) done.")

執行結果:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.


7.3 使用進程池,並關註結果

import multiprocessing
import time
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

執行結果:

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

7.4 使用多個進程池

import multiprocessing
import os, time, random


def Lee():
    print
    "\nRun task Lee-%s" % (os.getpid())  # os.getpid()獲取當前的進程的ID
    start = time.time()
    time.sleep(random.random() * 10)  # random.random()隨機生成0-1之間的小數
    end = time.time()
    print(‘Task Lee, runs %0.2f seconds.‘ % (end - start))


def Marlon():
    print("\nRun task Marlon-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print(‘Task Marlon runs %0.2f seconds.‘ % (end - start))


def Allen():
    print("\nRun task Allen-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print(‘Task Allen runs %0.2f seconds.‘ % (end - start))


def Frank():
    print("\nRun task Frank-%s" % (os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print(‘Task Frank runs %0.2f seconds.‘ % (end - start))


if __name__ == ‘__main__‘:
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)  # Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中
    print(‘Waiting for all subprocesses done...‘)
    pool.close()
    pool.join()  # 調用join之前,一定要先調用close() 函數,否則會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
    print(‘All subprocesses done.‘)

執行結果:

parent process 10992
Waiting for all subprocesses done...

Run task Marlon-12828

Run task Allen-12880

Run task Frank-784
Task Lee, runs 7.22 seconds.
Task Frank runs 11.81 seconds.
Task Marlon runs 14.34 seconds.
Task Allen runs 21.21 seconds.
All subprocesses done.

本文出自 “炫維” 博客,轉載請與作者聯系!

python自動化運維之多進程