1. 程式人生 > >python多進程(三)

python多進程(三)

Go 分別是 開始 取數 monit box get() duplex red

消息隊列

消息隊列”是在消息的傳輸過程中保存消息的容器。 消息隊列最經典的用法就是消費者和生成者之間通過消息管道來傳遞消息,消費者和生成者是不通的進程。生產者往管道中寫消息,消費者從管道中讀消息。 技術分享圖片 相當於水管,有一個入口和出口,水從入口流入,從出口流出,這就是一個消息隊列。左側線程或者進程往隊列裏面添加數據,它的任務就結束了,右側線程或者進程只要依次從出口處讀取數據就可以了。 消息隊列的思想 比如在京東下單,並付完錢,相當於把消息堆在了水管裏面,會返回一個結果給客戶,告知客戶已經購買此商品,後臺會有線程去接收並處理這個訂單消息,然後去庫房發貨、走物流,知道最後接收貨物並簽收,這個流程就算結束了。所以,在異步處理問題的時候,都會用到消息隊列的這種思想。操作系統提供了很多機制來實現進程間的通信 ,multiprocessing模塊就提供了Queue和Pipe兩種方法來實現。

使用multiprocessing裏面的Queue來實現消息隊列。

語法格式:
from multiprocessing import Queue
q = Queue
q.put(data)
data = q.get(data)

例子:

from multiprocessing import Queue, Process

# 寫數據的進程
def write(q):
    for i in [a,b,c,d]:
        q.put(i)  # 把消息放入隊列
        print (put {0} to queue.format(i))

#
讀取數據的進程 def read(q): while 1: result = q.get() # 從隊列中讀取消息 print ("get {0} from queue".format(result)) def main(): # 父進程創建Queue,並傳給各個子進程 q = Queue() pw = Process(target=write,args=(q,)) # 使用多進程,傳入的參數是消息隊列 pr = Process(target=read,args=(q,)) pw.start() # 啟動子進程,寫入數據
pr.start() # 啟動子進程,讀取數據 pw.join() # 等待pw進程結束 pr.terminate() #停止 # 相當於join,等pr完成以後,while是一個死循環,這裏強制結束,因為讀取數據的進程應該是一直監聽是否有數據產生,有就會去讀取。 if __name__ == __main__: main() 結果: put a to queue get a from queue put b to queue get b from queue put c to queue get c from queue put d to queue get d from queue

使用multiprocessing裏面的PIPE來實現消息隊列。

1、Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那麽這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接收消息,conn2只負責發送消息。 2、send和recv方法分別是發送和接受消息的方法。close方法表示關閉管道,當消息接收結束以後,關閉管道。 例子:
import time
from multiprocessing import Pipe, Process
 
# 發送消息的進程 
def proc1(pipe):
    for i in xrange(1, 10):
        pipe.send(i)
        print ("send {0} to pipe".format(i))
        time.sleep(1)
 
# 接收消息的進程
def proc2(pipe):
    n = 9
    while n > 0:
        result = pipe.recv()
        print ("recv {0} from pipe".format(result))
        n -= 1
 
 
def main():
    pipe = Pipe(duplex=False)  # 設置半雙工模式,p1只負責發送消息,p2只負責接收消息,pipe是一個tuple類型
    p1 = Process(target=proc1, args=(pipe[1],))
    p2 = Process(target=proc2, args=(pipe[0],)) #接收寫0
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()
 
 
if __name__ == __main__:
    main()

結果:
send 1 to pipe
recv 1 from pipe
recv 2 from pipe
send 2 to pipe
send 3 to pipe
recv 3 from pipe
recv 4 from pipe
send 4 to pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
recv 7 from pipe
send 7 to pipe
recv 8 from pipe
send 8 to pipe
send 9 to pipe
recv 9 from pipe

Python提供了Queue模塊來專門實現消息隊列

