1. 程式人生 > >實現訊號量(三) 訊息佇列實現訊號量

實現訊號量(三) 訊息佇列實現訊號量

        前一篇使用管道實現訊號量,本文使用訊息佇列實現訊號量。其原理和管道一樣,都是通過在訊息佇列裡面寫入一個字元,讀取一個字元。這裡就不再多說了,直接上程式碼。

        msg_sem.hpp 檔案

#ifndef MSG_SEM_HPP
#define MSG_SEM_HPP

#include<errno.h>
#include  <sys/ipc.h>
#include  <sys/msg.h>
#include  <unistd.h>

typedef struct msg_sem_tag
{
    int fd;
    int valid;
}msg_sem_t;


#define MSG_SEM_VALID 0xae3c

int msg_sem_init(msg_sem_t* msg, int num);
int msg_sem_p(msg_sem_t* msg);
int msg_sem_tryP(msg_sem_t* msg);


inline int msg_sem_v(msg_sem_t* msg)
{
    if( msg == NULL || msg->valid != MSG_SEM_VALID )
        return EINVAL;

    return msgsnd(msg->fd, "v", 1, 0);
}


inline int msg_sem_destroy(msg_sem_t* msg)
{
    if( msg == NULL || msg->valid != MSG_SEM_VALID )
        return EINVAL;

    return msgctl(msg->fd, IPC_RMID, NULL);
}

#endif // MSG_SEM_HPP
        msg_sem.cpp 檔案
#include"msg_sem.hpp"


//同樣,本函式也不是執行緒安全的。
int msg_sem_init(msg_sem_t* msg, int num)
{
    int status;

    if( msg == NULL || num < 0)
        return EINVAL;

    status = msgget(IPC_PRIVATE, 0600 | IPC_CREAT );
    if( status == -1 )
        return errno;

    msg->fd = status;

    while( num-- )
    {
        status = msgsnd(msg->fd, "v", 1, 0);
        if( status == -1 )
            goto error;
    }

    msg->valid = MSG_SEM_VALID;
    return 0;

    error:
        msgctl(msg->fd, IPC_RMID, NULL);
        return status;
}



int msg_sem_p(msg_sem_t* msg)
{
    static char ch;
    int status;


    if( msg == NULL || msg->valid != MSG_SEM_VALID )
        return EINVAL;

    while(  (status = msgrcv(msg->fd, &ch, 1, 0, 0)) == -1 )
    {
        if( errno == EINTR )
            continue;
        else
            break;
    }

    if( status == 0 )
        return 0;
    else
        return errno;
}


//由於訊息佇列可以通過引數來選擇是不是在讀取的時候阻塞,所以無需像管道那樣
//修改檔案描述符的狀態
int msg_sem_tryP(msg_sem_t* msg)
{
    static char ch;
    int status;

    if( msg == NULL || msg->valid != MSG_SEM_VALID )
        return EINVAL;

    status = msgrcv(msg->fd, &ch, 1, 0, IPC_NOWAIT);
    if( status == -1 && errno == ENOMSG )
        return EAGAIN;

    return status;
}




        測試程式碼和管道的也是差不多,只是把pipe改成msg而已。
#include "msg_sem.hpp"
#include"Thread.hpp"
#include  <stdio.h>
#include  <string.h>
#include  <sys/types.h>
#include  <fcntl.h>
#include  <stdlib.h>
#include  <unistd.h>
#include  <errno.h>


#define	NBUFF	 8
#define BUFFSIZE 4096


struct {	/* data shared by producer and consumer */
  struct {
    char	data[BUFFSIZE];			/* a buffer */
    ssize_t	n;						/* count of #bytes in the buffer */
  } buff[NBUFF];					/* NBUFF of these buffers/counts */
  msg_sem_t nempty, nfull;		/* semaphores, not pointers */
  msg_sem_t writer_mutex, reader_mutex;
} shared;

int writer_index = 0, reader_index = 0;

int		fd;							/* input file to copy to stdout */
void* produce(void *), *consume(void *);
void* produce_tryP(void *arg);


