1. 程式人生 > >Linux程序間通訊方式

Linux程序間通訊方式

程序與程序通訊的概念

程序是作業系統的概念,每當我們執行一個程式時,對於作業系統來講就建立了一個程序,在這個過程中,伴隨著資源的分配和釋放。可以認為程序是一個程式的一次執行過程。
程序使用者空間是相互獨立的,一般而言是不能相互訪問的。但很多情況下程序間需要互相通訊,來完成系統的某項功能。程序通過與核心及其它程序之間的互相通訊來協調它們的行為。

程序通訊的應用場景

  • 資料傳輸:一個程序需要將它的資料傳送給另一個程序,傳送的資料量在一個位元組到幾兆位元組之間。

  • 共享資料:多個程序想要操作共享資料,一個程序對共享資料的修改,別的程序應該立刻看到。

  • 通知事件:一個程序需要向另一個或一組程序傳送訊息,通知它(它們)發生了某種事件(如程序終止時要通知父程序)。

  • 資源共享:多個程序之間共享同樣的資源。為了作到這一點,需要核心提供鎖和同步機制。

  • 程序控制:有些程序希望完全控制另一個程序的執行(如Debug程序),此時控制程序希望能夠攔截另一個程序的所有陷入和異常,並能夠及時知道它的狀態改變。

程序通訊的幾種方式

管道

管道簡介

管道包括三種:
- 普通管道pipe: 通常有兩種限制,一是單工,只能單向傳輸;二是隻能在父子或者兄弟程序間使用.
- 流管道s_pipe: 去除了第一種限制,為半雙工,只能在父子或兄弟程序間使用,可以雙向傳輸.
- 命名管道:name_pipe:去除了第二種限制,可以在許多並不相關的程序之間進行通訊.

管道原理

管道如何通訊

管道是由核心管理的一個緩衝區,相當於我們放入記憶體中的一個紙條。管道的一端連線一個程序的輸出。這個程序會向管道中放入資訊。管道的另一端連線一個程序的輸入,這個程序取出被放入管道的資訊。一個緩衝區不需要很大,它被設計成為環形的資料結構,以便管道可以被迴圈利用。當管道中沒有資訊的話,從管道中讀取的程序會等待,直到另一端的程序放入資訊。當管道被放滿資訊的時候,嘗試放入資訊的程序會等待,直到另一端的程序取出資訊。當兩個程序都終結的時候,管道也自動消失。
這裡寫圖片描述

管道如何建立

從原理上,管道利用fork機制建立,從而讓兩個程序可以連線到同一個PIPE上。最開始的時候,上面的兩個箭頭都連線在同一個程序Process 1上(連線在Process 1上的兩個箭頭)。當fork複製程序的時候,會將這兩個連線也複製到新的程序(Process 2)。隨後,每個程序關閉自己不需要的一個連線 (兩個黑色的箭頭被關閉; Process 1關閉從PIPE來的輸入連線,Process 2關閉輸出到PIPE的連線),這樣,剩下的紅色連線就構成了如上圖的PIPE。
這裡寫圖片描述


那麼管道建立後其對應的資料結構是什麼呢?
在 Linux 中,管道的實現並沒有使用專門的資料結構,而是藉助了檔案系統的file結構和VFS的索引節點inode。通過將兩個 file 結構指向同一個臨時的 VFS 索引節點,而這個 VFS 索引節點又指向一個物理頁面而實現的。如下圖:
管道資料結構
有兩個 file 資料結構,但它們定義檔案操作例程地址是不同的,其中一個是向管道中寫入資料的例程地址,而另一個是從管道中讀出資料的例程地址。這樣,使用者程式的系統呼叫仍然是通常的檔案操作,而核心卻利用這種抽象機制實現了管道這一特殊操作。

管道讀寫實現

管道實現的原始碼在fs/pipe.c中,在pipe.c中有很多函式,其中有兩個函式比較重要,即管道讀函式pipe_read()和管道寫函式pipe_wrtie()。管道寫函式通過將位元組複製到 VFS 索引節點指向的實體記憶體而寫入資料,而管道讀函式則通過複製實體記憶體中的位元組而讀出資料。當然,核心必須利用一定的機制同步對管道的訪問,為此,核心使用了鎖、等待佇列和訊號。

當寫程序向管道中寫入時,它利用標準的庫函式write(),系統根據庫函式傳遞的檔案描述符,可找到該檔案的 file 結構。file 結構中指定了用來進行寫操作的函式(即寫入函式)地址,於是,核心呼叫該函式完成寫操作。寫入函式在向記憶體中寫入資料之前,必須首先檢查 VFS 索引節點中的資訊,同時滿足如下條件時,才能進行實際的記憶體複製工作:

記憶體中有足夠的空間可容納所有要寫入的資料;
記憶體沒有被讀程式鎖定。
如果同時滿足上述條件,寫入函式首先鎖定記憶體,然後從寫程序的地址空間中複製資料到記憶體。否則,寫入程序就休眠在 VFS 索引節點的等待佇列中,接下來,核心將呼叫排程程式,而排程程式會選擇其他程序執行。寫入程序實際處於可中斷的等待狀態,當記憶體中有足夠的空間可以容納寫入資料,或記憶體被解鎖時,讀取程序會喚醒寫入程序,這時,寫入程序將接收到訊號。當資料寫入記憶體之後,記憶體被解鎖,而所有休眠在索引節點的讀取程序會被喚醒。

