1. 程式人生 > >linux 下多執行緒epoll程式設計 -socket

linux 下多執行緒epoll程式設計 -socket

轉載自:http://blog.csdn.net/susubuhui/article/details/37906287

Linux socket+epoll+pthread+佇列 實現併發伺服器。程式碼有封裝,僅做參考

Linux下多執行緒epoll程式設計,在高併發下測試通過,可以支援10000使用者同時線上,測試伺服器為Linode的vps伺服器,作業系統為Centos64



// cs_network.cpp


// created by ccc


#include "config.h"
#include "cs_network.h"
#include <iostream>
#include <sys/socket.h>


#define VERSION_SOLARIS 0


#if VERSION_SOLARIS
 #include <port.h>
#else
 #include <sys/epoll.h>
#endif


#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "cs_data_inspect.h"
#include "cs_heart_thread.h"
#include "cs_gdata.h"
#include "cs_work_thread.h"


#define SERV_PORT  5565 // 伺服器埠
#define LISTENQ          128   // listen sock 引數
#define MAX_EPOLL_EVENT_COUNT      500   // 同時監聽的 epoll 數
#define MAX_READ_BUF_SIZE    1024 * 8 // 最大讀取資料 buf


#define SOCKET_ERROR -1


static int epfd;
static int listenfd;


static pthread_t tids[NETWORK_READ_THREAD_COUNT];
static pthread_mutex_t mutex_thread_read;
static pthread_cond_t cond_thread_read;
static void *thread_read_tasks(void *args);


static st_io_context_task *read_tasks_head = NULL;
static st_io_context_task *read_tasks_tail = NULL;


//////////////////////////////////////////////////////////////////////////////////////////




static void append_read_task(st_context *context)
{
 st_io_context_task *new_task = NULL;


 new_task = new st_io_context_task();
 new_task->context = context;


 pthread_mutex_lock(&mutex_thread_read);


 if(read_tasks_tail == NULL)
 {
  read_tasks_head = new_task;
  read_tasks_tail = new_task;
 }   
 else
 {   
  read_tasks_tail->next= new_task;
  read_tasks_tail = new_task;
 }  


 pthread_cond_broadcast(&cond_thread_read);
 pthread_mutex_unlock(&mutex_thread_read); 
}


void _setnonblocking(int sock)
{
 int opts;
 opts = fcntl(sock, F_GETFL);
 if(opts < 0){
  log("fcntl(sock,GETFL)");
  exit(1);
 }


 opts = opts | O_NONBLOCK;
 if(fcntl(sock, F_SETFL, opts)<0){
  log("fcntl(sock,SETFL,opts)");
  exit(1);
 }
}




