1. 程式人生 > >一個epoll多程序伺服器示例

一個epoll多程序伺服器示例

#include<iostream>
#include<stdlib.h>
#include<string.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include <unistd.h>
#include<sys/wait.h>
#include<stdio.h>
#include<map>
#include<fstream>
#include<time.h>
#include <unistd.h>
#include <fcntl.h>
#include <dirent.h>
#include<errno.h>
using namespace std;

#define MAX_CON_CNT 1000
#define MAX_EV_CNT  20

static int SER_PORT;
std::map<int, pid_t> ve;

struct flock* file_lock_fun(short type, short whence)
{
	static struct flock ret;
	ret.l_type = type;
	ret.l_start = 0x00;
	ret.l_whence = whence;
	ret.l_len = 0x05;
	ret.l_pid = getpid();
	return & ret;
}

void set_noblock(int  fd)
{
	int opts = fcntl(fd,F_GETFL);
	if(opts < 0x00)
	{
		std::cout<<"set no block failed ....."<<std::endl;
	}
	else
	{
		opts |= O_NONBLOCK;
		fcntl(fd,F_SETFL,opts);
	}
}

int pre_work()
{
	int ser_fd = socket(AF_INET,SOCK_STREAM,0x00);
	struct sockaddr_in addr;
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr("127.0.0.1");
	addr.sin_port = htons(SER_PORT);
	
	int flag = 0x01;
	if(setsockopt(ser_fd,SOL_SOCKET,SO_REUSEADDR, &flag, sizeof(flag)) == -1)
	{
		std::cout<<"reuse failed ..."<<std::endl;
		return -1;
	}

	if( -1 == bind(ser_fd, (struct sockaddr*)&addr, sizeof(addr)))
	{
		std::cout<<"bind failed ....."<<getpid()<<"  "<<SER_PORT<<std::endl;
		exit(0x01);
	}
	
	listen(ser_fd, 0x05);
	set_noblock(ser_fd);
	return ser_fd;
}

int lock(std::string fname)
{
	int m_fd = open(fname.c_str(),O_WRONLY);
	if(m_fd > 0x00 && -1 != fcntl(m_fd,F_SETLK,file_lock_fun(F_WRLCK, SEEK_SET)))
	{
		return m_fd;
	}
	else
	{
		if(m_fd > 0x00)
		{
			close(m_fd);
		}
	}
	
	return -1;
}

void unlock(int m_fd)
{
	if(m_fd > 0x00)
	{
		fcntl(m_fd,F_SETLKW,file_lock_fun(F_UNLCK, SEEK_SET));	
		close(m_fd);
	}

}

std::string get_fname()
{
	std::string filename = "";
	char pbuf[256] = {0x00};
	getcwd(pbuf,sizeof(pbuf));
	strcat(pbuf,"/lk");
	filename = std::string(pbuf);
	return filename;
}

void create_lkfile()
{
	std::string fname = get_fname();
	ofstream ofile(fname.c_str(), ios::trunc);
	if(ofile.is_open())
	{
		ofile<<"filelock"<<std::endl;
		ofile.close();
	}
	else
		std::cout<<"create lkfile failed ......"<<std::endl;
}