管道的讀取過程和寫入過程類似。但是,程序可以在沒有資料或記憶體被鎖定時立即返回錯誤資訊,而不是阻塞該程序,這依賴於檔案或管道的開啟模式。反之,程序可以休眠在索引節點的等待佇列中等待寫入程序寫入資料。當所有的程序完成了管道操作之後,管道的索引節點被丟棄,而共享資料頁也被釋放。

管道api與用法

普通管道

#include <unistd.h>
int pipe(int filedes[2]);
filedes[0]用於讀出資料,讀取時必須關閉寫入端,即close(filedes[1]);
filedes[1]用於寫入資料,寫入時必須關閉讀取端,即close(filedes[0])。
int main(void)
{
    int n;
    int fd[2];
    pid_t pid;
    char line[MAXLINE];

    if(pipe(fd)!0){                 /* 先建立管道得到一對檔案描述符 */
        exit(0);
    }

    if((pid = fork())!=0)            /* 父程序把檔案描述符複製給子程序 */
        exit(1);
    else if(pid > 0){                /* 父程序寫 */
        close(fd[0]);                /* 關閉讀描述符 */
        write(fd[1], "\nhello world\n", 14);
    }
    else{                            /* 子程序讀 */
        close(fd[1]);                /* 關閉寫端 */
        n = read(fd[0], line, MAXLINE);
        write(STDOUT_FILENO, line, n);
    }

    exit(0);
}

流管道

命名管道

由於基於fork機制,所以管道只能用於父程序和子程序之間,或者擁有相同祖先的兩個子程序之間 (有親緣關係的程序之間)。為了解決這一問題,Linux提供了FIFO方式連線程序。FIFO又叫做命名管道(named PIPE)。

實現原理

FIFO (First in, First out)為一種特殊的檔案型別,它在檔案系統中有對應的路徑。當一個程序以讀(r)的方式開啟該檔案,而另一個程序以寫(w)的方式開啟該檔案,那麼核心就會在這兩個程序之間建立管道,所以FIFO實際上也由核心管理,不與硬碟打交道。之所以叫FIFO,是因為管道本質上是一個先進先出的佇列資料結構,最早放入的資料被最先讀出來,從而保證資訊交流的順序。FIFO只是借用了檔案系統(file system,命名管道是一種特殊型別的檔案,因為Linux中所有事物都是檔案,它在檔案系統中以檔名的形式存在。)來為管道命名。寫模式的程序向FIFO檔案中寫入,而讀模式的程序從FIFO檔案中讀出。當刪除FIFO檔案時,管道連線也隨之消失。FIFO的好處在於我們可以通過檔案的路徑來識別管道,從而讓沒有親緣關係的程序之間建立連線。

api與應用
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *filename, mode_t mode);
該函式的第一個引數是一個普通的路徑名,也就是建立 後FIFO的名字。第二個引數與開啟普通檔案的open()函式中的mode 引數相同。 如果mkfifo的第一個引數是一個已經存在的路徑名時,會返回EEXIST錯誤,所以一般典型的呼叫程式碼首先會檢查是否返回該錯誤,如果確實返回該錯 誤,那麼只要呼叫開啟FIFO的函式就可以了。一般檔案的I/O函式都可以用於FIFO,如close、read、write等等。
int mknode(const char *filename, mode_t mode | S_IFIFO, (dev_t) 0 );
其中filename是被建立的檔名稱,mode表示將在該檔案上設定的許可權位和將被建立的檔案型別(在此情況下為S_IFIFO),dev是當建立裝置特殊檔案時使用的一個值。因此,對於先進先出檔案它的值為0

程式例項:

#include<sys/types.h>  
#include<sys/stat.h>  
#include<fcntl.h>  
char *FIFO = "/tmp/my_fifo";  
main()  
{  
char buffer[80];  
int fd;  
unlink(FIFO);  
mkfifo(FIFO,0666);  
if(fork()>0){  
char s[ ] = “hello!\n”;  
fd = open (FIFO,O_WRONLY);  
write(fd,s,sizeof(s));  
close(fd);  
}  
else{  
fd= open(FIFO,O_RDONLY);  
read(fd,buffer,80);  
printf(“%s”,buffer);  
close(fd);  
}  
}  

匿名管道和有名管道總結

