1. 程式人生 > >UNIX網路程式設計:IPC之訊息佇列

UNIX網路程式設計:IPC之訊息佇列

訊息佇列:將訊息佇列按佇列的方式組織成的連結串列,每個訊息都是其中的一個節點;
注意:訊息佇列的長度及每個訊息的大小是有限制的

訊息佇列的操作函式如下:
msgget

  int msgget(key_t key,int msgflg);
  該函式作用是:建立一個訊息佇列或訪問一個已經存在的訊息佇列;成功返回標示符,出錯返回-1;
  其第一個引數是一個整數IPC鍵,由ftok函式產生,或者直接是常值IPC_PRIVATE;
  msgflg是IPC_CREAT, IPC_EXCL與讀寫許可權或的結果

msgsnd

  int msgsnd(int msqid, const void *msgptr, int msgsz,int msgflg);
  該函式的作用是:在msgget開啟訊息佇列後,往訊息佇列傳送訊息;成功返回0, 出錯返回-1;
  msqid是訊息佇列的標示符;
  msgsz指的是要傳送的訊息的長度;
  msgflg引數設定的是一些標誌位;設定當訊息佇列滿等情況出現時的處理方式,如果msgflg設定為IPC_NOWAIT,則不傳送訊息並且立即返回-1;否則傳送程序掛起等待。
  msgptr是一個指向要傳送的訊息的指標,並指向緩衝區的第一個子段應為長整形,指定訊息型別,訊息內容存放在該緩衝區的緊跟在訊息型別子段的區域;
msgptr是結構體指標,該結構體如下:

 struct msgbuf {
               long mtype;       /* message type, must be > 0 */
               char mtext[1];    /* message data */
           };

msgrcv

  int msgrcv(int msqid, void *msgptr, int msgsz, long msgtyp, int msgflg);
該函式作用是從某個訊息佇列中讀出一個訊息
引數msqid是訊息佇列標示符;
msgptr和msgsnd函式中的作用相同,指向訊息的型別子段;
msgsz是指要讀出的訊息的資料部分的大小,是msgrcv能返回的最大資料量,該長度只包含資料部分的大小,不包含型別子段大小;
msgtyp是想要讀出的訊息的型別;

msgtyp > 0 時,該函式返回的是訊息佇列中型別值為msgtyp的第一個訊息;
msgtyp = 0 時,該函式返回的是訊息佇列中的第一個訊息;
msgtyp < 0 時, 該函式返回的是訊息佇列中型別值小於或等於該型別值絕對值的訊息型別中最小的第一個訊息;

msgflg設定當訊息佇列滿等情況出現時的程序處理方式,若設定為IPC_NOWAIT時,表示不傳送訊息並立即返回-1,否則傳送程序掛起等待

msgctl

 int msgctl(int msqid, int cmd, struct msqid_ds *buf);
訊息佇列的控制函式
msqid是訊息佇列的id;
cmd是採取的控制操作,有三個值,如下所示:
  引數值   說明   
  IPC_SET   設定訊息佇列的屬性,將buf指向的結構體中的數值設定為訊息佇列的相關性   
  IPC_STAT   獲取訊息佇列的屬性資訊並儲存到buf指向的結構體中   
  IPC_RMID   移除ID為msqid的訊息佇列   
  
buf是一個結構體指標,該結構體如下:

  struct msqid_ds {
               struct ipc_perm msg_perm;     /* Ownership and permissions */
               time_t          msg_stime;    /* Time of last msgsnd(2) */
               time_t          msg_rtime;    /* Time of last msgrcv(2) */
               time_t          msg_ctime;    /* Time of last change */
               unsigned long   __msg_cbytes; /* Current number of bytes in
                                                queue (nonstandard) */
               msgqnum_t       msg_qnum;     /* Current number of messages
                                                in queue */
               msglen_t        msg_qbytes;   /* Maximum number of bytes
                                                allowed in queue */
               pid_t           msg_lspid;    /* PID of last msgsnd(2) */
               pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
           };