Queue對象 Queue對象實現一個fifo隊列(其他的還有lifo、priority隊列)。queue只有maxsize一個構造參數,用來指定隊列容量,指定為0的時候代表容量無限。主要有以下成員函數: Queue.qsize():返回消息隊列的當前空間。返回的值不一定可靠。 Queue.empty():判斷消息隊列是否為空,返回True或False。同樣不可靠。 Queue.full():類似上邊,判斷消息隊列是否滿 Queue.put(item, block=True, timeout=None):往消息隊列中存放消息。block可以控制是否阻塞,timeout指定阻塞時候的等待時間。如果不阻塞或者超時,會引起一個full exception。 Queue.put_nowait(item):相當於put(item, False). Queue.get(block=True, timeout=None):獲取一個消息,其他同put。 以下兩個函數用來判斷消息對應的任務是否完成。 Queue.task_done():接受消息的線程通過調用這個函數來說明消息對應的任務已完成。 Queue.join(): 實際上意味著等到隊列為空,再執行別的操作 例子:
from multiprocessing import Queue
from threading import Thread
import time

"""
一個生產者和兩個消費者,
采用多線程繼承的方式,
一個消費偶數,一個消費奇數。
"""


class Proceducer(Thread):
    def __init__(self, queue):
        super(Proceducer, self).__init__()
        self.queue = queue

    def run(self):
        try:
            for i in xrange(1, 10):
                print ("put {0} to queue".format(i))
                self.queue.put(i)
        except Exception as e:
            print ("put data error")
            raise e


class Consumer_even(Thread):
    def __init__(self, queue):
        super(Consumer_even, self).__init__()
        self.queue = queue

    def run(self):
        try:
            while not self.queue.empty():   # 判斷隊列是否為空
                number = self.queue.get(block=True, timeout=3)  # 從隊列中獲取消息,block=True表示阻塞,設置超時未3s
                if number % 2 == 0:   # 如果獲取的消息是偶數
                    print("get {0} from queue EVEN, thread name is {1}".format(number, self.getName()))
                else:
                    self.queue.put(number)  # 如果獲取的消息不是偶數,就接著把它放回隊列中
                time.sleep(1)
        except Exception as e:
            raise e


class Consumer_odd(Thread):
    def __init__(self, queue):
        super(Consumer_odd, self).__init__()
        self.queue = queue

    def run(self):
        try:
            while not self.queue.empty():
                number = self.queue.get(block=True, timeout=3)
                if number % 2 != 0:  # 如果獲取的消息是奇數
                    print("get {0} from queue ODD, thread name is {1}".format(number, self.getName()))
                else:
                    self.queue.put(number)
                time.sleep(1)
        except Exception as e:
            raise e


def main():
    queue = Queue()
    p = Proceducer(queue=queue)
    # 開始產生消息
    print("開始產生消息")
    p.start()
    p.join()   # 等待生產消息的進程結束
    time.sleep(1) # 消息生產完成之後暫停1s
    c1 = Consumer_even(queue=queue)
    c2 = Consumer_odd(queue=queue)

    # 開始消費消息
    print("開始消費消息")
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    print ("消息消費完成")


if __name__ == __main__:
    main()

結果:
開始產生消息
put 1 to queue
put 2 to queue
put 3 to queue
put 4 to queue
put 5 to queue
put 6 to queue
put 7 to queue
put 8 to queue
put 9 to queue
開始消費消息
get 1 from queue ODD, thread name is Thread-3
get 2 from queue EVEN, thread name is Thread-2
get 3 from queue ODD, thread name is Thread-3
get 4 from queue EVEN, thread name is Thread-2
get 5 from queue ODD, thread name is Thread-3
get 6 from queue EVEN, thread name is Thread-2
get 7 from queue ODD, thread name is Thread-3
get 8 from queue EVEN, thread name is Thread-2
get 9 from queue ODD, thread name is Thread-3
消息消費完成

Celery異步分布式

什麽是celery

Celery是一個python開發的異步分布式任務調度模塊。

幾個概念

broker: brokers 中文意思為中間人,在這裏就是指任務隊列本身,Celery 扮演生產者和消費者的角色,brokers 就是生產者和消費者存放/拿取產品的地方(隊列) ,常見的 brokers 有 rabbitmq、redis、Zookeeper 等。 backend: 顧名思義就是結果儲存的地方,隊列中的任務運行完後的結果或者狀態需要被任務發送者知道,那麽就需要一個地方儲存這些結果,就是 Result Stores 了 ,常見的 backend 有 redis、Memcached 甚至常用的數據都可以。 worker: 就是 Celery 中的工作者,類似與生產/消費模型中的消費者,其從隊列中取出任務並執行。 task: 就是我們想在隊列中進行的任務,一般由用戶、觸發器或其他操作將任務入隊,然後交由workers進行處理。 Celery本身並不提供消息服務,使用第三方服務,也就是borker來傳遞任務,目前支持rebbimq,redis, 數據庫等。 這裏我們用redis當做celery的broker和backend。 技術分享圖片