void* get_network_event(void *param)
{
 long network_event_id;


 int i, sockfd;


 network_event_id = (long) param;


 log("begin thread get_network_event: %ld", network_event_id);


 st_context *context = NULL;


#if VERSION_SOLARIS
 uint_t nfds;
 port_event_t now_ev, ev, events[MAX_EPOLL_EVENT_COUNT];
#else
 unsigned nfds;
 struct epoll_event now_ev, ev, events[MAX_EPOLL_EVENT_COUNT];
#endif




#if VERSION_SOLARIS
 struct timespec timeout;
 timeout.tv_sec = 0;
 timeout.tv_nsec = 50000000;
#endif


 while(1) 
 {
#if VERSION_SOLARIS
  nfds = MAX_EPOLL_EVENT_COUNT;
  if (port_getn(epfd, events, MAX_EPOLL_EVENT_COUNT, &nfds, &timeout) != 0){


   if (errno != ETIME){
    log("port_getn error");
    return false;
   }
  }


  if (nfds == 0){
   continue;
  }
  else{
   // log("on port_getn: %d", nfds);
  }
#else
  //等待epoll事件的發生
  nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENT_COUNT, 100000);
#endif


  //處理所發生的所有事件
  for(i = 0; i < nfds; i++)
  {
   now_ev = events[i];


#if VERSION_SOLARIS
   context = (st_context *)now_ev.portev_user;
#else
   context = (st_context *)now_ev.data.ptr;
#endif


#if VERSION_SOLARIS
   if (now_ev.portev_source != PORT_SOURCE_FD){
    continue;
   }


   if(now_ev.portev_object == listenfd)
#else
   if(context->fd == listenfd)
#endif
   {
#if VERSION_SOLARIS
    // 重新關聯listen fd
    port_associate(epfd, PORT_SOURCE_FD, listenfd, POLLIN, context);
#endif


    //append_read_task(NULL, true);
    int connfd;
    struct sockaddr_in clientaddr = {0};
    socklen_t clilen = sizeof(clientaddr);


    connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);


    if(connfd == -1){
     log("connfd == -1 [%d]", errno);
     continue;
    }


    _setnonblocking(connfd);
    int nRecvBuf=128*1024;//設定為32K
    setsockopt(connfd, SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int));
    //傳送緩衝區
    int nSendBuf=128*1024;//設定為32K
    setsockopt(connfd, SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int));


    int nNetTimeout=1000;//1秒
    //傳送時限
    setsockopt(connfd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&nNetTimeout, sizeof(int));


    //接收時限
    setsockopt(connfd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&nNetTimeout, sizeof(int));


    const char *remote_addr = inet_ntoa(clientaddr.sin_addr);
    int  remote_port = ntohs(clientaddr.sin_port);


    context = query_context(connfd, remote_addr, remote_port);


    mutex_lock(mutex_id_pool.mutex);
    context->id = add_client(context);
    mutex_unlock(mutex_id_pool.mutex);


 //#if IS_DEBUG
 //   log("new obj fd: %d, id: %d context:%8X", connfd, context->id, context);
 //#endif


#if VERSION_SOLARIS
    port_associate(epfd, PORT_SOURCE_FD, connfd, POLLIN, context);
#else
    struct epoll_event ev;


    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = context;
    epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
#endif
   }


#if VERSION_SOLARIS
   else if(now_ev.portev_events == POLLIN)
   {
    //log("on: now_ev.portev_events == POLLIN");
    append_read_task(context);
   }
   else{
    log("unknow portev_events: %d", now_ev.portev_events);
   }
#else
   else if(now_ev.events & EPOLLIN)
   {
    append_read_task(context);
   }
   else if(now_ev.events & EPOLLOUT)
   { 
    sockfd = context->fd;


    struct epoll_event ev;


    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = context;
    epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
   }
   else if(now_ev.events & EPOLLHUP)
   {
    log("else if(now_ev.events & EPOLLHUP)");
    del_from_network(context);
   }
   else
   {
    log("[warming]other epoll event: %d", now_ev.events);
   }
#endif
  }
 }


 return NULL;
}