int main(int argc, char **argv)
{
    Thread_t tid_produce1, tid_produce2, tid_produce3;
    Thread_t tid_consume1, tid_consume2;

    if (argc != 2)
    {
        printf("use <pathname> as pramater \n");
        exit(1);
    }

    fd = open(argv[1], O_RDONLY);
    if( fd == -1 )
    {
        printf("cann't open the file\n");
        return -1;
    }


    msg_sem_init(&shared.writer_mutex, 1);
    msg_sem_init(&shared.reader_mutex, 1);
    msg_sem_init(&shared.nempty, NBUFF);
    msg_sem_init(&shared.nfull, 0);

    thread_init(&tid_produce1);
    thread_init(&tid_produce2);
    thread_init(&tid_produce3);
    thread_init(&tid_consume1);
    thread_init(&tid_consume2);

    thread_create(&tid_consume1, NULL, consume);
    thread_create(&tid_consume2, NULL, consume);
    thread_create(&tid_produce1, NULL, produce);
    thread_create(&tid_produce2, NULL, produce);
    thread_create(&tid_produce3, NULL, produce_tryP);

    thread_start(&tid_consume1, NULL);
    thread_start(&tid_consume2, NULL);
    thread_start(&tid_produce1, NULL);
    thread_start(&tid_produce2, NULL);
    thread_start(&tid_produce3, NULL);

    thread_join(&tid_consume1, NULL);
    thread_join(&tid_consume2, NULL);

    thread_join(&tid_produce1, NULL);
    thread_join(&tid_produce2, NULL);
    thread_join(&tid_produce3, NULL);


    thread_destroy(&tid_consume1);
    thread_destroy(&tid_consume2);
    thread_destroy(&tid_produce1);
    thread_destroy(&tid_produce2);
    thread_destroy(&tid_produce3);


    msg_sem_destroy(&shared.writer_mutex);
    msg_sem_destroy(&shared.reader_mutex);
    msg_sem_destroy(&shared.nempty);
    msg_sem_destroy(&shared.nfull);

    exit(0);
}



void *produce(void *arg)
{
    while( 1 )
    {
        msg_sem_p(&shared.nempty);	/* wait for at least 1 empty slot */

        msg_sem_p(&shared.writer_mutex);

        shared.buff[writer_index].n =
                read(fd, shared.buff[writer_index].data, BUFFSIZE);

        if( shared.buff[writer_index].n == 0 )
        {
            msg_sem_v(&shared.nfull);
            msg_sem_v(&shared.writer_mutex);
            return NULL;
        }

        writer_index = (writer_index+1)%NBUFF;

        msg_sem_v(&shared.nfull);
        msg_sem_v(&shared.writer_mutex);
    }

    return NULL;
}


void* produce_tryP(void *arg)
{
    int status;
    while( 1 )
    {
        /* wait for at least 1 empty slot */
        while( 1 )
        {
            status = msg_sem_tryP(&shared.nempty);
            if( status == 0 )
                break;
            else if( status == EAGAIN )
            {
                usleep(10*1000); //sleep 10 毫秒
                continue;
            }
            else
                return NULL;
        }

        msg_sem_p(&shared.writer_mutex);

        shared.buff[writer_index].n =
                read(fd, shared.buff[writer_index].data, BUFFSIZE);

        if( shared.buff[writer_index].n == 0 )
        {
            msg_sem_v(&shared.nfull);
            msg_sem_v(&shared.writer_mutex);
            return NULL;
        }

        writer_index = (writer_index+1)%NBUFF;

        msg_sem_v(&shared.nfull);
        msg_sem_v(&shared.writer_mutex);
    }

    return NULL;
}



void* consume(void *arg)
{
    while( 1 )
    {
        msg_sem_p(&shared.nfull);
        msg_sem_p(&shared.reader_mutex);

        if( shared.buff[reader_index].n == 0)
        {
            msg_sem_v(&shared.nempty);
            msg_sem_v(&shared.reader_mutex);
            return NULL;
        }

        write(STDOUT_FILENO, shared.buff[reader_index].data,
                shared.buff[reader_index].n);

        reader_index = (reader_index+1)%NBUFF;

        msg_sem_v(&shared.nempty);
        msg_sem_v(&shared.reader_mutex);
    }

    return NULL;
}

        測試結果:



相關推薦

實現訊號() 訊息佇列實現訊號

        前一篇使用管道實現訊號量,本文使用訊息佇列實現訊號量。其原理和管道一樣,都是通過在訊息佇列裡面寫入一個字元,讀取一個字元。這裡就不再多說了,直接上程式碼。         msg_sem.hpp 檔案 #ifndef MSG_SEM_HPP #define

Linux:程序間通訊(匿名管道命名管道)(共享記憶體,訊息佇列訊號

目錄 程序間通訊的介紹 管道 匿名管道 原理: 程式碼實現 匿名管道特性 實現管道符 |  命名管道 命名管道特性 程式碼實現 管道讀寫規則 作業系統中ipc的相關命令 共享記憶體(重點) 生命週期: 程式碼實現 程式碼實現獲

Spring Boot中使用WebSocket總結():使用訊息佇列實現分散式WebSocket

在上一篇文章(www.zifangsky.cn/1359.html)中我介紹了服務端如何給指定使用者的客戶端傳送訊息,並如何處理對方不線上的情況。在這篇文章中我們繼續思考另外一個重要的問題,那就是:如果我們的專案是分散式環境,登入的使用者被Nginx的反向代理分配到多個不同伺服器,那麼在其中一個伺服器建立了W

Linux 學習筆記—程序通訊之 訊息佇列訊號、共享記憶體的概念區別聯絡

2.5 訊息佇列(Message queues) 訊息佇列是核心地址空間中的內部連結串列,通過linux核心在各個程序直接傳遞內容,訊息順序地傳送到訊息佇列中,並以幾種不同的方式從佇列中獲得,每個訊息佇列可以用IPC識別符號唯一地進行識別。核心中的訊息佇列是通過

【Linux】程序間通訊之訊息佇列訊號和共享儲存

訊息佇列、訊號量和共享儲存是IPC(程序間通訊)的三種形式,它們功能不同,但有相似之處,下面先介紹它們的相似點,然後再逐一說明。 1、相似點 每個核心中的IPC結構(訊息佇列、訊號量和共享儲存)都用一個非負整數的識別符號加以引用,與檔案描述符不同,當一個

使用訊息佇列實現分散式事務-公認較為理想的分散式事務解決方案(

  Begin transaction update A set amount=amount-10000 where userId=1; update B set amount=amount+10000 where userId=1; End t

Linux程序間通訊--訊號,管道,訊息佇列訊號,共享記憶體,socket

Linux 傳統的程序間通訊有很多,如各類管道、訊息佇列、記憶體共享、訊號量等等。但它們都無法介於核心態與使用者態使用,原因如表 通訊方法 無法介於核心態與使用者態的原因 管道(不包括命名管道) 侷限於父子程序間的通訊。 訊息佇列 在硬、軟中斷中無法無阻塞地接收資料。 訊號量 無法介於核

IPC通訊之訊息佇列訊號和共享記憶體

    有三種IPC我們稱作XSI IPC,即訊息佇列,訊號量以及共享儲存器。XSI IPC源自System V的IPC功能。由於XSI IPC不使用檔案系統的名稱空間,而是構造了它們自己的名字空間,

共享記憶體、訊息佇列訊號之ipcs命令詳解

中介軟體中我們常通過啟動多個程序來提高其執行的穩定性,而共享記憶體、訊息佇列、訊號量等技術保證了多程序間的通訊。 在Linux系統中通過自帶的ipcs命令工具,可檢視當前系統中以上三項的使用情況,從而利於定位多程序通訊中出現的通訊問題。 ipcs -h檢視該命令的使用幫助

Linux下程序間通訊方式之管道、訊號、共享記憶體、訊息佇列訊號、套接字

/* 1,程序間通訊 (IPC ) Inter-Process Communication   比較好理解概念的就是程序間通訊就是在不同程序之間傳播或交換資訊。 2,linux下IPC機制的分類:管道、訊號、共享記憶體、訊息佇列、訊號量、套接字 3,這篇主要說說管

對Linux中訊息佇列訊號集合的理解

訊息佇列和訊號量集合同樣作為程序間通訊的重要手段,是LInux程式設計必需理解的內容,但兩者類似的操作和檔案結構讓很多人不能理解其中的原理。下面我來介紹下我的理解: 在使用訊息佇列和訊號量集合前都必須使用的一個函式Key_t ftok(char *pathname,char

[轉載]使用訊息佇列實現分散式事務-公認較為理想的分散式事務解決方案

前陣子從支付寶轉賬1萬塊錢到餘額寶,這是日常生活的一件普通小事,但作為網際網路研發人員的職業病,我就思考支付寶扣除1萬之後,如果系統掛掉怎麼辦,這時餘額寶賬戶並沒有增加1萬,資料就會出現不一致狀況了。 上述場景在各個型別的系統中都能找到相似影子,比如在電商系統中,當有使用者下單後,除了在訂單表插

System V訊息佇列實現的檔案伺服器(不跨網路)

可能是定時的部分有問題吧,導致客戶端無法接收資料,不過我感覺思想是沒錯的。。。先pull上吧,以後發現錯誤再改 參考資料:UNP卷二 message.h #ifndef _MESSAGE_H #define _MESSAGE_H #include<stdio.h> #i

利用訊息佇列實現簡單聊天程式

本篇利用訊息佇列的特性實現簡單的聊天程式,msgsnd傳送資料,msgrcv接收資料來實現聊天功能,訊息佇列詳情。 資料接收端msgrcv //這是一個以system V訊息佇列實現的聊天程式客戶端 //// 1.建立訊息佇列 //// 2.從訊息佇列中獲取一個數據,打印出來 ///

PHP訊息佇列實現及應用:訊息佇列概念介紹

  在網際網路專案開發者經常會遇到『給使用者群發簡訊』、『訂單系統有大量的日誌需要記錄』或者在秒殺業務的時候伺服器無法承受瞬間併發的壓力。  這種情況下,我們怎麼保證系統正常有效的執行呢? 這個時候,我們可以引入一個叫『訊息佇列』的概念來解決上面的需求。 訊息佇列的概

用 Redis 實現 PHP 的簡單訊息佇列

參考:PHP高階程式設計之訊息佇列 訊息佇列就是在訊息的傳輸過程中,可以儲存訊息的容器。 常見用途: 儲存轉發:非同步處理耗時的任務 分散式事務:多個消費者消費同一個訊息佇列 應對高併發:通過訊息佇列儲存任務,慢慢處理 釋出訂閱:實現解耦

基於Apache Apollo 的MQTT訊息佇列實現

1.Apache Apollo 的介紹和服務搭建 (以 windows為例)          1.1 介紹           MQTT是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支援所有平臺,幾乎可以把所有聯網物品和外部連線起來,被用來當做

Redis + DB +訊息佇列 實現高效的文章點贊,點踩功能

需求說明 使用者可點贊或踩,每贊一次,“贊”數量+1,每踩一次,“踩”數量+1,“點贊”和“點踩” 當天內二選一當天內有效 場景:使用者A 點贊 文章a,文章a 點贊量+1 ,同一使用者,同一文章 當天再次點選無效,贊與踩二選一,隔天再次點選有效 表設計 文章的

介紹下用訊息佇列實現分散式事務

    在OIE的時代, 上層應用開發人員總是認為資料庫足夠強大, 所以很多業務可以做的非常簡單。 比如A轉賬50元給B這個過程, 只要寫一個簡單sql語句塊就ok了。         開始事務;     A賬戶減去50     B賬戶增加50     提交事務。     

SpringBoot(9) 基於Redis訊息佇列實現非同步操作

什麼是訊息佇列?所謂訊息佇列,就是一個以佇列資料結構為基礎的一個真實存在的實體,如陣列,redis中的佇列集合等等,都可以。為什麼要使用佇列?主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MyS