連接url的格式為
redis://:password@hostname:port/db_number
例如:
BROKER_URL = redis://localhost:6379/0

安裝celery

pip install celery
pip install redis
pip install redis-py-with-geo  # 沒有安裝這個會報錯


File "/usr/lib/python2.7/site-packages/kombu/transport/redis.py", line 671, in _receive
    while c.connection.can_read(timeout=0):
TypeError: can_read() got an unexpected keyword argument timeout

在服務器上安裝redis並啟動redis,我安裝的redis指定端口為5000。

例子:

vi tasks.py
#/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery
broker="redis://110.106.106.220:5000/5"
backend="redis://110.106.106.220:5000/6"
app = Celery("tasks", broker=broker, backend=backend)

@app.task
def add(x, y):
return x+y

現在broker、backend、task都有了,接下來我們就運行worker進行工作,在tasks.py目錄運行:

celery -A tasks worker -l info

啟動後可以看到如下信息:

[root@izwz920j4zsv1q15yhii1qz scripts]# celery -A celery_test worker -l info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: Youre running the worker with superuser privileges: this is absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@izwz920j4zsv1q15yhii1qz v4.1.1 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2018-05-25 14:28:38
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_test:0x25a6450
- ** ---------- .> transport:   redis://110.106.106.220:5000/5
- ** ---------- .> results:     redis://110.106.106.220:5000/6
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . tasks.add

[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching for neighbors
[2018-05-25 14:28:39,475: INFO/MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] celery@izwz920j4zsv1q15yhii1qz ready.

意思就是運行 tasks 這個任務集合的 worker 進行工作(當然此時broker中還沒有任務,worker此時相當於待命狀態),最後一步,就是觸發任務,最簡單方式就是再寫一個腳本然後調用那個被裝飾成 task 的函數。

vi trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),這裏需要用 celery 提供的接口 delay 進行調用
while not result.ready():  # 是否處理
    time.sleep(1)
print task done: {0}.format(result.get())  # 獲取結果
print(result.task_id)

delay 返回的是一個 AsyncResult 對象,裏面存的就是一個異步的結果,當任務完成時result.ready() 為 true,然後用 result.get() 取結果即可。 運行trigger.py之後可以看到如下信息:
[root@izwz920j4zsv1q15yhii1qz scripts]# python trigger.py 
task done: 8
celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2

celery的任務狀態

技術分享圖片

在之前啟動tasks.py的窗口可以看到如下信息:

[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching for neighbors
[2018-05-25 14:28:39,475: INFO/MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] celery@izwz920j4zsv1q15yhii1qz ready.
[2018-05-25 14:33:30,340: INFO/MainProcess] Received task: tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2]  
[2018-05-25 14:33:30,373: INFO/ForkPoolWorker-1] Task tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2] succeeded in 0.0313169739966s: 8
[2018-05-25 14:33:47,082: INFO/MainProcess] Received task: tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a]  
[2018-05-25 14:33:47,086: INFO/ForkPoolWorker-1] Task tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a] succeeded in 0.00259069999447s: 8

在redis中查看:

110.106.106.220:5000[5]> select 5
OK
110.106.106.220:5000[5]> keys *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery.pidbox"
3) "_kombu.binding.celery"
110.106.106.220:5000[5]> select 6
OK
110.106.106.220:5000[6]> keys *
1) "celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a"
2) "celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2"
110.106.106.220:5000[6]> get celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"d64def11-6b77-443f-84c2-0cbd850972f2\", \"children\": []}"
110.106.106.220:5000[6]> get celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"5ae26e89-5d91-496e-8e1c-e0504fbbd39a\", \"children\": []}"

python多進程(三)