1. 程式人生 > >python多程序通訊例項分析

python多程序通訊例項分析

作業系統會為每一個建立的程序分配一個獨立的地址空間,不同程序的地址空間是完全隔離的,因此如果不加其他的措施,他們完全感覺不到彼此的存在。那麼程序之間怎麼進行通訊?他們之間的關聯是怎樣的?實現原理是什麼?本文就來藉助Python簡單的聊一下程序之間的通訊?還是那句話,原理是相同的,希望能透過具體的例子來體會一下本質的東西。

 下面儘量以簡單的方式介紹一下每一類通訊方式,具體的細節可以參照文件使用;

1. 管道

先來看一下最簡單、古老的一種IPC:管道。通常指的是無名管道,本質上可以看做一種檔案,只存在於記憶體當中,不會存檔。不同程序通過系統提供的介面來向管道中讀取或者寫入資料。

也就是說我們通過這樣一箇中間介質為程序提供交流的方式。無名管道的侷限在於一般只用於有直接關聯關係的父子程序。下面通過一個簡單的例子來看一下其用法。

from multiprocessing import Process, Pipe

def pstart(pname, conn):
    conn.send("Data@subprocess")
    print(conn.recv())          # Data@parentprocess

if __name__ == '__main__':
    conn1, conn2 = Pipe(True)
    sub_proc = Process(target=pstart, args=('subprocess', conn2,))
    sub_proc.start()
    print (conn1.recv())        # Data@subprocess
    conn1.send("Data@parentprocess")
    sub_proc.join()

管道通訊三步曲:

  1. 建立Pipe,得到兩個connection物件conn1和conn2;
  2. 父程序持有conn1,將conn2傳遞給子程序;
  3. 父子程序通過對持有的connection物件進行send和recv操作以進行資料傳遞和接受;

上面我們建立的是全雙工管道,也可以建立半雙工管道,具體使用可以參照官網描述:

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

If duplex is True (the default) then the pipe is bidirectional. If duplex

is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.

2. 具名管道(FIFO)

上面介紹的管道主要用於有直接關係的程序,侷限性比較大。下面來看一下可以在任意程序間進行通訊的具名管道。

由於window平臺上os模組沒有mkfifo屬性,因此這個例子只能在linux上執行(測試環境 CentOS 7, Python 2.7.5):

#!/usr/bin/python
import os, time
from multiprocessing import Process

input_pipe = "./pipe.in"
output_pipe = "./pipe.out"

def consumer():
    if os.path.exists(input_pipe):
        os.remove(input_pipe)
    if os.path.exists(output_pipe):
        os.remove(output_pipe)

    os.mkfifo(output_pipe)
    os.mkfifo(input_pipe)
    in1 = os.open(input_pipe, os.O_RDONLY)        # read from pipe.in
    out1 = os.open(output_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR)
    while True:
        read_data = os.read(in1, 1024)
        print("received data from pipe.in: %s @consumer" % read_data)
        if len(read_data) == 0:
            time.sleep(1)
            continue

        if "exit" in read_data:
            break
        os.write(out1, read_data)
    os.close(in1)
    os.close(out1)

def producer():
    in2 = None
    out2 = os.open(input_pipe, os.O_SYNC | os.O_CREAT | os.O_RDWR)

    for i in range(1, 4):
        msg = "msg " + str(i)
        len_send = os.write(out2, msg)
        print("------product msg: %s by producer------" % msg)
        if in2 is None:
            in2 = os.open(output_pipe, os.O_RDONLY)        # read from pipe.out
        data = os.read(in2, 1024)
        if len(data) == 0:
            break
        print("received data from pipe.out: %s @producer" % data)
        time.sleep(1)

    os.write(out2, 'exit')
    os.close(in2)
    os.close(out2)

if __name__ == '__main__':
    pconsumer = Process(target=consumer, args=())
    pproducer = Process(target=producer, args=())
    pconsumer.start()
    time.sleep(0.5)
    pproducer.start()
    pconsumer.join()
    pproducer.join()

執行流程如下:

 每一輪的過程如下:

  1. producer程序往pipe.in檔案中寫入訊息資料;
  2. consumer程序從pipe.in檔案中讀入訊息資料;
  3. consumer程序往pipe.out檔案中寫入回執訊息資料;
  4. producer程序從pipe.out檔案中讀出回執訊息資料;

結果如下:

[shijun@localhost python]$ python main.py
------product msg: msg 1 by producer------
received data from pipe.in: msg 1 @consumer
received data from pipe.out: msg 1 @producer
------product msg: msg 2 by producer------
received data from pipe.in: msg 2 @consumer
received data from pipe.out: msg 2 @producer
------product msg: msg 3 by producer------
received data from pipe.in: msg 3 @consumer
received data from pipe.out: msg 3 @producer
received data from pipe.in: exit @consumer
View Code