bool start_network()
{
 int i, sockfd;
 int optionVal = 0;


 st_context *context = NULL;


#if VERSION_SOLARIS
 uint_t nfds;
 port_event_t ev;
#else
 unsigned nfds;
 struct epoll_event ev;
#endif


 pthread_mutex_init(&mutex_thread_read, NULL);


 pthread_cond_init(&cond_thread_read, NULL);


 int ret;
 pthread_attr_t tattr;


 /* Initialize with default */
 if(ret = pthread_attr_init(&tattr)){
  perror("Error initializing thread attribute [pthread_attr_init(3C)] ");
  return (-1);
 }


 /* Make it a bound thread */
 if(ret = pthread_attr_setscope(&tattr, PTHREAD_SCOPE_SYSTEM)){
  perror("Error making bound thread [pthread_attr_setscope(3C)] ");
  return (-1);
 }


 //初始化用於讀執行緒池的執行緒,開啟兩個執行緒來完成任務,兩個執行緒會互斥地訪問任務連結串列
 for (i = 0; i < NETWORK_READ_THREAD_COUNT; i++){


  pthread_create(&tids[i], &tattr, thread_read_tasks, NULL);
  //log("new read task thread %8X created.", tids[i]);
 }


#if VERSION_SOLARIS
 epfd = port_create();
#else
 epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
#endif


 struct sockaddr_in serveraddr;


 if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){


  log("can't create socket.\n");


  return false;
 }


 //把socket設定為非阻塞方式
 _setnonblocking(listenfd);


 optionVal = 0;
 setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,  &optionVal, sizeof(optionVal));


 {
  //設定與要處理的事件相關的檔案描述符
  context = query_context(listenfd, "localhost", SERV_PORT);


#if VERSION_SOLARIS
  //註冊epoll事件
  port_associate(epfd, PORT_SOURCE_FD, listenfd, POLLIN, context);
#else
  ev.data.ptr = context;
  ev.events = EPOLLIN;
  epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
#endif
 }


 memset(&serveraddr, 0, sizeof(serveraddr));
 serveraddr.sin_family = AF_INET;
 serveraddr.sin_port = htons(SERV_PORT);
 serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);


 if (bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr))  == -1){


  log("bind error: %d\n", errno);


  return false;
 }


 //開始監聽
 if (listen(listenfd, LISTENQ) == -1){


  log("listen error: %d\n", errno);


  return false;
 }


 log("");
 log("**************************************");
 log("********   cserver start ok   ********");
 log("**************************************");
 
 pthread_t tids_get_network_event[NETWORK_READ_THREAD_COUNT];


 for (int i = 1; i <= 2; i++){
  pthread_create(&tids_get_network_event[i], &tattr, get_network_event, (void*)i);
 }


 get_network_event(NULL);


 return true;
}


void shutdown_network()
{
 log("begin shutdown network ...");
}


void *thread_read_tasks(void *args)
{
 bool is_listenfd;
 st_context *context;


 while(1)
 {
  //互斥訪問任務佇列
  pthread_mutex_lock(&mutex_thread_read);


  //等待到任務佇列不為空
  while(read_tasks_head == NULL)
   pthread_cond_wait(&cond_thread_read, &mutex_thread_read); //執行緒阻塞,釋放互斥鎖,當等待的條件等到滿足時,它會再次獲得互斥鎖


  context     = read_tasks_head->context;


  //從任務佇列取出一個讀任務
  st_io_context_task *tmp = read_tasks_head;
  read_tasks_head = read_tasks_head->next;
  delete tmp;


  if (read_tasks_head == NULL){
   read_tasks_tail = NULL;
  }


  pthread_mutex_unlock(&mutex_thread_read);


  {
   char buf_read[MAX_READ_BUF_SIZE];
   int  read_count;


   read_count = recv(context->fd, buf_read, sizeof(buf_read), 0);


   //log("read id[%d]errno[%d]count[%d]", context->id, errno, read_count);


   if (read_count < 0)
   {
    if (errno == EAGAIN){
     continue;
    }


    log("1 recv < 0: errno: %d", errno);
    //if (errno == ECONNRESET){
    // log("client[%s:%d] disconnect ", context->remote_addr, context->remote_port);
    //}


    del_from_network(context);
    continue;
   }
   else if (read_count == 0)
   {
    //客戶端關閉了,其對應的連線套接字可能也被標記為EPOLLIN,然後伺服器去讀這個套接字
    //結果發現讀出來的內容為0,就知道客戶端關閉了。
    log("client close connect! errno[%d]", errno);


    del_from_network(context);
    continue;
   } 
   else
   {
    do 
    {
     if (! on_recv_data(context, buf_read, read_count)){
      context->is_illegal = true;


      log("當前客戶資料存在異常");
      del_from_network(context);
      break;
     }


     read_count = read(context->fd, buf_read, sizeof(buf_read));


     if (read_count <= 0){
      if (errno == EINTR)
       continue;
      if (errno == EAGAIN){
#if VERSION_SOLARIS
       port_associate(epfd, PORT_SOURCE_FD, context->fd, POLLIN, context);
#endif
       break;
      }




      log("2 error read_count < 0, errno[%d]", errno);


      del_from_network(context);
      break;
     }
    }
    while(1);
   }
  }
 }


 log("thread_read_tasks end ....");
}