(1)管道是特殊型別的檔案,在滿足先入先出的原則條件下可以進行讀寫,但不能進行定位讀寫。
(2)匿名管道是單向的,只能在有親緣關係的程序間通訊;有名管道以磁碟檔案的方式存在,可以實現本機任意兩個程序通訊。
(3)無名管道阻塞問題:無名管道無需顯示開啟,建立時直接返回檔案描述符,在讀寫時需要確定對方的存在,否則將退出。如果當前程序向無名管道的一端寫資料,必須確定另一端有某一程序。如果寫入無名管道的資料超過其最大值,寫操作將阻塞,如果管道中沒有資料,讀操作將阻塞,如果管道發現另一端斷開,將自動退出。
(4)有名管道阻塞問題:有名管道在開啟時需要確實對方的存在,否則將阻塞。即以讀方式開啟某管道,在此之前必須一個程序以寫方式開啟管道,否則阻塞。此外,可以以讀寫(O_RDWR)模式開啟有名管道,即當前程序讀,當前程序寫,不會阻塞。

訊號

訊號是比較複雜的通訊方式,用於通知接受程序有某種事件發生,除了用於程序間通訊外,程序還可以傳送訊號給程序本身;linux除了支援Unix早期訊號語義函式sigal外,還支援語義符合Posix.1標準的訊號函式sigaction(實際上,該函式是基於BSD的,BSD為了實現可靠訊號機制,又能夠統一對外介面,用sigaction函式重新實現了signal函式)。

  • 訊號是Linux系統中用於程序間互相通訊或者操作的一種機制,訊號可以在任何時候發給某一程序,而無需知道該程序的狀態。
  • 如果該程序當前並未處於執行狀態,則該訊號就有核心儲存起來,直到該程序恢復執行並傳遞給它為止。
  • 如果一個訊號被程序設定為阻塞,則該訊號的傳遞被延遲,直到其阻塞被取消時訊號才被傳遞給程序。

    Linux系統中常用訊號:
    (1)SIGHUP:使用者從終端登出,所有已啟動程序都將收到該程序。系統預設狀態下對該訊號的處理是終止程序。
    (2)SIGINT:程式終止訊號。程式執行過程中,按Ctrl+C鍵將產生該訊號。
    (3)SIGQUIT:程式退出訊號。程式執行過程中,按Ctrl+\\鍵將產生該訊號。
    (4)SIGBUS和SIGSEGV:程序訪問非法地址。
    (5)SIGFPE:運算中出現致命錯誤,如除零操作、資料溢位等。
    (6)SIGKILL:使用者終止程序執行訊號。shell下執行kill -9傳送該訊號。
    (7)SIGTERM:結束程序訊號。shell下執行kill 程序pid傳送該訊號。
    (8)SIGALRM:定時器訊號。
    (9)SIGCLD:子程序退出訊號。如果其父程序沒有忽略該訊號也沒有處理該訊號,則子程序退出後將形成殭屍程序。

訊號來源

訊號是軟體層次上對中斷機制的一種模擬,是一種非同步通訊方式,訊號可以在使用者空間程序和核心之間直接互動,核心可以利用訊號來通知使用者空間的程序發生了哪些系統事件,訊號事件主要有兩個來源:
- 硬體來源:使用者按鍵輸入Ctrl+C退出、硬體異常如無效的儲存訪問等。
- 軟體來源:終止程序訊號、其他程序呼叫kill函式、軟體異常產生訊號。

訊號生命週期和處理流程

(1)訊號被某個程序產生,並設定此訊號傳遞的物件(一般為對應程序的pid),然後傳遞給作業系統;
(2)作業系統根據接收程序的設定(是否阻塞)而選擇性的傳送給接收者,如果接收者阻塞該訊號(且該訊號是可以阻塞的),作業系統將暫時保留該訊號,而不傳遞,直到該程序解除了對此訊號的阻塞(如果對應程序已經退出,則丟棄此訊號),如果對應程序沒有阻塞,作業系統將傳遞此訊號。
(3)目的程序接收到此訊號後,將根據當前程序對此訊號設定的預處理方式,暫時終止當前程式碼的執行,保護上下文(主要包括臨時暫存器資料,當前程式位置以及當前CPU的狀態)、轉而執行中斷服務程式,執行完成後再恢復到中斷的位置。當然,對於搶佔式核心,在中斷返回時還將引發新的排程。

api使用

訊息佇列

訊息佇列是Linux IPC中很常用的一種通訊方式,它通常用來在不同程序間傳送特定格式的訊息資料。
訊息佇列和之前討論過的管道和FIFO有很大的區別,主要有以下兩點:
一個程序向訊息佇列寫入訊息之前,並不需要某個程序在該佇列上等待該訊息的到達,而管道和FIFO是相反的,程序向其中寫訊息時,管道和FIFO必需已經開啟來讀,那麼核心會產生SIGPIPE訊號。
IPC的持續性不同。管道和FIFO是隨程序的持續性,當管道和FIFO最後一次關閉發生時,仍在管道和FIFO中的資料會被丟棄。訊息佇列是隨核心的持續性,即一個程序向訊息佇列寫入訊息後,然後終止,另外一個程序可以在以後某個時刻開啟該佇列讀取訊息。只要核心沒有重新自舉,訊息佇列沒有被刪除。
訊息佇列中的每條訊息通常具有以下屬性:
- 一個表示優先順序的整數;
- 訊息的資料部分的長度;
- 訊息資料本身

實現原理

api與應用(以Posix為例)

  • POSIX訊息佇列的建立和關閉
    POSIX訊息佇列的建立,關閉和刪除用到以下三個函式介面:
#include <mqueue.h>  
mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */);  
                       //成功返回訊息佇列描述符,失敗返回-1  
mqd_t mq_close(mqd_t mqdes);  
mqd_t mq_unlink(const char *name);  
                           //成功返回0,失敗返回-1  
mq_open用於開啟或建立一個訊息佇列。
name:表示訊息佇列的名字,它符合POSIX IPC的名字規則。
oflag:表示開啟的方式,和open函式的類似。有必須的選項:O_RDONLY,O_WRONLY,O_RDWR,還有可選的選項:O_NONBLOCK,O_CREAT,O_EXCL。
mode:是一個可選引數,在oflag中含有O_CREAT標誌且訊息佇列不存在時,才需要提供該引數。表示預設訪問許可權。可以參考open。
attr:也是一個可選引數,在oflag中含有O_CREAT標誌且訊息佇列不存在時才需要。該引數用於給新佇列設定某些屬性,如果是空指標,那麼就採用預設屬性。
mq_open返回值是mqd_t型別的值,被稱為訊息佇列描述符。在Linux 2.6.18中該型別的定義為整型:
#include <bits/mqueue.h>  
typedef int mqd_t;  
mq_close用於關閉一個訊息佇列,和檔案的close型別,關閉後,訊息佇列並不從系統中刪除。一個程序結束,會自動呼叫關閉開啟著的訊息佇列。
mq_unlink用於刪除一個訊息佇列。訊息佇列建立後只有通過呼叫該函式或者是核心自舉才能進行刪除。每個訊息佇列都有一個儲存當前開啟著描述符數的引用計數器,和檔案一樣,因此本函式能夠實現類似於unlink函式刪除一個檔案的機制。
  • POSIX訊息佇列的屬性
    POSIX標準規定訊息佇列屬性mq_attr必須要含有以下四個內容:
long    mq_flags //訊息佇列的標誌:0或O_NONBLOCK,用來表示是否阻塞   
long    mq_maxmsg  //訊息佇列的最大訊息數  
long    mq_msgsize  //訊息佇列中每個訊息的最大位元組數  
long    mq_curmsgs  //訊息佇列中當前的訊息數目  
在Linux 2.6.18中mq_attr結構的定義如下:
#include <bits/mqueue.h>  
struct mq_attr  
{  
  long int mq_flags;      /* Message queue flags.  */  
  long int mq_maxmsg;   /* Maximum number of messages.  */  
  long int mq_msgsize;   /* Maximum message size.  */  
  long int mq_curmsgs;   /* Number of messages currently queued.  */  
  long int __pad[4];  
};  
POSIX訊息佇列的屬性設定和獲取可以通過下面兩個函式實現:
#include <mqueue.h>  
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);  
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);  
                               //成功返回0,失敗返回-1  
mq_getattr用於獲取當前訊息佇列的屬性,mq_setattr用於設定當前訊息佇列的屬性。其中mq_setattr中的oldattr用於儲存修改前的訊息佇列的屬性,可以為空。
mq_setattr可以設定的屬性只有mq_flags,用來設定或清除訊息佇列的非阻塞標誌。newattr結構的其他屬性被忽略。mq_maxmsg和mq_msgsize屬性只能在建立訊息佇列時通過mq_open來設定。mq_open只會設定該兩個屬性,忽略另外兩個屬性。mq_curmsgs屬性只能被獲取而不能被設定。

下面是測試程式碼:

#include <iostream>  
#include <cstring>  

#include <errno.h>  
#include <unistd.h>  
#include <fcntl.h>  
#include <mqueue.h>  

using namespace std;  

int main()  
{  
    mqd_t mqID;  
    mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);  

    if (mqID < 0)  
    {  
        cout<<"open message queue error..."<<strerror(errno)<<endl;  
        return -1;  
    }  

    mq_attr mqAttr;  
    if (mq_getattr(mqID, &mqAttr) < 0)  
    {  
        cout<<"get the message queue attribute error"<<endl;  
        return -1;  
    }  

    cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;  
    cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;  
    cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;  
    cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;  
}  
在Linux 2.6.18中執行結果是:
mq_flags:0  
mq_maxmsg:10  
mq_msgsize:8192  
mq_curmsgs:0  
  • POSIX訊息佇列的使用

POSIX訊息佇列可以通過以下兩個函式來進行傳送和接收訊息:

#include <mqueue.h>  
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,  
                      size_t msg_len, unsigned msg_prio);  
                     //成功返回0,出錯返回-1  

mqd_t mq_receive(mqd_t mqdes, char *msg_ptr,  
                      size_t msg_len, unsigned *msg_prio);  
                     //成功返回接收到訊息的位元組數,出錯返回-1  

#ifdef __USE_XOPEN2K  
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,  
                      size_t msg_len, unsigned msg_prio,  
                      const struct timespec *abs_timeout);  

mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,  
                      size_t msg_len, unsigned *msg_prio,  
                      const struct timespec *abs_timeout);  
