1. 程式人生 > >windows下Libevent +多執行緒(負載均衡分配法) 之檔案傳輸

windows下Libevent +多執行緒(負載均衡分配法) 之檔案傳輸

一、先說一下服務端的流程:

1、主執行緒負責監聽客戶端的連線;

2、當有客戶端連線時,主執行緒通過管道向相應的子執行緒傳送監聽套接字描述符,子執行緒通過負載均衡法選擇出來;

3、當主執行緒傳送監聽描述符時,子執行緒的讀管道回撥函式會被回撥;

4、子執行緒為收到的監聽描述符設定讀取回調、寫回調、事件回撥等回撥函式;

5、子執行緒通過開啟的事件迴圈,迴圈監聽第4步的事件,並回調相應的回撥函式。

二、客戶端傳送檔案、服務端接收檔案原則(與上一節的方法不同):

1、客戶端傳送檔案時採用讀取多少位元組就傳送多少位元組,程式中設定成10000位元組;

2、服務端接收多少位元組就寫檔案多少位元組

3、與上一節的方法相比,此方法對於大檔案也適用。

三、服務端程式碼,程式碼中有詳細的註釋:

// LibeventMulThreadBalanceDemo.cpp : 定義控制檯應用程式的入口點。
//

#include "stdafx.h"
#include "winsock2.h"
#include <process.h> 
#include <io.h>
#include <fcntl.h>
#include "event2/listener.h"
#include "event2/bufferevent.h"
#include "event2/thread.h"
#include "event.h"
#include "BufferManager.h"
#include "Windows.h"

const int g_nThreadNum = 10;   //開啟的執行緒數量

typedef struct Libevent_Thread
{
	DWORD did;                //子執行緒ID
	struct event_base *base;  //子執行緒base
	struct event event;       //子執行緒event
	evutil_socket_t read_fd;  //讀管道描述符
	evutil_socket_t write_fd; //寫管道描述符
} LIBEVENT_THREAD;

typedef struct Dispatcher_Thread
{
	DWORD did;                //主執行緒ID
	struct event_base *base;  //主執行緒base
} DISPATCHER_THREAD;

LIBEVENT_THREAD *g_pThreads = new LIBEVENT_THREAD[g_nThreadNum];

DISPATCHER_THREAD g_DispatcherThread;

int g_nlastThread = 0;

typedef struct PictureInfo
{
	char szFileName[260];
	long nFileSize;
} PICTUREINFO;

//讀緩衝去回撥函式
void read_cb(struct bufferevent *bev, void *arg)
{
	BufferManager* bm = (BufferManager*)arg;

	//第一次接收
	if (bm->nFileSize == 0)
	{
		int nReceived = bufferevent_read(bev, bm->buf,10000);
		bm->nReceiveTotal += nReceived;

		if (nReceived >= sizeof(PICTUREINFO))
		{
			bm->nFileSize = ((PICTUREINFO*)bm->buf)->nFileSize;
			strcpy_s(bm->szImgName,sizeof(bm->szImgName),((PICTUREINFO*)bm->buf)->szFileName);

			bm->f = NULL;
			fopen_s(&bm->f,bm->szImgName,"wb");
			if (fwrite(bm->buf+sizeof(PICTUREINFO),bm->nReceiveTotal - sizeof(PICTUREINFO),1,bm->f) < 1){
				// write error
			}
		}
	}
	else if ((bm->nFileSize - bm->nReceiveTotal) >= 10000)
	{
		int nReceived = bufferevent_read(bev, bm->buf,10000);
		bm->nReceiveTotal += nReceived;

		if (fwrite(bm->buf,nReceived,1,bm->f) < 1){
			// write error
		}
	}
	else if((bm->nFileSize - bm->nReceiveTotal) >= 0)
	{
		int nReceived = bufferevent_read(bev, bm->buf, bm->nFileSize-bm->nReceiveTotal);
		bm->nReceiveTotal += nReceived;

		if (fwrite(bm->buf,nReceived,1,bm->f) < 1){
			// write error
		}
	}

	if (bm->nReceiveTotal == bm->nFileSize)
	{
		printf("收到的位元組數: %d ,nThreadID = %d\n",bm->nReceiveTotal,GetCurrentThreadId());
		bm->iniParam();
	}
}

