程式人生 > 基於libevent的tcp拆包分包庫

基於libevent的tcp拆包分包庫

標籤:緩沖、sin、htons、tail、基於、list、mil、內容、emc

TCP/IP協議雖然方便,但是由於是基於流的傳輸(UDP是基於數據報的傳輸),無論什麽項目,總少不了解決拆包分包問題。

以前的項目總是每個程序員自己寫一套拆包分包邏輯,實現的方法與穩定性都不太一致。終於有了做基線的機會,自己寫了一個基於libevent的拆包分包庫。

本文檔黏貼一些核心的內容。

//Callback interface implemented by the business layer; callbacks are
//invoked from the library's internal threads.
class ITcpPacketNotify
{
public:
//A client connection identified by its socket fd was accepted.
virtual void OnConnected(int fd) = 0;
//The connection was closed (error, timeout and overflow paths end here).
virtual void OnDisConnected(int fd) = 0;
//A read/write timeout was reported for the connection.
virtual void OnTimeOutError(int fd) = 0;
//The receive buffer filled up without a complete packet being consumed.
virtual void OnBuffOverFlow(int fd) = 0;
//Extract a packet: return true and set 'packetlen' when a complete packet
//is buffered; return false and set 'ignore' to discard leading bad bytes,
//or leave ignore==0 to request more data.
virtual bool OnAnalyzePacket(int fd,const char* buff,int bufflen,int& packetlen,int &ignore) = 0;
//A complete packet arrived; a reply may be written into 'respond' with its
//size in 'respondlen' (must not exceed 40960 per the original contract).
virtual void OnPacketArrived(int fd,const char* packet,int packetlen,char* respond,int& respondlen) = 0;
};

提供了兩種錯誤情況的通知,跟別的庫不太一樣的地方是,需要業務層實現拆包邏輯,畢竟每個項目的協議不一樣。然後就會收到packet的通知。

// Example packet parser for one concrete protocol.
// Frame layout (length field in network byte order):
//   1B    1B   2B         4B        NB    2B   2B
//   HEAD  CMD  FRAME_SEQ  DATA_LEN  DATA  CRC  TAIL
// Returns true and sets 'packetlen' when a whole frame is buffered;
// returns false and sets 'ignore' to drop garbage bytes, or leaves
// ignore==0 when more data is simply needed.
virtual bool OnAnalyzePacket(int fd, const char* buff, int bufflen, int& packetlen, int& ignore)
{
    // The fixed fields alone take 12 bytes; shorter input cannot be parsed yet.
    if (bufflen <= 12)
    {
        return false;
    }
    // Resynchronize on the frame head marker.
    // Fix: the original used typographic quotes (mojibake for '!') which
    // does not compile.
    if (buff[0] != '!')
    {
        for (int i = 1; i < bufflen; i++)
        {
            if (buff[i] == '!')
            {
                ignore = i;
                break;
            }
        }
        // Fix: if no head byte exists at all, the whole buffer is garbage;
        // previously 'ignore' stayed 0 and the junk accumulated until
        // OnBuffOverFlow fired.
        if (ignore == 0)
        {
            ignore = bufflen;
        }
        return false;
    }

    // Payload length lives at offset 4, big-endian on the wire.
    int length = 0;
    memcpy((void*)&length, (void*)&buff[4], 4);
    length = ntohl(length);
    // Fix: a corrupt/negative length would make 'length + 12' nonsense;
    // drop the bogus head byte and resynchronize instead.
    if (length < 0)
    {
        ignore = 1;
        return false;
    }
    if (bufflen < length + 12)
    {
        return false; // frame incomplete, wait for more data
    }

    packetlen = length + 12;
    return true;
}

上面是某種協議的拆包例子。

// Server configuration handed to ITcpPacketManager::Start().
typedef struct _TcpPacketConfig
{
    int _port;              // TCP port the server listens on
    short _workNum;         // number of worker threads
    unsigned int _connNum;  // connection pool size per worker thread
    int _readTimeOut;       // read timeout, in seconds
    int _writeTimeOut;      // write timeout, in seconds

    // Defaults: port 8000, 5 workers, 100 connections each, 120 s timeouts.
    _TcpPacketConfig()
        : _port(8000)
        , _workNum(5)
        , _connNum(100)
        , _readTimeOut(120)
        , _writeTimeOut(120)
    {
    }

} TcpPacketConfig;

