1. 程式人生 > >以佇列的形式使用共享記憶體

以佇列的形式使用共享記憶體

共享記憶體允許多個程序使用某一段儲存區,資料不需要在程序之間複製,多個程序都把該記憶體區域對映到自己的虛擬地址空間,直接訪問該共享記憶體區域,從而可以通過該區域進行通訊,是一種速度很快IPC。

下面是共享記憶體對映圖

共享記憶體對映

一般描述使用共享記憶體,都是以普通緩衝區的形式訪問,這裡除了講述共享記憶體的原理和使用方法,還會實現用佇列的方式使用共享記憶體。

建立共享記憶體

int shmget(key_t key, size_t size, int shmflg)

利用shmget函式建立共享記憶體

第一個引數key用於表示共享記憶體的識別符號,函式會利用這個識別符號生成一個ID,表示建立的共享記憶體,通常用ftok函式來產生這個key:

key_t ftok( const char * fname, int id )

fname是一個現存的檔案,id是子序號(只取低8位,不能為0),ftok方法會通過fname檔案所在檔案系統的資訊,索引節點號,以及id組合成一個ID並且返回
第二個引數表示要建立的共享記憶體大小
第三個引數是許可權標誌,一般用IPC_CREAT,若共享記憶體不存在則建立,如果想要其他選項,可以使用或操作加上

成功呼叫後shmget會返回表示共享記憶體的ID,接下來利用這個ID把共享記憶體對映到程序的地址空間

void *shmat(int shm_id, const void *shm_addr, int
shmflg)

第一個引數,shm_id是由shmget函式返回的共享記憶體標識。
第二個引數,shm_addr指定共享記憶體連線到當前程序中的地址位置,通常為0,表示讓系統來選擇共享記憶體的地址。
第三個引數,shm_flg是一組標誌位,通常為0

呼叫成功時返回一個指向共享記憶體第一個位元組的指標

銷燬共享記憶體

當使用完共享記憶體,要進行銷燬操作

int shmdt(const void *shmaddr)

這個函式只是把共享記憶體到本程序地址空間的對映解除,引數就是呼叫shmat時的返回值(指向共享記憶體的指標)

接著才是真正刪除共享記憶體

int shmctl
(int shm_id, int command, struct shmid_ds *buf)

第一個引數是共享記憶體的識別符號(即shmget的返回值)
第二個引數是命令,如果要刪除共享記憶體,一般使用IPC_RMID
第三個引數是共享記憶體管理結構體,刪除時一般為0

用佇列的方式使用共享記憶體

實際在專案裡使用共享記憶體,簡單的緩衝區形式往往無法滿足需求。
比如:要求一個程序不斷往共享記憶體寫入資料,而另一個程序從共享記憶體讀出資料,如果一定要讀程序讀取資料後,寫程序才能寫入新的資料,在效能上無疑是一個缺陷,如果寫程序不理會資料是否被讀程序獲取而不斷地寫入新資料,那麼原來的資料將被覆蓋。

我們很容易想到的是利用佇列實現上述要求,下面將講述如何用佇列的方式使用共享記憶體

首先,我們要定義資料結構:

#define MAX_NODE_DATA_SIZE  65535
#define MAX_QUEUE_NODE_COUNT    127

typedef struct buffer_node_{
    uint16_t dataLen;
    uint8_t data[MAX_NODE_DATA_SIZE];
} buffer_node_t;

typedef struct buffer_queue_{
    buffer_node_t queue[MAX_QUEUE_NODE_COUNT];
    int front;
    int rear;
    bool Init(){
        front = rear = 0;
        memset(queue, sizeof(buffer_node_t)*MAX_QUEUE_NODE_COUNT, 0);
    }
    bool isEmpty(){
        return (rear == front);
    }
    bool isFull(){
        return ((rear+1)%MAX_QUEUE_NODE_COUNT == front);
    }
    bool Enqueue(buffer_node_t *node){
        if(isFull()){
            return false;
        }
        memcpy(&queue[rear], node, sizeof(buffer_node_t));
        rear = (rear+1)%MAX_QUEUE_NODE_COUNT;
        return true;
    }
    buffer_node_t* Dequeue(){
        if(isEmpty()){
            printf("%d\n",__LINE__);
            return NULL;
        }
        buffer_node_t* node = &queue[front];
        front = (front+1)%MAX_QUEUE_NODE_COUNT;
        return node;
    }
}buffer_queue_t;

接下來建立我們需要的共享記憶體:

buffer_manage_t *bufferManage = NULL;
int bufferID = 0;

key_t k;
bufferID = shmget(k = ftok("./tsm3", 1), sizeof(buffer_manage_t), IPC_CREAT);

bufferManage = (buffer_manage_t*)shmat(bufferID, 0, 0);

如果上面建立共享記憶體這一步成功,基本也就沒有大問題了,其實所謂用佇列的方式使用共享記憶體,說白了就是把共享記憶體轉換為我們自己定義的資料結構,這個和malloc其實很像,當利用malloc申請記憶體時,返回的是void*型別指標,程式設計師根據需要可以把指標轉換成自己需要使用的型別。

我們把申請的共享記憶體首地址賦值給bufferManage 後,就可以利用結構體裡的方法操作佇列,這就是把佇列的操作方法定義在結構體裡的原因了