以下是一個用來測試訊息佇列的這幾個函式的使用的簡單的單伺服器單客戶的程式程式碼:

伺服器端的程式碼:

#include "utility.h"

#define MSGSND 100
#define MSGRCV 200

int main(int ac, char *av[])
{
    key_t msg_key = ftok(av[1], 0xFF);    //得到一個IPC鍵值
    if(msg_key == -1){
        printf("msg ftok error.\n");
        exit(1);
    }

    int msg_id = msgget(msg_key, IPC_CREAT|IPC_EXCL|0666);   //建立或開啟一個訊息佇列
    if(msg_id == -1){
        printf("msgget error.\n");
        exit(1);
    }

    Msg msg;
    while(1){
        printf("Ser:>");
        scanf("%s", msg.MsgText);  //輸入訊息的內容
        if(strncmp(msg.MsgText, "quit", 4) == 0){
            msgctl(msg_id, IPC_RMID, NULL);   //刪除訊息佇列
            break;
        }

        msg.MsgType = MSGSND; 
        msgsnd(msg_id, &msg, strlen(msg.MsgText) + 1, IPC_NOWAIT);   //傳送訊息佇列的內容

        msgrcv(msg_id, &msg, 256, MSGRCV, 0);   //接收訊息
        printf("cli:>%s\n", msg.MsgText);   //輸出訊息的內容
    }

    return 0;
}

以下是客戶端的程式碼:

#include "utility.h"

#define MSGSND 200
#define MSGRCV 100

int main(int ac, char *av[])
{
    key_t msg_key  = ftok(av[1], 0xFF);   //得到訊息佇列的鍵值
    if(msg_key == -1){
        printf("ftok error.\n");
        exit(1);
    }

    int msg_id = msgget(msg_key, 0);    //開啟一個訊息佇列
    if(msg_id == -1){
        printf("msgget error.\n");
        exit(1);
    }

    Msg msg;
    while(1){
        msgrcv(msg_id, &msg, 256, MSGRCV, 0);  //接收訊息
        printf("ser:>%s\n", msg.MsgText);

        printf("cli:");
        scanf("%s", msg.MsgText);
        msg.MsgType = MSGSND;
        if(strncmp(msg.MsgText, "quit", 4) == 0){
            msgsnd(msg_id, &msg, strlen(msg.MsgText) + 1, IPC_NOWAIT);
            break;
        }
        msgsnd(msg_id, &msg, strlen(msg.MsgText) + 1, IPC_NOWAIT);  //傳送訊息
    }
    return 0;
}

以下是utility.h標頭檔案的程式碼:

#pragma once

#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>

using namespace std;

union semun
{
    int val;
    struct semid_ds *buf;
    unsigned short *array;
    struct seminfo *__buf;
    void *__pad;
};

msg.h標頭檔案

#include "utility.h"

typedef struct Msg{
    long MsgType;
    char MsgText[256];
}Msg;

makefile檔案:

all:ser cli

ser:ser.cpp
    g++ ser.cpp -o ser

cli:cli.cpp
    g++ cli.cpp -o cli

.PHONY:clean

clean:
    rm ser cli

若要實現一個伺服器對多個客戶,並且僅使用一個訊息佇列,則可以將客戶的ID作為訊息的型別,不過這種僅使用一個IPC通道很容易造成死鎖解決方法是:客戶可以填滿訊息佇列,防止伺服器傳送應答,於是客戶阻塞在msgsnd中,伺服器也是如此;
檢測這種死鎖的一個方法是約定伺服器對訊息佇列總是非阻塞寫

實現一個伺服器對多個客戶並且每個客戶一個訊息佇列:伺服器佇列有共用的鍵,但是客戶以IPC_PRIVATE建立各自私有的佇列;每個客戶把各自的私有佇列的標示符傳遞給伺服器,伺服器把自己的應答傳送到由客戶指出的佇列中; 這種設計的潛在問題是某個客戶中途死亡,這種情況下它的死有佇列中可能永遠殘留訊息(或者至少到核心自舉或i某個使用者顯示的刪除該佇列為止)。