#endif  
mq_send向訊息佇列中寫入一條訊息,mq_receive從訊息佇列中讀取一條訊息。
mqdes:訊息佇列描述符;
msg_ptr:指向訊息體緩衝區的指標;
msg_len:訊息體的長度,其中mq_receive的該引數不能小於能寫入佇列中訊息的最大大小,即一定要大於等於該佇列的mq_attr結構中mq_msgsize的大小。如果mq_receive中的msg_len小於該值,就會返回EMSGSIZE錯誤。POXIS訊息佇列傳送的訊息長度可以為0。
msg_prio:訊息的優先順序;它是一個小於MQ_PRIO_MAX的數,數值越大,優先順序越高。POSIX訊息佇列在呼叫mq_receive時總是返回佇列中最高優先順序的最早訊息。如果訊息不需要設定優先順序,那麼可以在mq_send是置msg_prio為0,mq_receive的msg_prio置為NULL。
還有兩個XSI定義的擴充套件介面限時傳送和接收訊息的函式:mq_timedsend和mq_timedreceive函式。預設情況下mq_send和mq_receive是阻塞進行呼叫,可以通過mq_setattr來設定為O_NONBLOCK。

下面是訊息佇列使用的測試程式碼:

#include <iostream>  
#include <cstring>  
#include <errno.h>  

#include <unistd.h>  
#include <fcntl.h>  
#include <mqueue.h>  

using namespace std;  

int main()  
{  
    mqd_t mqID;  
    mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);  

    if (mqID < 0)  
    {  
        if (errno == EEXIST)  
        {  
            mq_unlink("/anonymQueue");  
            mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);  
        }  
        else  
        {  
            cout<<"open message queue error..."<<strerror(errno)<<endl;  
            return -1;  
        }  
    }  

    if (fork() == 0)  
    {  
        mq_attr mqAttr;  
        mq_getattr(mqID, &mqAttr);  

        char *buf = new char[mqAttr.mq_msgsize];  

        for (int i = 1; i <= 5; ++i)  
        {  
            if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0)  
            {  
                cout<<"receive message  failed. ";  
                cout<<"error info:"<<strerror(errno)<<endl;  
                continue;  
            }  

            cout<<"receive message "<<i<<": "<<buf<<endl;     
        }  
        exit(0);  
    }  

    char msg[] = "yuki";  
    for (int i = 1; i <= 5; ++i)  
    {  
        if (mq_send(mqID, msg, sizeof(msg), i) < 0)  
        {  
            cout<<"send message "<<i<<" failed. ";  
            cout<<"error info:"<<strerror(errno)<<endl;  
        }  
        cout<<"send message "<<i<<" success. "<<endl;     

        sleep(1);  
    }  
}                       
在Linux 2.6.18下的執行結構如下:
send message 1 success.   
receive message 1: yuki  
send message 2 success.   
receive message 2: yuki  
send message 3 success.   
receive message 3: yuki  
send message 4 success.   
receive message 4: yuki  
send message 5 success.   
receive message 5: yuki  
  • POSIX訊息佇列的限制
    POSIX訊息佇列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分別用於限定訊息佇列中的最大訊息數和每個訊息的最大位元組數。在前面已經說過了,這兩個引數可以在呼叫mq_open建立一個訊息佇列的時候設定。當這個設定是受到系統核心限制的。
    下面是在Linux 2.6.18下shell對啟動程序的POSIX訊息佇列大小的限制:
# ulimit -a |grep message  
POSIX message queues     (bytes, -q) 819200 

限制大小為800KB,該大小是整個訊息佇列的大小,不僅僅是最大訊息數*訊息的最大大小;還包括訊息佇列的額外開銷。前面我們知道Linux 2.6.18下POSIX訊息佇列預設的最大訊息數和訊息的最大大小分別為:

mq_maxmsg = 10  
mq_msgsize = 8192

為了說明上面的限制大小包括訊息佇列的額外開銷,下面是測試程式碼:

#include <iostream>  
#include <cstring>  
#include <errno.h>  

#include <unistd.h>  
#include <fcntl.h>  
#include <mqueue.h>  

using namespace std;  

int main(int argc, char **argv)  
{  
    mqd_t mqID;  
    mq_attr attr;  
    attr.mq_maxmsg = atoi(argv[1]);  
    attr.mq_msgsize = atoi(argv[2]);  

    mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, &attr);  

    if (mqID < 0)  
    {  
        if (errno == EEXIST)  
        {  
            mq_unlink("/anonymQueue");  
            mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, &attr);  

            if(mqID < 0)  
            {  
                cout<<"open message queue error..."<<strerror(errno)<<endl;  
                return -1;  
            }  
        }  
        else  
        {  
            cout<<"open message queue error..."<<strerror(errno)<<endl;  
            return -1;  
        }  
    }  

    mq_attr mqAttr;  
    if (mq_getattr(mqID, &mqAttr) < 0)  
    {  
        cout<<"get the message queue attribute error"<<endl;  
        return -1;  
    }  

    cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;  
    cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;  
    cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;  
    cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;   
}  

下面進行建立訊息佇列時設定最大訊息數和訊息的最大大小進行測試:

[root@idcserver program]# g++ -g test.cpp -lrt  
[root@idcserver program]# ./a.out 10 81920  
open message queue error...Cannot allocate memory  
[root@idcserver program]# ./a.out 10 80000  
open message queue error...Cannot allocate memory  
[root@idcserver program]# ./a.out 10 70000  
open message queue error...Cannot allocate memory  
[root@idcserver program]# ./a.out 10 60000  
mq_flags:0  
mq_maxmsg:10  
mq_msgsize:60000  
mq_curmsgs:0  

從上面可以看出訊息佇列真正存放訊息資料的大小是沒有819200B的。可以通過修改該限制引數,來改變訊息佇列的所能容納訊息的數量。可以通過下面方式來修改限制,但這會在shell啟動程序結束後失效,可以將設定寫入開機啟動的指令碼中執行,例如.bashrc,rc.local。

[root@idcserver ~]# ulimit -q 1024000000  
[root@idcserver ~]# ulimit -a |grep message  
POSIX message queues     (bytes, -q) 1024000000  

下面再次測試可以設定的訊息佇列的屬性。

[[email protected] program]# ./a.out 10 81920  
mq_flags:0  
mq_maxmsg:10  
mq_msgsize:81920  
mq_curmsgs:0  
[[email protected] program]# ./a.out 10 819200  
mq_flags:0  
mq_maxmsg:10  
mq_msgsize:819200  
mq_curmsgs:0  
[[email protected] program]# ./a.out 1000 8192    
mq_flags:0  
mq_maxmsg:1000  
mq_msgsize:8192  
mq_curmsgs:0  

POSIX訊息佇列在實現上還有另外兩個限制:
MQ_OPEN_MAX:一個程序能同時開啟的訊息佇列的最大數目,POSIX要求至少為8;
MQ_PRIO_MAX:訊息的最大優先順序,POSIX要求至少為32;

共享記憶體

實現原理

管道,FIFO,訊息佇列,他們的共同特點就是通過核心來進行通訊(假設POSIX訊息佇列也是在核心中實現的,因為POSIX標準並沒有限定它的實現方式)。向管道,FIFO,訊息佇列寫入資料需要把資料從程序複製到核心,從這些IPC讀取資料的時候又需要把資料從核心複製到程序。所以這種IPC方式往往需要2次在程序和核心之間進行資料的複製,即程序間的通訊必須藉助核心來傳遞。如下圖所示:
核心IPC通訊
共享記憶體也是一種IPC,它是目前可用IPC中最快的,它是使用方式是將同一個記憶體區對映到共享它的不同程序的地址空間中,這樣這些程序間的通訊就不再需要通過核心,只需對該共享的記憶體區域程序操作就可以了,和其他IPC不同的是,共享記憶體的使用需要使用者自己進行同步操作。下圖是共享記憶體區IPC的通訊:
共享記憶體IPC通訊

api和應用

Linux下有三種共享記憶體的IPC技術:System V共享記憶體、共享檔案對映(mmap)、POSIX共享記憶體。

system V共享記憶體

共享檔案對映

mmap函式主要的功能就是將檔案或裝置對映到呼叫程序的地址空間中,當使用mmap對映檔案到程序後,就可以直接操作這段虛擬地址進行檔案的讀寫等操作,不必再呼叫read,write等系統呼叫。在很大程度上提高了系統的效率和程式碼的簡潔性。
使用mmap函式的主要目的是:
- 對普通檔案提供記憶體對映I/O,可以提供無親緣程序間的通訊;
- 提供匿名記憶體對映,以供親緣程序間進行通訊。
- 對shm_open建立的POSIX共享記憶體區物件程序記憶體對映,以供無親緣程序間進行通訊。
下面是mmap函式的介面以及說明:

#include <sys/mman.h>  
void *mmap(void *start, size_t len, int prot, int flags, int fd, off_t offset);  
               //成功返回對映到程序地址空間的起始地址,失敗返回MAP_FAILED  
start:指定描述符fd應被對映到的程序地址空間內的起始地址,它通常被設定為空指標NULL,這告訴核心自動選擇起始地址,該函式的返回值即為fd對映到記憶體區的起始地址。
len:對映到程序地址空間的位元組數,它從被對映檔案開頭的第offset個位元組處開始,offset通常被設定為0。

prot:記憶體對映區的保護由該引數來設定,通常由以下幾個值組合而成:
PROT_READ:資料可讀;
 PROT_WRITE:資料可寫;
 PROT_EXEC:資料可執行;
 PROT_NONE:資料不可訪問;
flags:設定記憶體對映區的型別標誌,POSIX標誌定義了以下三個標誌:
MAP_SHARED:該標誌表示,呼叫程序對被對映記憶體區的資料所做的修改對於共享該記憶體區的所有程序都可見,而且確實改變其底層的支撐物件(一個檔案物件或是一個共享記憶體區物件)。
 MAP_PRIVATE:呼叫程序對被對映記憶體區的資料所做的修改只對該程序可見,而不改變其底層支撐物件。
 MAP_FIXED:該標誌表示準確的解釋start引數,一般不建議使用該標誌,對於可移植的程式碼,應該把start引數置為NULL,且不指定MAP_FIXED標誌。