理解了這點之後,就可以很容易地用佇列的方式使用共享記憶體

#include <sys/ipc.h>
#include <sys/shm.h>
#include <errno.h>
#include <string.h>
#include <vector>
#include <stdio.h>
#include <string.h>
#include <malloc.h>
#include <unistd.h>
using namespace std;

#define MAX_NODE_DATA_SIZE      65535
#define MAX_QUEUE_NODE_COUNT    32

typedef unsigned char uint8_t;
typedef unsigned short uint16_t;


typedef struct buffer_node_{
    uint16_t dataLen;
    uint8_t data[MAX_NODE_DATA_SIZE];
} buffer_node_t;

typedef struct buffer_queue_{
    buffer_node_t queue[MAX_QUEUE_NODE_COUNT];
    int front;
    int rear;
    bool Init(){
        front = rear = 0;
        memset(queue, sizeof(buffer_node_t)*MAX_QUEUE_NODE_COUNT, 0);
    }
    bool isEmpty(){
        return (rear == front);
    }
    bool isFull(){
        return ((rear+1)%MAX_QUEUE_NODE_COUNT == front);
    }
    bool Enqueue(buffer_node_t *node){
        if(isFull()){
            return false;
        }
        memcpy(&queue[rear], node, sizeof(buffer_node_t));
        rear = (rear+1)%MAX_QUEUE_NODE_COUNT;
        return true;
    }
    buffer_node_t* Dequeue(){
        if(isEmpty()){
            return NULL;
        }
        buffer_node_t* node = &queue[front];
        front = (front+1)%MAX_QUEUE_NODE_COUNT;
        return node;
    }
}buffer_queue_t;

typedef struct buffer_manage_{
    buffer_queue_t sendQueue;
    buffer_queue_t receiveQueue;
    void buffer_init_(){
        sendQueue.Init();
        receiveQueue.Init();
    }
} buffer_manage_t;

buffer_manage_t *bufferManage = NULL;
int bufferID = 0;

bool Create_ShareMem(){
    key_t k;
    printf("size is %d\n",sizeof(buffer_manage_t));
    bufferID = shmget(k = ftok("./tsm3", 1), sizeof(buffer_manage_t), IPC_CREAT);
    printf("k is %d\n",k);
    if(bufferID == -1){
        printf("Create_ShareMem shmget failed errno = %d, error msg is %s\n", errno,strerror(errno));
    }
    printf("Create_ShareMem shmget bufferID = %d, error = %d\n", bufferID, errno);

    bufferManage = (buffer_manage_t*)shmat(bufferID, 0, 0);
    printf("Create_ShareMem shmget bufferManage = %p, error = %d\n", bufferManage, errno);

    if(bufferManage == (void *) -1 || bufferManage == NULL){
        bufferManage = NULL;
        printf("Create_ShareMem shmat failed errno = %d", errno);
        return false;
    }
    bufferManage->buffer_init_();
    return true;
}

bool Delete_ShareMem(){
    if((shmdt(bufferManage) != 0)){
        printf("Delete_ShareMem shmdt failed errno = %d", errno);
    }
    if(shmctl(bufferID, IPC_RMID, 0) != 0){
        printf("Delete_ShareMem shmctl failed errno = %d", errno);
        return false;
    }
    return true;
}

bool ShareMem_AddData(uint8_t* data, uint16_t dataLen)
{
    bool ret = false;
    buffer_node_t* node;
    node = (buffer_node_t*)malloc(sizeof(buffer_node_t));
    memset(node, 0, sizeof(buffer_node_t));
    node->dataLen = dataLen;
    if(node->dataLen > MAX_NODE_DATA_SIZE)
        node->dataLen = MAX_NODE_DATA_SIZE;
    if(data != NULL){
        memcpy(node->data, data, node->dataLen);
    }
    if(bufferManage != NULL){
        bufferManage->receiveQueue.Enqueue(node);
        ret = true;
    }
    return ret;
}

bool ShareMem_RemoveData(uint8_t** data, uint16_t* dataLen)
{
    bool ret = false;
    if(bufferManage != NULL){
        buffer_node_t* node = NULL;
        //bufferManage->sendQueueProtected.Lock();
        node = bufferManage->sendQueue.Dequeue();
        if(node != NULL){
            *data = (uint8_t*)malloc(node->dataLen);
            if(*data != NULL){
                memcpy(*data, node->data, node->dataLen);
                *dataLen = node->dataLen;
                ret = true;
            }
        }
        //bufferManage->sendQueueProtected.Unlock();
    }
    return ret;
}

void printData()
{
    for(int i = bufferManage->receiveQueue.front; i < bufferManage->receiveQueue.rear; i++)
    {
        printf("idx is %d,data is %s\n",i,bufferManage->receiveQueue.queue[i].data);
    } 
}

int main()
{
    Create_ShareMem();
    while(1){
        sleep(2);
        uint8_t fromT1[10] = "123456789"; 
        ShareMem_AddData(1, fromT1, 10);
        printData();
        sleep(1);
        //ShareMem_RemoveData();    
    }   

    Delete_ShareMem();
    return 0;
}

另一個讀程序的程式碼差不多,只要改一下main函式即可