//Public server facade: start/stop the server and send packets to clients.
class ITcpPacketManager
{
public:
//Start the server with 'config'; 'notify' receives all connection and
//packet callbacks.  Returns false when initialization fails.
virtual bool Start(_TcpPacketConfig & config, ITcpPacketNotify* notify) = 0;
//Stop the server and release its resources.
virtual void Stop() = 0;
//Send an already-framed packet to the connection identified by 'fd'.
virtual bool SendPacket(int fd,const char* packet,int packetlen) = 0;
};

//Factory: create a manager instance; release it with DestroyTcpPacketManager.
TCPPACKET_API ITcpPacketManager* CreateTcpPacketManager();

//Destroy a manager previously returned by CreateTcpPacketManager.
TCPPACKET_API void DestroyTcpPacketManager(ITcpPacketManager* manager);

對外的接口方法。

//Forwards to the libevent wrapper, unpacking the individual config fields.
bool CTcpPacketImp::Start(_TcpPacketConfig & config, ITcpPacketNotify* notify)
{
return m_libEvent.StartServer(config._port, config._workNum, config._connNum, config._readTimeOut, config._writeTimeOut,notify);
}

// Delegates shutdown to the libevent wrapper.
void CTcpPacketImp::Stop()
{
    m_libEvent.StopServer();
}

//Thin pass-through to CLibEvent::SendPacket.
bool CTcpPacketImp::SendPacket(int fd, const char* packet, int packetlen)
{
return m_libEvent.SendPacket(fd,packet,packetlen);
}

轉移到m_libEvent實現。

最核心的功能代碼如下。

一些數據定義:

#include <event2/bufferevent.h>
#include <event2/bufferevent_compat.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/buffer_compat.h>
#include <event2/http_struct.h>
#include <event2/bufferevent.h>
#include <event2/thread.h>

struct _Conn;
struct _Worker;

//Aggregated server state: the listener, its event base/thread, and the workers.
struct _Server
{
bool bStart; //true once StartServer completed successfully
short nPort; //listening port
short workernum; //number of worker threads
unsigned int connnum; //connections pre-allocated per worker
volatile int nCurrentWorker; //round-robin cursor used by DoAccept
//NOTE(review): 'volatile' does not make the increment in DoAccept atomic;
//this is only safe while a single listener thread dispatches accepts.
int read_timeout; //read timeout in seconds (bufferevent_set_timeouts is commented out in StartServer)
int write_timeout; //write timeout in seconds (same note as above)
struct evconnlistener *pListener; //accepting listener
struct event_base *pBase; //event base driving the listener
HANDLE hThread; //listener thread handle
_Worker *pWorker; //array of 'workernum' workers
};
//連接對象列表
struct _ConnList
{
_ConnList()
{
head=NULL;
tail=NULL;
plistConn=NULL;
}
_Conn *head;
_Conn *tail;
_Conn *plistConn;
};
//連接對象
struct _Conn
{
_Conn()
{
fd=NULL;
bufev=NULL;
index=-1;
in_buf_len=0;
out_buf_len=0;
owner=NULL;
next=NULL;
in_buf=new char[emMaxBuffLen];
out_buf=new char[emMaxBuffLen];
}
~_Conn()
{
delete[]in_buf;
delete[]out_buf;
bufferevent_free(bufev);
}
struct bufferevent *bufev;
evutil_socket_t fd;
int index;
char *in_buf;
int in_buf_len;
char *out_buf;
int out_buf_len;
_Worker *owner;
_Conn *next;
};
//Worker thread wrapper: its own event base plus a free list of pooled conns.
struct _Worker
{
_Worker()
{
pWokerbase=NULL;
hThread=INVALID_HANDLE_VALUE;
pListConn=NULL;
}
struct event_base *pWokerbase; //event base serviced by this worker's thread
HANDLE hThread; //worker thread handle
_ConnList *pListConn; //free list of reusable connections
//Pop a free connection, or NULL when the pool is exhausted.  The tail node
//acts as a sentinel and is never handed out (hence head != tail).
//NOTE(review): called from the listener thread (DoAccept) while
//PutFreeConn runs on worker threads; the list has no locking -- verify
//this data race.
inline _Conn* GetFreeConn()
{
_Conn*pItem=NULL;
if(pListConn->head!=pListConn->tail)
{
pItem=pListConn->head;
pListConn->head=pListConn->head->next;
}
return pItem;
}
//Append a recycled connection behind the current tail sentinel.  The
//stale 'next' of the new tail is never followed because popping stops at
//head == tail.
inline void PutFreeConn(_Conn *pItem)
{
pListConn->tail->next=pItem;
pListConn->tail=pItem;
}
};