void work(int ser_fd)
{
	std::string filename = get_fname();
	
	struct epoll_event ev;
	ev.data.fd = ser_fd;
	ev.events = EPOLLIN | EPOLLET;
	
	int ep_fd = epoll_create(MAX_CON_CNT);
	epoll_ctl(ep_fd, EPOLL_CTL_ADD, ser_fd, &ev);
	
	std::cout<<"create ------------"<<ep_fd<<"    "<<getpid()<<std::endl;
	
	struct epoll_event ev_arr[MAX_EV_CNT];
	
	while(1)
	{
		int cnt = epoll_wait(ep_fd, ev_arr,MAX_EV_CNT, -1);
		for(int index = 0x00; index <cnt; index++)
		{
			if(ev_arr[index].data.fd == ser_fd)
			{
				int m_fd = lock(filename);
				if(m_fd <= 0x00)
					continue;
					
				bool loop = true;
				int get_cnt = 0x00;
				while(loop)
				{
					if(get_cnt >= 10)
						break;
						
					struct sockaddr_in cli_addr;
					socklen_t len = 0x00;
					int new_fd = accept(ser_fd, (struct sockaddr*)(&cli_addr),&len);
					if(new_fd <= 0x00)
					{
						if(errno == EAGAIN || errno == EWOULDBLOCK)
						{
							loop = false;
							continue;
						}	
						std::cout<<"accept failed ......."<<getpid()<<std::endl;
					}
					else
					{
						struct epoll_event ev;
						ev.data.fd = new_fd;
						ev.events = EPOLLIN;
						epoll_ctl(ep_fd, EPOLL_CTL_ADD, new_fd, &ev);
						std::cout<<"new con ......"<<new_fd<<"   "<<ep_fd<<"    "<<getpid()<<std::endl;
						get_cnt++;
					}
				}
				unlock(m_fd);
			}
			else if(ev_arr[index].events & EPOLLIN)
			{
				int cli_fd = ev_arr[index].data.fd;
				char buf[1024] = {0x00};
				int cnt = recv(cli_fd, buf, sizeof(buf), 0x00);
				if(cnt <= 0x00)
				{
					epoll_ctl(ep_fd, EPOLL_CTL_DEL, cli_fd,ev_arr + index);
					close(cli_fd);
					ev_arr[index].data.fd = -1;
					time_t ts = time(NULL);
					std::cout<<"close con......."<<cli_fd<<"  "<<ep_fd<<"     "<<ctime(&ts);
				}
				else
				{
					//std::cout<<"recv data ......"<<buf<<"    "<<getpid()<<std::endl;
				}
			}
		}
	}

}


void show()
{
	for(std::map<int,int>::iterator it = ve.begin(); it != ve.end(); it++)
		std::cout<< it->first<<"   "<<it->second<<std::endl;
	
	if(ve.empty())
		std::cout<<"empty......"<<std::endl;
}

void fork_child(int num,int fd)
{
	for(int index = 0x00; index < num; index++)
	{
		int channel[2] = {0x00};
		char buf[10] = {0x00};
		if(socketpair(AF_UNIX,SOCK_STREAM,0,channel) == -1)
		{
			perror("scoket pair failed ......");
			continue;
		}
		
		pid_t id = fork();
		if(id == 0x00)
		{
			close(channel[1]);
			sprintf(buf,"%d",getpid());
			write(channel[0] , buf , strlen(buf));
			work(fd);
			return ;
		}	
		if(id < 0x00)
			std::cout<<" fork failed ....."<<std::endl;
			
		close(channel[0x00]);
		read(channel[1], buf, sizeof(buf));
		std::cout<<"child pid is "<<buf<<std::endl;
		close(channel[0x01]);
		ve[atoi(buf)] = SER_PORT;
		//SER_PORT++;
	}

}

int main()
{
	create_lkfile();
	SER_PORT = 6000;
	int ser_fd = pre_work();
	if(-1 == ser_fd)
	{
		std::cout<<"pre_work faild ...."<<std::endl;
		exit(0x01);
	}
	fork_child(0x04,ser_fd);
	sleep(1);
	show();	
	while(1)
	{
		int status = 0x00;
		pid_t id = wait(&status);
		std::map<int,int>::iterator it = ve.find(id);
		if(it != ve.end())
		{
			SER_PORT = it->second;
			ve.erase(it);
		}
		
		fork_child(0x01,ser_fd);
		show();
		std::cout<<id<<"---over ......"<<std::endl;
	}
	return 0x00;
}

python客戶端

import threading
import sys
import time
import socket, sys
import select
import os

def echo():
    host = "localhost"
    textport = 6000 
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        port = int(textport)
    except ValueError:
        port = socket.getservbyname(textport, 'tcp')
    s.connect((host, port))
    arr = [1,2,3,4,5,4,3,2,1,6]
    cnt = 0
    while cnt < 1000:
	    select.select([],[],[],0.1 * arr[cnt%10])
		data = time.ctime() * 100
		data.join("\r\n")
		s.sendall(data)
		cnt += 1
       
    s.close()

if __name__ == "__main__":
    threads = []
    for x in range(1000):
	for i in xrange(100):
		os = threading.Thread(None,echo)
		os.start()
		threads.append(os)
	time.sleep(10)
    for x in threads:
        x.join()


伺服器的負載均衡有待完善