兩個程序沒有直接的關係,每個程序有一個讀檔案和寫檔案,如果兩個程序的讀寫檔案是關聯的,就可以進行通訊。

 3. 訊息佇列(Queue)

程序之間通過向佇列中新增資料或者從佇列中獲取資料來進行訊息資料的傳遞。下面是一個簡單的例子。

from multiprocessing import Process, Queue
import time

def producer(que):
    for product in ('Orange', 'Apple', ''):
        print('put product: %s to queue' % product)
        que.put(product)
        time.sleep(0.5)
        res = que.get()
        print('consumer result: %s' % res)

def consumer(que):
    while True:
        product = que.get()
        print('get product:%s from queue' % product)
        que.put('suc!')
        time.sleep(0.5)
        if not product:
            break

if __name__ == '__main__':
    que = Queue(1)
    p = Process(target=producer, args=(que,))
    c = Process(target=consumer, args=(que,))
    p.start()
    c.start()
    p.join()
    c.join()

這個例子比較簡單,queue的具體用法可以參考一下官網。

結果:

put product: Orange to queue
consumer result: suc!
put product: Apple to queue
consumer result: suc!
put product:  to queue
consumer result: suc!
get product:Orange from queue
get product:Apple from queue
get product: from queue
View Code

這裡有幾點需要注意下:

  1. 可以指定佇列的容量,如果超出容量會有異常:raise Full;
  2. 預設put和get均會阻塞當前程序;
  3. 如果put沒有設定成阻塞,那麼可能自己從佇列中取出自己放入的資料;

4. 共享記憶體

共享記憶體是一種常用的,高效的程序之間的通訊方式,為了保證共享記憶體的有序訪問,需要對程序採取額外的同步措施。

下面的這個例子僅僅簡單的演示了Python中如何在不同程序間使用共享記憶體進行通訊的。

from multiprocessing import Process
import mmap
import contextlib
import time

def writer():
    with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_WRITE)) as mem:
        for share_data in ("Hello", "Alpha_Panda"):
            mem.seek(0)
            print('Write data:== %s == to share memory!' % share_data)
            mem.write(str.encode(share_data))
            mem.flush()
            time.sleep(0.5)

def reader():
    while True:
        invalid_byte, empty_byte = str.encode('\x00'), str.encode('')
        with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_READ)) as mem:
            share_data = mem.read(1024).replace(invalid_byte, empty_byte)
            if not share_data:
                """ 當共享記憶體沒有有效資料時結束reader """
                break
            print("Get data:== %s == from share memory!" % share_data.decode())
        time.sleep(0.5)


if __name__ == '__main__':
    p_reader = Process(target=reader, args=())
    p_writer = Process(target=writer, args=())
    p_writer.start()
    p_reader.start()
    p_writer.join()
    p_reader.join()

執行結果:

Write data:== Hello == to share memory!
Write data:== Alpha_Panda == to share memory!
Get data:== Hello == from share memory!
Get data:== Alpha_Panda == from share memory!

下面簡單的來說明一下共享記憶體的原理;

程序虛擬地址到實體地址的一個對映關如下:

 上面這個圖已經很明白的展示了共享記憶體的原理。

左邊是正常情況下,不同程序的線性地址空間被對映到不同的實體記憶體頁,這樣不管其他程序怎麼修改實體記憶體,都不會影響到其他程序;

右邊表示的是程序共享記憶體的情況下,不同程序的部分線性地址會被對映到同一物理頁,一個程序對這個物理頁的修改,會對另一個程序立即可見;

當然潛在的問題就是要採取程序同步措施,也就是對共享記憶體的訪問必須是互斥的。這個可以藉助訊號量來實現。

5. socket通訊

最後再來介紹一種可以跨主機的程序間通訊:socket。

懂網路程式設計的人,對這個應該都比較熟悉。socket不僅可以跨主機進行通訊,甚至有時候可以使用socket在同一主機的不同程序間進行通訊。

這部分程式碼比較簡單常見,這裡僅僅使用流程圖來表示一下socket通訊的流程及相關介面。

 上圖表示客戶端上某程序使用socket和伺服器上監聽程式進行socket通訊的一個流程。

 小結

到這裡關於常見的程序間通訊相關的概念和例項均簡單介紹了一下。希望本文能讓你對程序間通訊有一個更深入的理解和認識。

結合之前幾篇介紹執行緒、程序概念及執行緒間同步的一些措施的介紹,相信應該對執行緒和程序相關概念有一個簡單清晰的認識