typedef struct _Server Server;
typedef struct _Worker Worker;
typedef struct _Conn Conn;
typedef struct _ConnList ConnList;

頭文件:

//libevent-based TCP server: one listener thread accepts connections and
//dispatches them round-robin to worker threads that parse packets.
class CLibEvent
{
public:
CLibEvent(void);
~CLibEvent(void);
private:
//State of the currently running server.
Server m_Server;
public:
//Start listening on 'port' with 'workernum' worker threads, 'connnum'
//pooled connections per worker and the given timeouts (seconds).
bool StartServer(int port, short workernum, unsigned int connnum, int read_timeout, int write_timeout,ITcpPacketNotify* notify);
//Stop all threads and release server resources.
void StopServer();
//Send an already-framed packet on the given socket.
bool SendPacket(int fd, const char* packet, int packetlen);
private:
//libevent callbacks and Win32 thread entry points.
static void DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data);
static void DoError(struct bufferevent *bev, short error, void *ctx);
static void CloseConn(Conn *pConn);
static void DoRead(struct bufferevent *bev, void *ctx);
static DWORD WINAPI ThreadServer(LPVOID lPVOID);
static DWORD WINAPI ThreadWorkers(LPVOID lPVOID);

//Business-layer callback sink shared by all static callbacks.
static ITcpPacketNotify * m_notify;
};

cpp:

#include "StdAfx.h"
#include "LibEvent.h"

#include <string>
#include <iostream>
using namespace std;

#include <assert.h>
#include <signal.h>

#include <WinSock2.h>


//Zero the server state and initialize Winsock (requesting version 2.1).
//NOTE(review): the WSAStartup return value is ignored; a failure would only
//surface later when sockets are created -- consider checking it.
CLibEvent::CLibEvent(void)
{
ZeroMemory(&m_Server,sizeof(m_Server));
WSADATA WSAData;
WSAStartup(0x0201, &WSAData);
}

//Release Winsock.
//NOTE(review): StopServer() is not invoked here -- callers must stop the
//server before destroying this object or its threads keep running; verify.
CLibEvent::~CLibEvent(void)
{
WSACleanup();
}

