1. 程式人生 > >一個簡單的訊息佇列類(封裝system V訊息佇列)

一個簡單的訊息佇列類(封裝system V訊息佇列)

基本的思想是,

1、將訊息佇列封裝成一個類,可以簡單地控制訊息佇列的建立、收發和刪除。

2、更改訊息協議時,不需要修改收發的類。

以下是程式碼,使用模板來達到上述第二點的要求,只要按規則定義了訊息結構,則可以複用此訊息佇列的程式碼。因為使用了模板,所以使用了hpp標頭檔案,將類和其成員函式都定義在hpp檔案裡,使用時只需要包含此標頭檔案就可以了。

//DMMsgQueue.hpp
#ifndef _DMMSGQUEUE_HPP_
#define _DMMSGQUEUE_HPP_
#include <stdio.h>
#include <sys/msg.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <errno.h>
#include <stdlib.h>
#include <iostream>
#ifdef __LINUX__
const int SVMSG_MODE = 0660;
#else
#include <sys/ipc.h>
const int SVMSG_MODE = MSG_R | MSG_W;
#endif
const int FIND_OR_CREATE = SVMSG_MODE | IPC_CREAT;
const int CREATE = SVMSG_MODE | IPC_CREAT | IPC_EXCL;

/**
 * @brief Structure of message sample, every structure of message must contain
 * the member mtype and member funtion MsgSize just like this sample struct and
 * you can add more members to fit your need
 */
struct DMBaseMsg
{
  long mtype;
  size_t MsgSize() const
  {
    return 0;
  }
};

/**
 * @brief Message queue base on system V message queue 
 */
template <typename T>
class DMMsgQueue
{
 public:
  DMMsgQueue(std::string strQueueName, int flag = FIND_OR_CREATE);
  ~DMMsgQueue(){};
  bool Recv(T& msgObject, int flag = 0);
  bool Send(const T& msgObject, int flag = 0);
  bool RecvTimeOut(T& msgObject, int nTimeout = 5);
  bool SendTimeOut(const T& msgObject, int nTimeout = 5);
  bool Remove();
 private:
  int m_Msqid;
};

template <typename T>
DMMsgQueue<T>::DMMsgQueue(std::string strQueueName, int flag)
{
  std::string strFile= "/tmp/";
  strFile += strQueueName;
  std::string strCmd = "touch ";
  strCmd += strFile;
  system(strCmd.c_str());
  key_t mqkey = ftok(strFile.c_str(), 42);
  if(mqkey == -1)
  {
    throw TCException("DMMsgQueue create error!Ftok error!");
  }
  m_Msqid = msgget(mqkey, flag);
  if(errno = EEXIST && flag == CREATE)
  {
    throw TCException("DMMsgQueue create error!MsgQueue Existed!");
  }

  if(m_Msqid == -1)
  {
    PRINTTRACE(g_szError, "[DMMsgQueue::DMMsgQueue]Error! errno:%d\n", errno);
    throw TCException("DMMsgQueue create error!");
  }
}

template <typename T>
bool DMMsgQueue<T>::Send(const T& msgObject, int flag)
{
  if(0 == msgsnd(m_Msqid, (void*)&msgObject, msgObject.MsgSize(),flag))
  {
    return true;
  }
  else
  {
    return false;
  }
}

template <typename T>
bool DMMsgQueue<T>::Recv(T& msgObject, int flag)
{
  if(0 < msgrcv(m_Msqid, (void*)&msgObject, msgObject.MsgSize(),msgObject.mtype, flag))
  {
    return true;
  }
  else
  {
    return false;
  }
}

template <typename T>
bool DMMsgQueue<T>::RecvTimeOut(T& msgObject, int nTimeout)
{
  time_t starttime = time(NULL);
  while(true)
  {
    int nReturn = 0;
    nReturn = msgrcv(m_Msqid, (void*)&msgObject, msgObject.MsgSize(), msgObject.mtype, IPC_NOWAIT);
    if(0 < nReturn)
    {
      return true;
    }
    else
    {
      if(errno == ENOMSG || 0 == nReturn)
      {
        if(time(NULL) - starttime < nTimeout)
        {
          usleep(50);
          continue;
        }
      }

      return false;
    }
  }
}

template <typename T>
bool DMMsgQueue<T>::SendTimeOut(const T& msgObject, int nTimeout)
{
  time_t starttime = time(NULL);
  while(true)
  {
    int nReturn = 0;
    nReturn = msgsnd(m_Msqid, (void*)&msgObject, msgObject.MsgSize(), IPC_NOWAIT);
    if(0 == nReturn)
    {
      return true;
    }
    else
    {
      if(errno == EAGAIN)
      {
        if(time(NULL) - starttime < nTimeout)
        {
          usleep(50);
          continue;
        }
      }

      return false;
    }
  }
}

/**
 * @brief If you want to remove a message queue you have created , try this
 *
 * @return 
 */
template <typename T>
bool DMMsgQueue<T>::Remove()
{
  if(!msgctl(m_Msqid, IPC_RMID, NULL))
  {
    return false;
  }

  return true;
}
#endif

這裡需要注意一點,編譯時必須定義巨集__LINUX__,這裡主要出於我工作時移植性的考慮,某些巨集在AIX上有定義,而linux中沒有。另外,建構函式中所使用的ftok函式的注意事項,可以參照我之前寫的另一個文章。

以下再舉一個測試函式例子:

#include "DMMsgQueue.hpp"

int main(int argc, char **argv)
{
  DMBaseMsg msg;
  msg.mtype = 100;
  memset(msg.mmsg, 0x00, 1024);
  sprintf(msg.mmsg, "abc");
  DMMsgQueue<DMBaseMsg> my_mq("my_mq");
  std::cout << "Sending..." << std::endl;
  if(!my_mq.Send(msg))
  {
    std::cout << "send error .." << std::endl;
    my_mq.Remove();
    return -1;
  }
  DMBaseMsg msg2;
  msg2.mtype = 200;
  std::cout << "Receiving..." << std::endl;
  if(!my_mq.RecvTimeOut(msg2, 10))
  {
    std::cout << "Receive error" << std::endl;
    my_mq.Remove();
    return -1;
  }
  std::cout << msg2.mmsg<< std::endl;

  my_mq.Remove();
  return 0;
}