void del_from_network(st_context *context, bool is_card_map_locked)
{
 gdata.lock();
 if (gdata.manager == context){
  gdata.manager = NULL;
 }
 gdata.unlock();


 context->lock();


 if (! context->valide){
  log("del_from_network is not valide");
  context->unlock();
  return;
 }


 // 斷開連線
 int fd = context->fd;


 if (fd != -1){
#if VERSION_SOLARIS
  port_dissociate(epfd, PORT_SOURCE_FD, fd);
#else
  struct epoll_event ev;
  ev.data.fd = fd;
  epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);
#endif
  close_socket(fd);
  context->fd = -1;
 }


 context->valide = false;
 log("client[%d] is invalide", context->id);
 context->unlock();


 mutex_lock(mutex_id_pool.mutex);
 del_client(context->id);
 mutex_unlock(mutex_id_pool.mutex);
}


bool context_write(st_context *context, const char* buf, int len)
{
 if (len <= 0) return true;


 int fd = context->fd;


 if (fd == -1){
  return false;
 }


 int nleft = len;
 int nsend;


 bool result = true;


    while(nleft > 0){
        nsend = write(fd, &buf[len - nleft], nleft);


        if (nsend < 0) {


            if(errno == EINTR) continue;
   if(errno == EAGAIN) {
    break;
   }


            result = false;
   break;
        }
        else if (nsend == 0){


            result = false;
   break;
        }
        nleft -= nsend;
    }


 return result;
}

相關推薦

linux 執行epoll程式設計 socket

轉載自:http://blog.csdn.net/susubuhui/article/details/37906287 Linux socket+epoll+pthread+佇列 實現併發伺服器。程式碼有封裝,僅做參考 Linux下多執行緒epoll程式設計,在高併發下測

Linux執行(pthread)程式設計例項

Linux系統下的多執行緒遵循POSIX執行緒介面,稱為 pthread。編寫Linux下的多執行緒程式,需要使用標頭檔案pthread.h,連線時需要使用庫libpthread.a。順便說一下,Linux 下pthread的實現是通過系統呼叫clone()來實現的。clon

C/C++ Linux執行程式設計 #include

1.最基礎,程序同時建立5個執行緒,各自呼叫同一個函式 #include <iostream> #include <pthread.h> //多執行緒相關操作標頭檔案,可移植眾多平臺   using namespa

linux執行程式設計pthread 同步 互斥

前言 linux下關於並行程式設計有兩種實現方式:fork和pthread_create;其實核心中的執行路徑是相同的,只是flags不一樣罷了。本文的主題是關於pthread_create多執行緒

Linux執行程式設計遇到的一些問題

今天在學習了Linux的多執行緒程式設計的基礎的知識點。於是就試著做了一個簡單的Demo。本以為會得到預期的結果。不成想卻遇到了意想不到的問題。 程式碼展示 我的C 程式碼很簡單,就是一個簡單的示例程式,如下: #include <s

Linux執行程式設計學習【2】——同代…

要想一份程式碼在linux下能編譯,在windows下也能編譯,就得應用巨集處理。最初產生這個構想,是在學習opengl的時候,發覺glut庫是跨平臺的,檢視原始碼後發覺glut裡面進行了很多巨集處理。這是第一次知道編譯器在進行編譯的時候也會定義一些巨集關鍵字。 程式結果如下: 在win8系統下,用d

Linux執行程式設計互斥鎖和條件變數的簡單使用

Linux下的多執行緒遵循POSIX執行緒介面,稱為pthread。編寫Linux下的多執行緒程式,需要使用標頭檔案pthread.h,連結時需要使用庫libpthread.a。執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位。執行緒

linux 執行1

舉例UNIX International 執行緒 UNIX International 執行緒的標頭檔案是<thread.h> [1]  ,僅適用於Sun Solaris作業系統。所以UNIX International執行緒也常被俗稱為Solaris執

為什麼linux執行程式如此消耗虛擬記憶體