bool CLibEvent::StartServer(int port, short workernum, unsigned int connnum, int read_timeout, int write_timeout,ITcpPacketNotify* notify)
{
m_notify = notify;
m_Server.bStart=false;
m_Server.nCurrentWorker=0;
m_Server.nPort=port;
m_Server.workernum=workernum;
m_Server.connnum=connnum;
m_Server.read_timeout=read_timeout;
m_Server.write_timeout=write_timeout;
evthread_use_windows_threads();
m_Server.pBase=event_base_new();
if (m_Server.pBase==NULL)
{
return false;
}
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(m_Server.nPort);
m_Server.pListener=evconnlistener_new_bind(m_Server.pBase,DoAccept,(void*)&m_Server,LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,-1,(struct sockaddr*)&sin,sizeof(sin));
if (m_Server.pListener==NULL)
{
return false;
}

m_Server.pWorker=new Worker[workernum];
for (int i=0;i<workernum;i++)
{
m_Server.pWorker[i].pWokerbase=event_base_new();
if (m_Server.pWorker[i].pWokerbase== NULL)
{
delete []m_Server.pWorker;
return false;
}
//初始化連接對象
{
m_Server.pWorker[i].pListConn=new ConnList();
if (m_Server.pWorker[i].pListConn==NULL)
{
return false;
}
m_Server.pWorker[i].pListConn->plistConn=new Conn[m_Server.connnum+1];
m_Server.pWorker[i].pListConn->head=&m_Server.pWorker[i].pListConn->plistConn[0];
m_Server.pWorker[i].pListConn->tail=&m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum];
for (int j=0; j<m_Server.connnum; j++) {
m_Server.pWorker[i].pListConn->plistConn[j].index=j;
m_Server.pWorker[i].pListConn->plistConn[j].next=&m_Server.pWorker[i].pListConn->plistConn[j+1];
}
m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].index=m_Server.connnum;
m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].next=NULL;
//設置當前事件
Conn *p=m_Server.pWorker[i].pListConn->head;
while (p!=NULL)
{
p->bufev=bufferevent_socket_new(m_Server.pWorker[i].pWokerbase,-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
if (p->bufev==NULL)
{
return false;
}
bufferevent_setcb(p->bufev, DoRead, NULL, DoError, p);
bufferevent_setwatermark(p->bufev, EV_READ, 0, 0); //使用默認的高低水位
bufferevent_enable(p->bufev, EV_READ|EV_WRITE);
struct timeval delayWriteTimeout;
delayWriteTimeout.tv_sec=m_Server.write_timeout;
delayWriteTimeout.tv_usec=0;
struct timeval delayReadTimeout;
delayReadTimeout.tv_sec=m_Server.read_timeout;
delayReadTimeout.tv_usec=0;
//bufferevent_set_timeouts(p->bufev,&delayReadTimeout,&delayWriteTimeout);
p->owner=&m_Server.pWorker[i];
p=p->next;
}
}
m_Server.pWorker[i].hThread=CreateThread(NULL,0,ThreadWorkers,&m_Server.pWorker[i],0,NULL);
}
m_Server.hThread=CreateThread(NULL,0,ThreadServer,&m_Server,0,NULL);
if (m_Server.hThread==NULL)
{
return false;
}
m_Server.bStart=true;
return true;
}

void CLibEvent::StopServer()
{
if (m_Server.bStart)
{
struct timeval delay = { 2, 0 };
event_base_loopexit(m_Server.pBase, &delay);
WaitForSingleObject(m_Server.hThread,INFINITE);
if (m_Server.pWorker)
{
for (int i=0;i<m_Server.workernum;i++)
{
event_base_loopexit(m_Server.pWorker[i].pWokerbase, &delay);
WaitForSingleObject(m_Server.pWorker[i].hThread,INFINITE);
}
for (int i=0;i<m_Server.workernum;i++)
{
if (m_Server.pWorker[i].pListConn)
{
delete []m_Server.pWorker[i].pListConn->plistConn;
delete m_Server.pWorker[i].pListConn;
m_Server.pWorker[i].pListConn=NULL;
}
event_base_free(m_Server.pWorker[i].pWokerbase);
}
delete[]m_Server.pWorker;
m_Server.pWorker=NULL;
}
evconnlistener_free(m_Server.pListener);
event_base_free(m_Server.pBase);
}
m_Server.bStart=false;
}

// Read callback: drain the bufferevent's input into the connection's
// staging buffer and repeatedly hand it to the business layer to carve
// packets out of the byte stream.
void CLibEvent::DoRead(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input = bufferevent_get_input(bev);
    Conn *c = (Conn*)ctx;
    while (evbuffer_get_length(input))
    {
        // A full staging buffer the parser refuses to consume can never
        // make progress: report overflow and drop the connection.
        if (c->in_buf_len >= emMaxBuffLen)
        {
            m_notify->OnBuffOverFlow(c->fd);
            CloseConn(c);
            return;
        }

        // Move as much as fits from libevent's buffer into our own.
        c->in_buf_len += evbuffer_remove(input, c->in_buf + c->in_buf_len, emMaxBuffLen - c->in_buf_len);
        // Let the business layer carve out as many packets as it can.
        while (true)
        {
            int packlen = 0, ignore = 0;
            bool bRet = m_notify->OnAnalyzePacket(c->fd, c->in_buf, c->in_buf_len, packlen, ignore);
            if (!bRet) // parser may ask to discard dirty leading bytes
            {
                if (ignore > 0)
                {
                    // Fix: clamp 'ignore' so a misbehaving parser cannot
                    // drive in_buf_len negative and corrupt memory.
                    if (ignore > c->in_buf_len)
                    {
                        ignore = c->in_buf_len;
                    }
                    c->in_buf_len -= ignore;
                    memmove(c->in_buf, c->in_buf + ignore, c->in_buf_len);
                }
                else
                {
                    // Usually "not enough data yet": go read more input.
                    break;
                }
            }
            else
            {
                // Fix: a parser returning true with packlen <= 0 used to
                // spin forever; packlen > in_buf_len means the parser
                // ignored the available length (original behavior kept).
                if (packlen <= 0 || packlen > c->in_buf_len)
                {
                    break;
                }
                // Complete packet: deliver it; the handler may leave a
                // reply in out_buf / out_buf_len.
                m_notify->OnPacketArrived(c->fd, c->in_buf, packlen, c->out_buf, c->out_buf_len);
                if (c->out_buf_len != 0)
                {
                    // Queue the reply on the bufferevent's output buffer.
                    struct evbuffer *output = bufferevent_get_output(bev);
                    evbuffer_add(output, c->out_buf, c->out_buf_len);
                    c->out_buf_len = 0;
                }
                // Drop the consumed packet from the staging buffer.
                c->in_buf_len -= packlen;
                memmove(c->in_buf, c->in_buf + packlen, c->in_buf_len);
            }
        }
    }
}