//寫緩衝區回撥函式
void write_cb(struct bufferevent *bev, void *arg)
{
	printf("成功寫資料給客戶端,寫緩衝區回撥函式被回撥.\n");
}

//事件回撥函式
void event_cb(struct bufferevent *bev,short events, void *arg)
{
	BufferManager* pThis = (BufferManager*)arg;

	if (events & BEV_EVENT_EOF)
	{
		printf("connection close.\n");
	} 
	else if(events & BEV_EVENT_ERROR)
	{
		printf("some other error.\n");
	}

	//登出事件導致事件迴圈退出,這樣子執行緒也將退出
	bufferevent_free(bev);

	if (pThis)
	{
		delete pThis;
		pThis = NULL;
	}

	printf("bufferevent 資源已經被釋放.\n");
}

//開啟子執行緒的事件迴圈
unsigned __stdcall Work_Thread( void* pArguments )
{
	LIBEVENT_THREAD *pThis = (LIBEVENT_THREAD*)pArguments;
	
	event_base_dispatch(pThis->base);

	_endthreadex(0);
	return 1;
}

//管道讀回撥函式
void pipe_process(int fd, short which, void *arg)
{
	LIBEVENT_THREAD* pThis = (LIBEVENT_THREAD*)arg;

	//獲取管道的讀取描述符
	int readfd = pThis->read_fd;
	evutil_socket_t evsock;
	recv(readfd, (char*)&evsock, sizeof(evutil_socket_t), 0);

	//為新的連線關聯事件
	struct bufferevent* bev;
	bev = bufferevent_socket_new(pThis->base, evsock, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);

	BufferManager *bm = new BufferManager;

	//給bufferevent緩衝區設定回撥
	bufferevent_setcb(bev, 
		read_cb,
		write_cb,
		event_cb,
		bm);

	//啟動bufferevent的讀緩衝區,讀緩衝區預設是disable的.
	bufferevent_enable(bev, EV_READ);
}

//主執行緒監聽器回撥函式
void cb_listener(struct evconnlistener* listener, 
				evutil_socket_t fd, 
			    struct sockaddr *addr, 
				int len, 
				void *ptr)
{
	printf("new client connect.\n");

	//採用負載均衡演算法為當前連線選擇子執行緒
	int nCurThread = g_nlastThread % g_nThreadNum;
	g_nlastThread = nCurThread + 1;
	int sendfd = g_pThreads[nCurThread].write_fd;

	//將fd傳給子執行緒
	send(sendfd, (char*)&fd, sizeof(evutil_socket_t), 0);
}