上面三個標誌是在POSIX.1-2001標準中定義的,其中MAP_SHARED和MAP_PRIVATE必須選擇一個。在Linux中也定義了一些非標準的標誌,例如MAP_ANONYMOUS(MAP_ANON),MAP_LOCKED等,具體參考Linux手冊。
fd:有效的檔案描述符。如果設定了MAP_ANONYMOUS(MAP_ANON)標誌,在Linux下面會忽略fd引數,而有的系統實現如BSD需要置fd為-1offset:相對檔案的起始偏移。

mmap成功後,可以關閉fd,一般也是這麼做的,這對該記憶體對映沒有任何影響。
從程序的地址空間中刪除一個對映關係,需要用到下面的函式:

#include <sys/mman.h>  
int munmap(void *start, size_t len);  
                           //成功返回0,出錯返回-1  
start:被對映到的程序地址空間的記憶體區的起始地址,即mmap返回的地址。
len:對映區的大小。

對於一個MAP_SHARED的記憶體對映區,核心的虛擬記憶體演算法會保持記憶體對映檔案和記憶體對映區的同步,也就是說,對於記憶體對映檔案所對應記憶體對映區的修改,核心會在稍後的某個時刻更新該記憶體對映檔案。如果我們希望硬碟上的檔案內容和記憶體對映區中的內容實時一致,那麼我們就可以呼叫msync開執行這種同步:

#include <sys/mman.h>  
int msync(void *start, size_t len, int flags);  //成功返回0,出錯返回-1  
start:被對映到的程序地址空間的記憶體區的起始地址,即mmap返回的地址。
len:對映區的大小。
flags:同步標誌,有一下三個標誌:
MS_ASYNC:非同步寫,一旦寫操作由核心排入佇列,就立刻返回;
MS_SYNC:同步寫,要等到寫操作完成後才返回。
MS_INVALIDATE:使該檔案的其他記憶體對映的副本全部失效。
  • mmap記憶體對映區的大小
    Linux下的記憶體是採用頁式管理機制。通過mmap進行記憶體對映,核心生成的對映區的大小都是以頁面大小PAGESIZE為單位,即為PAGESIZE的整數倍。如果mmap對映的長度不是頁面大小的整數倍,那麼多餘空間也會被閒置浪費。
  • mmap實現程序間通訊
    mmap本身提供的程序間通訊的兩種方式,分別用於無親緣和親緣程序間的通訊。
    (1)通過匿名記憶體對映提供親緣程序間的通訊
    我們可以通過在父程序fork之前指定MAP_SHARED呼叫mmap,通過對映一個檔案來實現父子程序間的通訊,POSIX保證了父程序的記憶體對映關係保留到子程序中,父子程序對記憶體對映區的修改雙方都可以看到。
    在Linux 2.4以後,mmap提供匿名記憶體對映機制,即將mmap的flags引數指定為:MAP_SHARED | MAP_ANON。這樣就徹底避免了記憶體對映檔案的建立和開啟,簡化了對檔案的操作。匿名記憶體對映機制的目的就是為了提供一個穿越父子程序間的記憶體對映區,很方便的提供了親緣程序間的通訊,下面是測試程式碼:
#include <iostream>  
#include <cstring>  
#include <cerrno>  

#include <unistd.h>  
#include <fcntl.h>  
#include <sys/mman.h>  

using namespace std;  

int main(int argc, char **argv)  
{  
    int *memPtr;  

    memPtr = (int *) mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, 0, 0);  
    if (memPtr == MAP_FAILED)  
    {  
        cout<<"mmap failed..."<<strerror(errno)<<endl;  
        return -1;  
    }  

    *memPtr = 0;  

    if (fork() == 0)  
    {  
        *memPtr = 1;  
        cout<<"child:set memory "<<*memPtr<<endl;  

        exit(0);  
    }  

    sleep(1);  
    cout<<"parent:memory value "<<*memPtr<<endl;  

    return 0;  
}  
執行結果如下:
child:set memory 1  
parent:memory value 1  

(2)通過記憶體對映檔案提供無親緣程序間的通訊
通過在不同程序間對同一記憶體對映檔案進行對映,來進行無親緣程序間的通訊,如下測試程式碼:

//process 1  
#include <iostream>  
#include <cstring>  
#include <errno.h>  

#include <unistd.h>  
#include <fcntl.h>  
#include <sys/mman.h>  

using namespace std;  

#define  PATH_NAME "/tmp/memmap"  

int main()  
{  
    int *memPtr;  
    int fd;  

    fd = open(PATH_NAME, O_RDWR | O_CREAT, 0666);  
    if (fd < 0)  
    {  
        cout<<"open file "<<PATH_NAME<<" failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  

    ftruncate(fd, sizeof(int));  

    memPtr = (int *)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);  
    close(fd);  

    if (memPtr == MAP_FAILED)  
    {  
        cout<<"mmap failed..."<<strerror(errno)<<endl;  
        return -1;  
    }  

    *memPtr = 111;  
    cout<<"process:"<<getpid()<<" send:"<<*memPtr<<endl;  

    return 0;  
}  

