1. 程式人生 > >基於訊息佇列的多程序伺服器

基於訊息佇列的多程序伺服器

目錄

一、思路

1)server程序接收時, 指定msgtyp為0, 從隊首不斷接收訊息;
2)server程序傳送時, 將mtype指定為接收到的client程序的pid;
3)client程序傳送的時候, mtype指定為自己程序的pid;
4)client程序接收時, 需要將msgtyp指定為自己程序的pid, 只接收訊息型別為自己pid的訊息。

問題:

但上述程式是存在死鎖的風險的,當同時開了多個客戶端,將佇列寫滿了,此時伺服器端想要寫入就會阻塞,而因為客戶端一旦傳送了資料就阻塞等待伺服器端回射型別為pid的訊息,即佇列的訊息不會減少,此時就會形成死鎖,互相等待。非阻塞方式傳送也不行,因為佇列已滿,會發生EAGAIN錯誤。

改進:

某個客戶端先建立一個私有訊息佇列,然後將私有訊息佇列識別符號和具體資料通過共享的佇列傳送給Server,伺服器fork 出一個子程序,此時根據私有佇列識別符號就可以將資料回射到這個佇列,這個客戶端就可以從私有佇列讀取到回射的資料。

二、實現

1. 原始程式碼

server.c

/*  Server  */
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include
<errno.h>
#include <string.h> #define ERR_EXIT(m) \ do { \ perror(m); \ exit(EXIT_FAILURE); \ } while(0) #define MSGMAX 8192 struct msgbuf { long mtype; char mtext[MSGMAX]; }; void echo_ser(int msgid) { struct msgbuf msg; memset(&msg, 0, sizeof
(msg)); int nrcv ; while (1) { if ((nrcv = msgrcv(msgid, &msg, sizeof(msg.mtext), 1, 0)) < 0); //指定接受優先順序為1的(msgtyp) int pid = *((int *)msg.mtext); fputs(msg.mtext + 4, stdout); msg.mtype = pid; msgsnd(msgid, &msg, nrcv, 0); memset(&msg, 0, sizeof(msg)); } } int main(int argc, char *argv[]) { int msgid; msgid = msgget(1234, IPC_CREAT | 0666); //建立一個訊息佇列 if (msgid == -1) ERR_EXIT("msgget"); echo_ser(msgid); return 0; } //--------------------- //作者:NK_test //來源:CSDN //原文:https://blog.csdn.net/NK_test/article/details/49866113

client.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

#define MSGMAX 8192

struct msgbuf
{
    long mtype;
    char mtext[MSGMAX];
};

void echo_cli(int msgid)
{
    int nrcv;
    int pid = getpid();
    struct msgbuf msg;
    memset(&msg, 0, sizeof(msg));
    msg.mtype = 1;
    *((int *)msg.mtext) = pid;
    while (fgets(msg.mtext + 4, MSGMAX, stdin) != NULL) //客戶端輸入資訊 
    {

        if (msgsnd(msgid, &msg, MSGMAX, IPC_NOWAIT) < 0)
            ERR_EXIT("msgsnd");

        memset(msg.mtext + 4, 0, MSGMAX - 4);
        if ((nrcv = msgrcv(msgid, &msg, MSGMAX, pid, 0)) < 0)
            ERR_EXIT("msgsnd");
        fputs(msg.mtext + 4, stdout);
        memset(msg.mtext + 4, 0, MSGMAX - 4);

    }
}

int main(int argc, char *argv[])
{

    int msgid;
    msgid = msgget(1234, 0); //開啟名為1234的訊息佇列 
    if (msgid == -1)
        ERR_EXIT("msgget");

    echo_cli(msgid);

    return 0;
}
//--------------------- 
//作者:NK_test 
//來源:CSDN 
//原文:https://blog.csdn.net/NK_test/article/details/49866113 

2. 修改

2.1 思路

在這裡插入圖片描述

2.2 程式碼

server.c

/*  Server  */
#include <stdlib.h>
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>

#define MAX 512

#define ERR_EXIT(m)         \
    do                      \
    {                       \
        perror(m);          \
        exit(EXIT_FAILURE); \
    } while (0)

struct msgbuf
{
    long mtype;
    char mtext[1];
};

void echo_ser(int pmsgid)
{
    int nrcv;
    struct msgbuf *pmsg_snd;
    struct msgbuf *pmsg_rcv;
    char text[MAX];
    
    while (1)
    {
        pmsg_rcv = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX);
        nrcv = msgrcv(pmsgid, pmsg_rcv, MAX, 0, 0);
        if (nrcv == -1)
        {
            ERR_EXIT("pmsg_rcv error");
        }

        printf("Client: %s", pmsg_rcv->mtext);
        free(pmsg_rcv);

        strcpy(text,"Received. ");
        pmsg_snd = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(text));
        pmsg_snd->mtype = 2;
        strcpy(pmsg_snd->mtext, text);
        nrcv = msgsnd(pmsgid, pmsg_snd, sizeof(text), IPC_NOWAIT);
        if (nrcv == -1)
            ERR_EXIT("pmsg_snd error");

        free(pmsg_snd);
    }
}

