windows下Libevent +多執行緒(負載均衡分配法) 之檔案傳輸
阿新 • • 發佈:2018-11-19
一、先說一下服務端的流程:
1、主執行緒負責監聽客戶端的連線;
2、當有客戶端連線時,主執行緒通過管道向相應的子執行緒傳送監聽套接字描述符,子執行緒通過負載均衡法選擇出來;
3、當主執行緒傳送監聽描述符時,子執行緒的讀管道回撥函式會被回撥;
4、子執行緒為收到的監聽描述符設定讀取回調、寫回調、事件回撥等回撥函式;
5、子執行緒通過開啟的事件迴圈,迴圈監聽第4步的事件,並回調相應的回撥函式。
二、客戶端傳送檔案、服務端接收檔案原則(與上一節的方法不同):
1、客戶端傳送檔案時採用讀取多少位元組就傳送多少位元組,程式中設定成10000位元組;
2、服務端接收多少位元組就寫檔案多少位元組
3、與上一節的方法相比,此方法對於大檔案也適用。
三、服務端程式碼,程式碼中有詳細的註釋:
// LibeventMulThreadBalanceDemo.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include "winsock2.h" #include <process.h> #include <io.h> #include <fcntl.h> #include "event2/listener.h" #include "event2/bufferevent.h" #include "event2/thread.h" #include "event.h" #include "BufferManager.h" #include "Windows.h" const int g_nThreadNum = 10; //開啟的執行緒數量 typedef struct Libevent_Thread { DWORD did; //子執行緒ID struct event_base *base; //子執行緒base struct event event; //子執行緒event evutil_socket_t read_fd; //讀管道描述符 evutil_socket_t write_fd; //寫管道描述符 } LIBEVENT_THREAD; typedef struct Dispatcher_Thread { DWORD did; //主執行緒ID struct event_base *base; //主執行緒base } DISPATCHER_THREAD; LIBEVENT_THREAD *g_pThreads = new LIBEVENT_THREAD[g_nThreadNum]; DISPATCHER_THREAD g_DispatcherThread; int g_nlastThread = 0; typedef struct PictureInfo { char szFileName[260]; long nFileSize; } PICTUREINFO; //讀緩衝去回撥函式 void read_cb(struct bufferevent *bev, void *arg) { BufferManager* bm = (BufferManager*)arg; //第一次接收 if (bm->nFileSize == 0) { int nReceived = bufferevent_read(bev, bm->buf,10000); bm->nReceiveTotal += nReceived; if (nReceived >= sizeof(PICTUREINFO)) { bm->nFileSize = ((PICTUREINFO*)bm->buf)->nFileSize; strcpy_s(bm->szImgName,sizeof(bm->szImgName),((PICTUREINFO*)bm->buf)->szFileName); bm->f = NULL; fopen_s(&bm->f,bm->szImgName,"wb"); if (fwrite(bm->buf+sizeof(PICTUREINFO),bm->nReceiveTotal - sizeof(PICTUREINFO),1,bm->f) < 1){ // write error } } } else if ((bm->nFileSize - bm->nReceiveTotal) >= 10000) { int nReceived = bufferevent_read(bev, bm->buf,10000); bm->nReceiveTotal += nReceived; if (fwrite(bm->buf,nReceived,1,bm->f) < 1){ // write error } } else if((bm->nFileSize - bm->nReceiveTotal) >= 0) { int nReceived = bufferevent_read(bev, bm->buf, bm->nFileSize-bm->nReceiveTotal); bm->nReceiveTotal += nReceived; if (fwrite(bm->buf,nReceived,1,bm->f) < 1){ // write error } } if (bm->nReceiveTotal == bm->nFileSize) { printf("收到的位元組數: %d ,nThreadID = %d\n",bm->nReceiveTotal,GetCurrentThreadId()); bm->iniParam(); } } //寫緩衝區回撥函式 void write_cb(struct bufferevent *bev, void *arg) { printf("成功寫資料給客戶端,寫緩衝區回撥函式被回撥.\n"); } //事件回撥函式 void event_cb(struct bufferevent *bev,short events, void *arg) { BufferManager* pThis = (BufferManager*)arg; if (events & BEV_EVENT_EOF) { printf("connection close.\n"); } else if(events & BEV_EVENT_ERROR) { printf("some other error.\n"); } //登出事件導致事件迴圈退出,這樣子執行緒也將退出 bufferevent_free(bev); if (pThis) { delete pThis; pThis = NULL; } printf("bufferevent 資源已經被釋放.\n"); } //開啟子執行緒的事件迴圈 unsigned __stdcall Work_Thread( void* pArguments ) { LIBEVENT_THREAD *pThis = (LIBEVENT_THREAD*)pArguments; event_base_dispatch(pThis->base); _endthreadex(0); return 1; } //管道讀回撥函式 void pipe_process(int fd, short which, void *arg) { LIBEVENT_THREAD* pThis = (LIBEVENT_THREAD*)arg; //獲取管道的讀取描述符 int readfd = pThis->read_fd; evutil_socket_t evsock; recv(readfd, (char*)&evsock, sizeof(evutil_socket_t), 0); //為新的連線關聯事件 struct bufferevent* bev; bev = bufferevent_socket_new(pThis->base, evsock, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); BufferManager *bm = new BufferManager; //給bufferevent緩衝區設定回撥 bufferevent_setcb(bev, read_cb, write_cb, event_cb, bm); //啟動bufferevent的讀緩衝區,讀緩衝區預設是disable的. bufferevent_enable(bev, EV_READ); } //主執行緒監聽器回撥函式 void cb_listener(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr *addr, int len, void *ptr) { printf("new client connect.\n"); //採用負載均衡演算法為當前連線選擇子執行緒 int nCurThread = g_nlastThread % g_nThreadNum; g_nlastThread = nCurThread + 1; int sendfd = g_pThreads[nCurThread].write_fd; //將fd傳給子執行緒 send(sendfd, (char*)&fd, sizeof(evutil_socket_t), 0); } int _tmain(int argc, _TCHAR* argv[]) { //初始化網路庫 #ifdef WIN32 evthread_use_windows_threads(); WSADATA wsa_data; WSAStartup(0x0201, &wsa_data); #endif //為每個子執行緒的事件繫結管道的讀寫事件,子執行緒通過管道與主執行緒進行通訊 int nRet(0); for (int i = 0;i < g_nThreadNum;i++) { evutil_socket_t fds[2]; if(evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) { printf("create socketpair error,g_nThreadNum = %d\n",g_nThreadNum); return false; } //設定成無阻塞的socket evutil_make_socket_nonblocking(fds[0]); evutil_make_socket_nonblocking(fds[1]); g_pThreads[i].read_fd = fds[0]; g_pThreads[i].write_fd = fds[1]; //建立子執行緒的base g_pThreads[i].base = event_base_new(); if (g_pThreads[i].base == NULL) { printf("event_base_new error,g_nThreadNum = %d\n",g_nThreadNum); return 0; } //將檔案描述符和事件進行繫結,並加入到base中 event_set(&g_pThreads[i].event, g_pThreads[i].read_fd, EV_READ | EV_PERSIST, pipe_process, &g_pThreads[i]); nRet = event_base_set(g_pThreads[i].base, &g_pThreads[i].event); nRet = event_add(&g_pThreads[i].event,NULL); if (nRet == -1) { printf("event_add error,g_nThreadNum = %d\n",g_nThreadNum); return 0; } } //建立子執行緒,並啟動子執行緒的事件迴圈,在有註冊事件(管道的讀事件)的情況下迴圈不會退出 for (int i = 0;i < g_nThreadNum;i++) { _beginthreadex( NULL, 0, &Work_Thread, (void*)&g_pThreads[i], 0, (unsigned int*)&g_pThreads[i].did ); } //初始化伺服器地址結構 struct sockaddr_in sSerAddr; memset(&sSerAddr, 0, sizeof(sSerAddr)); sSerAddr.sin_family = AF_INET; sSerAddr.sin_addr.s_addr = htonl(INADDR_ANY); sSerAddr.sin_port = htons(8888); //建立主執行緒base g_DispatcherThread.base = event_base_new(); if (g_DispatcherThread.base == NULL) { printf("g_DispatcherThread.base create error.\n"); return 0; } //建立監聽器 struct evconnlistener *listener; listener = evconnlistener_new_bind(g_DispatcherThread.base, cb_listener, g_DispatcherThread.base, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, (struct sockaddr*)&sSerAddr, sizeof(sSerAddr)); if (listener == NULL) { printf("evconnlistener_new_bind error.\n"); return 0; } //開啟主執行緒的事件迴圈 event_base_dispatch(g_DispatcherThread.base); for (int i = 0;i < g_nThreadNum;i++) { event_base_free(g_pThreads[i].base); } if (g_pThreads) { delete []g_pThreads; g_pThreads = NULL; } evconnlistener_free(listener); event_base_free(g_DispatcherThread.base); WSACleanup(); return 0; }
bufferManager類和上一節的是一樣的,這裡就不列出了,有需要的可以檢視我的上一節部落格。
客戶端傳送檔案的主要程式碼:
WIN32_FIND_DATA FileInfo; HANDLE hFind = INVALID_HANDLE_VALUE; DWORD FileSize = 0; //檔案大小 char buf[10001] = {0}; char *pbuf = NULL; ZeroMemory(&FileInfo,sizeof(WIN32_FIND_DATA)); //獲取檔案大小 hFind = FindFirstFile("3.手工佈局.avi",&FileInfo); if(hFind != INVALID_HANDLE_VALUE) { FileSize = FileInfo.nFileSizeLow + sizeof(PICTUREINFO); } FindClose(hFind); strcpy_s(((PICTUREINFO*)buf)->szFileName,"3.手工佈局.avi"); ((PICTUREINFO*)buf)->nFileSize = FileSize; //第一次傳送10000,帶檔案頭 FILE *f = NULL; fopen_s(&f,"3.手工佈局.avi","rb"); fread(buf + sizeof(PICTUREINFO),10000 - sizeof(PICTUREINFO),1,f); bufferevent_write(bev,buf,10000); FileSize -= 10000; while (FileSize >= 10000) { fread(buf,10000,1,f); bufferevent_write(bev,buf,10000); FileSize -= 10000; } if (FileSize > 0) { fread(buf,FileSize,1,f); bufferevent_write(bev,buf,FileSize); FileSize -= FileSize; } if (f) { fclose(f); f = NULL; }