//process 2  
#include <iostream>  
#include <cstring>  
#include <errno.h>  

#include <unistd.h>  
#include <fcntl.h>  
#include <sys/mman.h>  

using namespace std;  

#define  PATH_NAME "/tmp/memmap"  

int main()  
{  
    int *memPtr;  
    int fd;  

    fd = open(PATH_NAME, O_RDWR | O_CREAT, 0666);  
    if (fd < 0)  
    {  
        cout<<"open file "<<PATH_NAME<<" failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  

    memPtr = (int *)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);  
    close(fd);  

    if (memPtr == MAP_FAILED)  
    {  
        cout<<"mmap failed..."<<strerror(errno)<<endl;  
        return -1;  
    }  

    cout<<"process:"<<getpid()<<" receive:"<<*memPtr<<endl;  

    return 0;  
}  
執行結果如下:
# ./send   
process:12711 send:111  
# ./recv   
process:12712 receive:111  

上面的程式碼都沒進行同步操作,在實際的使用過程要考慮到程序間的同步,通常會用訊號量來進行共享記憶體的同步。

posix共享記憶體

上面介紹了通過記憶體對映檔案進行程序間的通訊的方式,現在要介紹的是通過POSIX共享記憶體區物件進行程序間的通訊。POSIX共享記憶體使用方法有以下兩個步驟:
1)通過shm_open建立或開啟一個POSIX共享記憶體物件;
2)然後呼叫mmap將它對映到當前程序的地址空間;
和通過記憶體對映檔案進行通訊的使用上差別在於mmap描述符引數獲取方式不一樣,前者通過open而後者通過shm_open。
POSIX共享記憶體區物件的特殊操作函式就只有建立(開啟)和刪除兩個函式,其他對共享記憶體區物件的操作都是通過已有的函式進行的。

#include <sys/mman.h>  
int shm_open(const char *name, int oflag, mode_t mode);  
                              //成功返回非負的描述符,失敗返回-1  
int shm_unlink(const char *name);  
                              //成功返回0,失敗返回-1  
shm_open用於建立一個新的共享記憶體區物件或開啟一個已經存在的共享記憶體區物件。
name:POSIX IPC的名字,前面關於POSIX程序間通訊都已講過關於POSIX IPC的規則,這裡不再贅述。
oflag:操作標誌,包含:O_RDONLY,O_RDWR,O_CREAT,O_EXCL,O_TRUNC。其中O_RDONLY和O_RDWR標誌必須且僅能存在一項。
mode:用於設定建立的共享記憶體區物件的許可權屬性。和open以及其他POSIX IPC的xxx_open函式不同的是,該引數必須一直存在,如果oflag引數中沒有O_CREAT標誌,該位可以置0;
shm_unlink用於刪除一個共享記憶體區物件,跟其他檔案的unlink以及其他POSIX IPC的刪除操作一樣,物件的析構會到對該物件的所有引用全部關閉才會發生。

POSIX共享記憶體和POSIX訊息佇列,有名訊號量一樣都是具有隨核心持續性[^1]的特點。
下面是通過POSIX共享記憶體進行通訊的測試程式碼,程式碼中通過POSIX訊號量來進行程序間的同步操作。

//process 1  
#include <iostream>  
#include <cstring>  
#include <errno.h>  

#include <unistd.h>  
#include <fcntl.h>  
#include <semaphore.h>  
#include <sys/mman.h>  

using namespace std;  

#define SHM_NAME "/memmap"  
#define SHM_NAME_SEM "/memmap_sem"   

char sharedMem[10];  

int main()  
{  
    int fd;  
    sem_t *sem;  

    fd = shm_open(SHM_NAME, O_RDWR | O_CREAT, 0666);  
    sem = sem_open(SHM_NAME_SEM, O_CREAT, 0666, 0);  

    if (fd < 0 || sem == SEM_FAILED)  
    {  
        cout<<"shm_open or sem_open failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  

    ftruncate(fd, sizeof(sharedMem));  

    char *memPtr;  
    memPtr = (char *)mmap(NULL, sizeof(sharedMem), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);  
    close(fd);  

    char msg[] = "yuki...";  

    memmove(memPtr, msg, sizeof(msg));  
    cout<<"process:"<<getpid()<<" send:"<<memPtr<<endl;  

    sem_post(sem);  
    sem_close(sem);  

    return 0;  
}  

//process 2  
#include <iostream>  
#include <cstring>  
#include <errno.h>  

#include <unistd.h>  
#include <fcntl.h>  
#include <semaphore.h>  
#include <sys/mman.h>  

using namespace std;  

#define SHM_NAME "/memmap"  
#define SHM_NAME_SEM "/memmap_sem"   

int main()  
{  
    int fd;  
    sem_t *sem;  

    fd = shm_open(SHM_NAME, O_RDWR, 0);  
    sem = sem_open(SHM_NAME_SEM, 0);  

    if (fd < 0 || sem == SEM_FAILED)  
    {  
        cout