int main(int argc, char *argv[])
{
    struct msgbuf *msg;
    int nrcv;
    int pid;
    int pmsgid;

    //建立一個訊息佇列
    int msgid;
    msgid = msgget(1234, IPC_CREAT | 0666);
    printf("%d\n",msgid);
    if (msgid == -1)
        ERR_EXIT("msgget error");


    //忽略子程序退出的細節
    signal(SIGCHLD, SIG_IGN);
    while (1) //接收到新的連線訊息
    {
        msg = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX);
        //接受連線, mtype=1
        nrcv = msgrcv(msgid, msg, MAX, 1, 0);
        if (nrcv == -1){
            ERR_EXIT("msgrcv error");
        }
        pmsgid = *((int *)msg->mtext);
        printf("\npmsgid =  %d \n", pmsgid);
        pid = fork();
        if (pid == 0) //子程序
        {
            echo_ser(pmsgid);
        }
    }

    return 0;
}

client.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

#define MAX 512

#define ERR_EXIT(m)         \
    do                      \
    {                       \
        perror(m);          \
        exit(EXIT_FAILURE); \
    } while (0)

struct msgbuf
{
    long mtype;
    char mtext[1];
};

void echo_cli(int pmsgid)
{
    int nrcv;
    struct msgbuf *pmsg_snd;
    struct msgbuf *pmsg_rcv;
    char text[MAX];
    printf("Enter some text: ");
    while (fgets(text, MAX, stdin) != NULL) //客戶端輸入資訊
    {
        pmsg_snd = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(text));
        pmsg_snd->mtype = 2;
        strcpy(pmsg_snd->mtext, text);
        nrcv = msgsnd(pmsgid, pmsg_snd, sizeof(text), IPC_NOWAIT);
        if (nrcv == -1)
            ERR_EXIT("pmsg_snd error");

        free(pmsg_snd);

        pmsg_rcv = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX);
        nrcv = msgrcv(pmsgid, pmsg_rcv, MAX, 0, 0);

        if (nrcv == -1)
        {
            ERR_EXIT("pmsg_rcv error");
        }
        printf("Server: %s\nEnter some text:", pmsg_rcv->mtext);
        free(pmsg_rcv);
    }
}

int main(int argc, char *argv[])
{
    int pmsgid;
    int msgid;
    int ret;
    struct msgbuf *msg;

    //建立一個私有訊息佇列
    pmsgid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
    if (pmsgid == -1)
        ERR_EXIT("msgget error");

    printf("message %d queue created!\n", pmsgid);

    //將私有訊息佇列識別符號和具體資料通過共享的佇列傳送給Server

    //開啟名為1234的訊息佇列
    msgid = msgget(1234, 0);
    if (msgid == -1)
        ERR_EXIT("msgget error");

    msg = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(pmsgid));
    msg->mtype = 1;
    *((int *)msg->mtext) = pmsgid;
    printf("%d\n", *((int *)msg->mtext));

    //傳送訊息
    ret = msgsnd(msgid, msg, sizeof(pmsgid), IPC_NOWAIT);
    if (ret == -1)
    {
        ERR_EXIT("msgsnd error");
    }
    free(msg); //傳送完畢,釋放記憶體
    printf("msg send.\n");

    echo_cli(pmsgid);

    ret = msgctl(pmsgid, IPC_RMID, NULL);
    if (ret == -1)
    {
        ERR_EXIT("del error");
    }
    return 0;
}