//Recycle a connection: notify the business layer, stop its events, close
//the socket and return the Conn object to its worker's free list.
//NOTE(review): runs on worker threads while GetFreeConn runs on the
//listener thread; the free list has no locking -- verify this race.
void CLibEvent::CloseConn(Conn *pConn)
{
pConn->in_buf_len = 0;
m_notify->OnDisConnected(pConn->fd);
bufferevent_disable(pConn->bufev, EV_READ | EV_WRITE);
evutil_closesocket(pConn->fd);
pConn->owner->PutFreeConn(pConn);
}

void CLibEvent::DoError(struct bufferevent *bev, short error, void *ctx)
{
Conn *c=(Conn*)ctx;
if (error&EVBUFFER_TIMEOUT)
{
m_notify->OnTimeOutError(c->fd);
}else if (error&EVBUFFER_ERROR)
{
m_notify->OnBuffOverFlow(c->fd);
}
CloseConn(c);
}

void CLibEvent::DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data)
{
//此處為監聽線程的event.不做處理.
Server *pServer = (Server *)user_data;
//主線程處做任務分發.
int nCurrent=pServer->nCurrentWorker++%pServer->workernum;
//當前線程所在ID號
Worker &pWorker=pServer->pWorker[nCurrent];
//通知線程開始讀取數據,用於分配哪一個線程來處理此處的event事件
Conn *pConn=pWorker.GetFreeConn();
if (pConn==NULL)
{
return;
}
pConn->fd=fd;
evutil_make_socket_nonblocking(pConn->fd);
bufferevent_setfd(pConn->bufev, pConn->fd);
//轉發發送事件
m_notify->OnConnected(pConn->fd);
bufferevent_enable(pConn->bufev, EV_READ | EV_WRITE);
}

// Listener thread entry point: drives the server's main event base until
// StopServer requests a loop exit.
DWORD WINAPI CLibEvent::ThreadServer(LPVOID pParam)
{
    Server *pSrv = reinterpret_cast<Server *>(pParam);
    if (!pSrv)
    {
        return -1;
    }
    event_base_dispatch(pSrv->pBase);
    return GetCurrentThreadId();
}

// Worker thread entry point: drives one worker's event base until
// StopServer requests a loop exit.
DWORD WINAPI CLibEvent::ThreadWorkers(LPVOID pParam)
{
    Worker *pWkr = reinterpret_cast<Worker *>(pParam);
    if (!pWkr)
    {
        return -1;
    }
    event_base_dispatch(pWkr->pWokerbase);
    return GetCurrentThreadId();
}

bool CLibEvent::SendPacket(int fd, const char* packet, int packetlen)
{
//這裏可能需要優化,但是有些文檔說底層會自動分幀發送
int nRet = send(fd, packet, packetlen, 0);
if (nRet<packetlen)
{
return false;
}

return true;
}

ITcpPacketNotify * CLibEvent::m_notify=NULL;

基於libevent的tcp拆包分包庫