1. 程式人生 > >搭建一個後臺伺服器--服務端(非同步,大併發)

搭建一個後臺伺服器--服務端(非同步,大併發)

上篇的阻塞模式下伺服器的併發只有幾K,而真正的server 像nginx, apache, yumeiz 輕輕鬆鬆處理幾萬個併發完全不在話下,因此大併發的場合下是不能用阻塞的。

1W的併發是一個分隔點,如果單程序模型下能達到 的話,說明至少在伺服器這塊你已經很厲害了。

伺服器開發就像一門氣功,能不能搞出大併發,容錯性處理得怎麼樣,就是你有沒有內功,內功有多深。

非同步模式是專門為大併發而生,linux下一般用 epoll 來管理事件,下面就開始我們的非同步大併發伺服器實戰吧。

跟阻塞開發一樣,先來看看設計過程:

1.建立事件模型。

2.建立監聽連線並監聽。

3.將監聽連線加入事件模型。

4.當有事件時,判斷事件型別。

5.若事件為監聽連線,則產生客戶連線同時加入事件模型,對客戶連線接收發送。

6.若事件為客戶連線,處理相應IO請求。

為了讓大家一概全貌,我用一個函式實現的( 一個函式寫一個2W併發的伺服器,你試過麼),可讀性可能會差點,但是對付這道面試題是綽綽有餘了。

實際開發過程如下:

先定義一個事件結構,用於對客戶連線進行快取

struct my_event_s
{
	int fd;
	char recv[64];
	char send[64];

	int rc_pos;
	int sd_pos;
};

建立快取物件:

struct epoll_event wait_events[ EPOLL_MAX ];
struct my_event_s my_event[ EPOLL_MAX ];


建立監聽連線:

sock_server = socket( AF_INET, SOCK_STREAM, 0 );
flag = fcntl( sock_server, F_GETFL, 0 );
fcntl( sock_server, F_SETFL, flag | O_NONBLOCK );


繫結地址並監聽:

flag = bind( sock_server, ( struct sockaddr* )&addr_server, sizeof( struct sockaddr ) );
if( flag < 0 )
{
	printf( "your bind is not ok\n" );
	close( sock_server );
	return 0;
}

flag = listen( sock_server, 1024 );
if( flag < 0 )
{
	printf( "your listen is not ok\n");
	close( sock_server );
	return 0;
}


建立事件模型:
epfd = epoll_create( EPOLL_MAX );
if( epfd <= 0 )
{
	printf( "event module could not be setup\n");
	close( sock_server );
	return 0;
}
將監聽事件加入事件模型:
tobe_event.events = EPOLLIN;
tobe_event.data.fd = sock_server;

epoll_ctl( epfd,  EPOLL_CTL_ADD, sock_server,  &tobe_event );

事件模型處理:

e_num = epoll_wait( epfd, wait_events, EPOLL_MAX, WAIT_TIME_OUT );
if( e_num <= 0 )
{
	continue;
}

for( i = 0; i < e_num; ++i )
{


監聽處理:
if( sock_server == wait_events[ i ].data.fd )
{  while(1){

連線客戶端:

sock_client = accept( sock_server, ( struct sockaddr* )&addr_client, ( socklen_t*)&size );
if( sock_client < 0 )
{
	if( errno == EAGAIN )
	{
		break;
	}
	if( errno == EINTR )
	{
		continue;
	}
	break;
}



將客戶端連線設定為非同步:
flag = fcntl( sock_client, F_GETFL, 0 );
fcntl( sock_client, F_SETFL, flag | O_NONBLOCK );

將客戶端連線加入到 事件模型:
tobe_event.events = EPOLLIN | EPOLLET;
tobe_event.data.u32 = my_empty_index;

epoll_ctl( epfd, EPOLL_CTL_ADD, sock_client, &tobe_event );
統計每秒併發數並得到系統當前時間:
++num;
current = time( 0 );
if( current > last )
{
	printf( "last sec qps:%d\n", num );
	num = 0;
	last = current;
}


將時間填充到連線快取中:
memcpy( tobe_myevent->send, &current, sizeof(time_t) );

接收連線內容:

flag = recv( sock_client, tobe_myevent->recv, 64, 0 );
if( flag < 64 )
{
	if( flag > 0 )
		tobe_myevent->rc_pos += flag;
	continue;
}
if( tobe_myevent->recv[31] || tobe_myevent->recv[63] )
{
	printf( "your recv does follow the protocal\n");
	tobe_myevent->fd = 0;
	close( sock_client );
	continue;
}
					

flag = send( sock_client, tobe_myevent->send, sizeof( time_t ), 0 );
if( flag < sizeof( time_t ) )
{
tobe_event.events =  EPOLLET | EPOLLOUT;
epoll_ctl( epfd, EPOLL_CTL_MOD, sock_client, &tobe_event );
	if( flag > 0 )
		tobe_myevent->sd_pos += flag;
	continue;
}
tobe_myevent->fd = 0;
close( sock_client );

後面進行普通連線事件處理,錯誤處理:

if( event_flag & EPOLLHUP )
{
	tobe_myevent->fd = 0;
	close( sock_client );
	continue;
}
else if( event_flag & EPOLLERR )
{
	tobe_myevent->fd = 0;
	close( sock_client );
	continue;
}

寫事件:
else if( event_flag & EPOLLOUT )
{
	if( tobe_myevent->rc_pos != 64 )
	{
		continue;
	}

	if( tobe_myevent->sd_pos >= sizeof( time_t ) )
	{
		tobe_myevent->fd = 0;
		close( sock_client );
		continue;
	}

	flag = send( sock_client, tobe_myevent->send + tobe_myevent->sd_pos, sizeof( time_t ) - tobe_myevent->sd_pos, 0 );
	if( flag < 0 )
	{
		if( errno == EAGAIN )
		{
			continue;
		}
		else if( errno == EINTR )
		{
			continue;
		}
		tobe_myevent->fd = 0;
		close( sock_client );
		continue;
	}

	if( flag >0 )
	{
		tobe_myevent->sd_pos += flag;
		if( tobe_myevent->sd_pos >= sizeof( time_t ) )
		{
			tobe_myevent->fd = 0;
			close( sock_client );
			continue;
		}
	}
}

讀事件:
if( event_flag & EPOLLIN )
{
	if( tobe_myevent->rc_pos < 64 )
	{
		flag = recv( sock_client, tobe_myevent->recv + tobe_myevent->rc_pos, 64 - tobe_myevent->rc_pos, 0 );
		if( flag <= 0 )
		{
			continue;
		}

		tobe_myevent->rc_pos += flag;

		if( tobe_myevent->rc_pos < 64 )
		{
			continue;
		}

		if( tobe_myevent->recv[31] || tobe_myevent->recv[63] )
		{
			printf( "your recv does follow the protocal\n");
			tobe_myevent->fd = 0;
			close( sock_client );
			continue;
		}

		flag = send( sock_client, tobe_myevent->send, sizeof( time_t ), 0 );
		if( flag < sizeof( time_t ) )
		{
			if( flag > 0 )
				tobe_myevent->sd_pos += flag;
		tobe_event.events = EPOLLET | EPOLLOUT;
			tobe_event.data.u32 = wait_events[i].data.u32;
			epoll_ctl( epfd, EPOLL_CTL_MOD, sock_client, &tobe_event );
			continue;
		}
		tobe_myevent->fd = 0;
		close( sock_client );
	}

}
	
到此,一個非同步伺服器搭建完畢,輕鬆實現2W的併發量,既完成了任務,又是實實在在的鍛練了一回。