int _tmain(int argc, _TCHAR* argv[])
{
	//初始化網路庫
#ifdef WIN32
	evthread_use_windows_threads();

	WSADATA wsa_data;
	WSAStartup(0x0201, &wsa_data);
#endif

	//為每個子執行緒的事件繫結管道的讀寫事件,子執行緒通過管道與主執行緒進行通訊
	int nRet(0);
	for (int i = 0;i < g_nThreadNum;i++)
	{
		evutil_socket_t fds[2];
		if(evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) 
		{
			printf("create socketpair error,g_nThreadNum = %d\n",g_nThreadNum);
			return false;
		}

		//設定成無阻塞的socket
		evutil_make_socket_nonblocking(fds[0]);
		evutil_make_socket_nonblocking(fds[1]);

		g_pThreads[i].read_fd = fds[0];
		g_pThreads[i].write_fd = fds[1];

		//建立子執行緒的base
		g_pThreads[i].base = event_base_new();
		if (g_pThreads[i].base == NULL)
		{
			printf("event_base_new error,g_nThreadNum = %d\n",g_nThreadNum);
			return 0;
		}

		//將檔案描述符和事件進行繫結,並加入到base中
		event_set(&g_pThreads[i].event,
			      g_pThreads[i].read_fd,
				  EV_READ | EV_PERSIST,
				  pipe_process,
				  &g_pThreads[i]);

		nRet = event_base_set(g_pThreads[i].base, &g_pThreads[i].event);
		nRet = event_add(&g_pThreads[i].event,NULL);
		if (nRet == -1)
		{
			printf("event_add error,g_nThreadNum = %d\n",g_nThreadNum);
			return 0;
		}
	}

	//建立子執行緒,並啟動子執行緒的事件迴圈,在有註冊事件(管道的讀事件)的情況下迴圈不會退出
	for (int i = 0;i < g_nThreadNum;i++)
	{
		 _beginthreadex( NULL, 0, &Work_Thread, (void*)&g_pThreads[i], 0, (unsigned int*)&g_pThreads[i].did );
	}

	//初始化伺服器地址結構
	struct sockaddr_in sSerAddr;
	memset(&sSerAddr, 0, sizeof(sSerAddr));
	sSerAddr.sin_family = AF_INET;
	sSerAddr.sin_addr.s_addr = htonl(INADDR_ANY);
	sSerAddr.sin_port = htons(8888);

	//建立主執行緒base
	g_DispatcherThread.base = event_base_new();
	if (g_DispatcherThread.base == NULL)
	{
		printf("g_DispatcherThread.base create error.\n");
		return 0;
	}

	//建立監聽器
	struct evconnlistener *listener;
	listener = evconnlistener_new_bind(g_DispatcherThread.base, 
		cb_listener, 
		g_DispatcherThread.base, 
		LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, 
		-1, 
		(struct sockaddr*)&sSerAddr, 
		sizeof(sSerAddr));

	if (listener == NULL)
	{
		printf("evconnlistener_new_bind error.\n");
		return 0;
	}

	//開啟主執行緒的事件迴圈
	event_base_dispatch(g_DispatcherThread.base);

	for (int i = 0;i < g_nThreadNum;i++)
	{
		event_base_free(g_pThreads[i].base);
	}

	if (g_pThreads)
	{
		delete []g_pThreads;
		g_pThreads = NULL;
	}

	evconnlistener_free(listener);
	event_base_free(g_DispatcherThread.base);
	WSACleanup();

	return 0;
}

bufferManager類和上一節的是一樣的,這裡就不列出了,有需要的可以檢視我的上一節部落格。

客戶端傳送檔案的主要程式碼:

WIN32_FIND_DATA FileInfo;
		HANDLE hFind = INVALID_HANDLE_VALUE;
		DWORD FileSize = 0;                   //檔案大小

		char buf[10001] = {0};
		char *pbuf = NULL;
		ZeroMemory(&FileInfo,sizeof(WIN32_FIND_DATA));

		//獲取檔案大小
		hFind = FindFirstFile("3.手工佈局.avi",&FileInfo); 
		if(hFind != INVALID_HANDLE_VALUE) 
		{
			FileSize = FileInfo.nFileSizeLow  + sizeof(PICTUREINFO);
		}
		FindClose(hFind);

		strcpy_s(((PICTUREINFO*)buf)->szFileName,"3.手工佈局.avi");
		((PICTUREINFO*)buf)->nFileSize = FileSize;

		//第一次傳送10000,帶檔案頭
		FILE *f = NULL;
		fopen_s(&f,"3.手工佈局.avi","rb");
		fread(buf + sizeof(PICTUREINFO),10000 - sizeof(PICTUREINFO),1,f);
		bufferevent_write(bev,buf,10000);
		FileSize -= 10000;

		while (FileSize >= 10000)
		{
			fread(buf,10000,1,f);
			bufferevent_write(bev,buf,10000);

			FileSize -= 10000;
		}

		if (FileSize > 0)
		{
			fread(buf,FileSize,1,f);
			bufferevent_write(bev,buf,FileSize);
		
			FileSize -= FileSize;
		}

		if (f)
		{
			fclose(f);
			f = NULL;
		}