最近遊戲已上線運營,進行伺服器記憶體優化,發現一個非常奇妙的問題,我們的認證伺服器(AuthServer)負責跟第三方渠道SDK打交道(登陸和充值),由於採用了curl阻塞的方式,所以這裡開了128個執行緒,奇怪的是每次剛啟動的時候佔用的虛擬記憶體在2.3G,然後每次處理訊息就增加64M,

Linux執行模擬生產者/消費者問題

/*用執行緒的同步和互斥來實現"生產者-消費者"問題.*/ /* 多生產者多消費者多緩衝區 生產者和消費者不可同時進行 */ #include <stdio.h> #include <stdlib.h> //#include <unistd.h

windows和linux執行的一些區別

我認為linux的多執行緒不如windows。理由如下:一、功能WaitForSingleObject在linux下可以用pthread_cond_wait來替代實現,但是pthread_cond_wait不能用來等待thread handle。要等待thread handl

Linux執行檔案傳輸

要求:服務端客戶端分辨各佔一個程序,客戶端中可設定TCP連線數n,之後將檔案等分成n塊同時傳輸。 思路: 在網上查到了許多關於Linux下socket檔案傳輸的文章,受益許多,其中有個部落格寫的很好 連結:http://blog.csdn.net/zhqia

linux執行中條件變數的用法

使用條件變數最大的好處是可以避免忙等。相當與多執行緒中的訊號。 條件變數是執行緒中的東西就是等待某一條件的發生和訊號一樣以下是說明,條件變數使我們可以睡眠等待某種條件出現。條件變數是利用執行緒間共享的全域性變數進行同步的一種機制,主要包括兩個動作:一個執行緒等待"條件變數的條件成立"而掛起;另一個執行緒

linux執行同步機制之訊號量、互斥量、讀寫鎖、條件變數

之前有寫過類似的部落格,這東西不用老忘,現在又有更清晰的理解了。 一、訊號量 編譯時候加入-lrt 訊號量最基本的兩個操作就是PV操作:P()操作實現訊號量減少,V()操作實現訊號量的增加 訊號量的值取決於訊號量的型別,訊號量的型別有多種: (1)二進位制訊號量:0與1.

Linux執行,斷點續傳,命令列下載工具axel

 參考:http://www.2cto.com/os/201202/118482.html 1、安裝方法Ubuntu sudo apt-get install axel  2、man一下 名稱        Axel - Linux 下輕量的下載加速器。 總覽      

c++ 網路程式設計(九)TCP/IP LINUX/windows 執行超詳細教程 以及 執行實現服務端

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <stdlib.h> #include <process.h> #include <winsock2.h> #include <win

c++ 網路程式設計(九)TCP/IP LINUX/windows 執行超詳細教程 以及 執行實現服務端

原文作者:aircraft 原文連結:https://www.cnblogs.com/DOMLX/p/9661012.html  先講Linux下(windows下在後面可以直接跳到後面看): 一.執行緒基本概念 前面我們講過多程序伺服器,但我們知道它開銷很大

C/S模式---執行程式設計

伺服器採用單程序/執行緒程式設計,在同一時刻,伺服器只能與一個客戶端進行互動。只有與當前客戶端的通訊結束後,才能為下一個客戶端進行服務。所以,如果採用執行緒,讓主執行緒連線客戶端,而函式執行緒為每個客戶端進行服務,這樣就可以保證伺服器可以同時為多個客戶端提供服務,實現併發。 採用多執

Linux c執行程式設計的4個例項

在主流的作業系統中,多工一般都提供了程序和執行緒兩種實現方式,程序享有獨立的程序空間,而執行緒相對於程序來說是一種更加輕量級的多工並行,多執行緒之間一般都是共享所在程序的記憶體空間的。   Linux也不例外,雖然從核心的角度來看,執行緒體現為一種對程序的"克隆"(clon

Linux執行程式設計的高效開發經驗

背景 Linux 平臺上的多執行緒程式開發相對應其他平臺(比如 Windows)的多執行緒 API 有一些細微和隱晦的差別。不注意這些 Linux 上的一些開發陷阱,常常會