1. 程式人生 > >EPOLL及訊息佇列實現

EPOLL及訊息佇列實現

#include "smtpd_mock.h"

char* strsub (char *instr, unsigned start, unsigned end)
{
 unsigned n = end - start;
 char * outstr = (char *)malloc(n+1);
 //bzero(outstr,n+1);
 strncpy (outstr, instr + start, n);
 outstr[n] = 0;
 return outstr;
}

int setnonblocking(int sockfd)
{
    if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1)
    {
        return -1;
    }
    return 0;
}

void smtp_echo(void* data)
{
 int socket = *(int*)data;
 char ebuf[128],buffer[BUFFER_SIZE];
 int length = 0, z;
 regex_t reg;
  regmatch_t pm[10];
  const size_t nmatch = 10;
 const char * split = "/r/n";
 char * pline, * cmd;
 
 z = regcomp (&reg, smtp_cmd_format, REG_EXTENDED);
 if (z != 0){
  regerror (z,&reg, ebuf, sizeof (ebuf));
  fprintf (stderr, "%s: regcomp()/n", ebuf);
  return;
     }

 { 
  while (1) { 
      bzero(buffer,BUFFER_SIZE);
       length = recv(socket,buffer,BUFFER_SIZE,0);
      if (length == -1) { 
        if(errno == EAGAIN){
            break;
    }
         syslog(LOG_ERR,"recv - %m"); 
         break; 
      } 
      syslog(LOG_DEBUG,"%s",buffer);

   pline = strtok (buffer,split); 
   while(pline!=NULL) {
    syslog(LOG_DEBUG,"%s/n",pline);
    if (0==(strcasecmp(pline, "."))){
     smtp_cmd("HELO");
                       continue;
                  }
    z = regexec (&reg, pline, nmatch, pm, 0);
    if (z == REG_NOMATCH)
    {
     // do nothing;
    }
    else if (z != 0)
    {
     regerror (z,&reg, ebuf, sizeof (ebuf));
     fprintf (stderr, "%s: regexec('%s')/n", ebuf, pline);
     return ;
    }

    if(pm[1].rm_so != -1)
    {
     cmd = strsub (pline, pm[1].rm_so, pm[1].rm_eo);
     syslog(LOG_NOTICE,"cmd => %s/n", cmd);
     if(pm[2].rm_so != -1)
     {
      syslog(LOG_NOTICE,"other content => %s/n", strsub (pline, pm[2].rm_so, pm[2].rm_eo));
     }
     
     smtp_cmd(cmd,socket);
    }
    pline = strtok(NULL,split);
   }
   
   if(length < BUFFER_SIZE)
                         break;
  }

 } 
 
 regfree (&reg);
 return;
}

void smtp_cmd(char * cmd,int socket)

 char buffer[BUFFER_SIZE];
 bzero(buffer, BUFFER_SIZE);

 if(0 == (strcasecmp(cmd,"HELO")))
 {
     strcpy(buffer,"250 Regards from CharlesCui/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd,"QUIT")))
 {
     strcpy(buffer,"221 QUIT OK/r/n");
  send(socket,buffer,strlen(buffer),0);
     close(socket);
  epoll_ctl(kdpfd, EPOLL_CTL_DEL, socket, &ev);
 } else if(0==(strcasecmp(cmd,"NOOP")))
 {
     strcpy(buffer,"250 NOOP/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd,"DATA")))
 {
     strcpy(buffer,"354 End data with <CR><LF>.<CR><LF>/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd,"EHLO")))
 {
     strcpy(buffer,"334 250-mail/r/n250-PIPELINING/r/n250-AUTH LOGIN PLAIN/r/n250-AUTH=LOGIN PLAIN/r/n250 8BITMI/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd,"AUTH")))
 {
     strcpy(buffer,"334 dXNlcm5hbWU6/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd, "MAIL")))
 {
     strcpy(buffer,"250 Mail Ok/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd, "RCPT")))
 {
     strcpy(buffer,"250 Rcpt Ok/r/n");
  send(socket,buffer,strlen(buffer),0);
 } else if(0==(strcasecmp(cmd,"220")))
 {
     strcpy(buffer,"220 Welcome to CharlesCui's smtpd mock server./r/n");
  send(socket,buffer,strlen(buffer),0);
 } else
 {
     strcpy(buffer,"");
  send(socket,buffer,strlen(buffer),0);
  syslog(LOG_NOTICE,"Error smtp command.");
 }
}

int init_smtp(int port)
{
    struct sockaddr_in *server_addr;
    server_addr = malloc(sizeof(struct sockaddr_in));
    server_addr->sin_family = AF_INET;
    server_addr->sin_addr.s_addr = htons(INADDR_ANY);
    server_addr->sin_port = htons(port);
   
    int server_socket = socket(AF_INET,SOCK_STREAM,0);
 syslog(LOG_NOTICE,"init_smtp:server_socket => %d/n",server_socket);
 setnonblocking(server_socket);
    if( server_socket < 0)
    {
        syslog(LOG_ERR,"Create Socket Failed! - %m/n");
        exit(1);
    }
   
    if( bind(server_socket,(struct sockaddr*)server_addr,sizeof(struct sockaddr_in)))
    {
        syslog(LOG_ERR,"Server Bind Port : %d Failed! - %m/n", port);
        exit(1);
    }
 
    if ( listen(server_socket, g_listen_size) )
    {
        syslog(LOG_ERR,"Server Listen Failed! - %m/n");
        exit(1);
    }

 
    struct rlimit rt;
 rt.rlim_max = rt.rlim_cur = g_epoll_size;
    if (setrlimit(RLIMIT_NOFILE, &rt) == -1)
    {
        syslog(LOG_ERR,"setrlimit - %m");
        exit(1);
    }
    else
    {
        syslog(LOG_NOTICE,"設定系統資源引數成功!/n");
    }

 return server_socket;
}
void block_queue(void * param)
{
 /*
 姑娘們,排好對,等客了!
 老鴇吩咐要做什麼都知道了嗎?(func為回撥函式)
 */
 void(* func)(void* );
 int fd;
 block_queue_node_t *head_node;
 
 //param是全域性變數bqp
 block_queue_param_t* bque = (block_queue_param_t*)param;
 func = bque->func;
 
 for(;;)
 {
  pthread_mutex_lock(&bque->mutex);
  pthread_cond_wait(&bque->cond,&bque->mutex);
  /*
  來客啦!
  */
  if(list_empty(&head))
  {
   //哪個小二瞎喊,命名一個客人都沒來!
   pthread_mutex_unlock(&bque->mutex);
   continue;
  }else
  {
   /*
   大爺,跟我走吧,我那兒寬敞
   從連結串列頭部取出一個節點
   */
   head_node = list_entry(head.next,block_queue_node_t,list);
   fd = head_node->fd;
   //大爺,你是我的了!
   //同時刪除該節點
   list_del(&head_node->list);
   /**/
   free(head_node);
   counter--;
  }
  
  pthread_mutex_unlock(&bque->mutex);

  /*幹*/
  func(&fd);
 }
}

int insert_queue(block_queue_param_t *bque,int fd)
{
 //生成臨時節點,用來儲存fd
 block_queue_node_t *b = (block_queue_node_t *)malloc(sizeof(block_queue_node_t));
 b->fd = fd;
 
 pthread_mutex_lock(&bque->mutex);
 
 if(counter > g_listen_size){
  //當客人數量超過小姐接待能力的時候
  //就放棄接待該客人,並且返回1.
  //青樓是殘酷滴,一個蘿蔔一個坑
  return 1;
 }else{
  counter++;
 }
 /*
 將新增的節點插入到尾部,
 相對應的,block_queue迴圈體中取節點時,
 是從連結串列頭取到的.
 */
 list_add_tail(&b->list,&head);
 /*
 客人到!
 姐妹們快搶客啊!(核心用broadcast通知各阻塞的執行緒)
 */
 pthread_cond_broadcast(&bque->cond);
 pthread_mutex_unlock(&bque->mutex);
 
 return 0;
}

int init_threads()
{
 size_t i=0;
 //這是今天的流水賬,
 //客人們來了都會在這裡(head連結串列)登記的.
 //都知道今天各位姑娘要做什麼吧(smtp_echo).
 //為全域性變數bqp設定屬性
 bqp.func = (void*)smtp_echo;
 /*
 不許搶客人!(互斥mutex)
 說了多少次了,不管男女老幼長短粗細,
 只有客人想不到,沒有我們做不到!
 別隻盯著帥哥.
 */
 pthread_cond_init(&bqp.cond,NULL);
 pthread_mutex_init(&bqp.mutex,NULL);

 /*
 姑娘們起床了!
 初始化各個執行緒
 */
 for( i = 0; i < g_th_count; ++i)
 {
  pthread_t child_thread;
     pthread_attr_t child_thread_attr;
     pthread_attr_init(&child_thread_attr);
     pthread_attr_setdetachstate(&child_thread_attr,PTHREAD_CREATE_DETACHED);
  /*
  養你們是要幹活(block_queue)的,
  沒活的時候可以休息著(pthread_cond_wait)
  活來了(pthread_cond_signal)就麻利點去接客(head連結串列非空)
  */
  if( pthread_create(&child_thread,&child_thread_attr,(void *)block_queue, (void *)&bqp) < 0 )
     {
   syslog(LOG_ERR,"pthread_create Failed : %s - %m/n",strerror(errno));
   return 1;
  }
  else
  {
   syslog(LOG_NOTICE,"pthread_create Success : %d/n",(int)child_thread);
  }
 }

}

int handler(void* fd)
{
 syslog(LOG_NOTICE,"handler:fd => %d/n",*(int *)(fd));
 //向全域性變數bqp中插入一個節點
 //姑娘們聽好了,
 //大爺都在排隊呢,
 //一個個麻利點,伺候起來了!
 return insert_queue(&bqp,*(int *)fd);
}

void init_daemon(void)
{
 int pid;
 int i;
 if(pid=fork())
  exit(0);//是父程序,結束父程序
 else if(pid< 0)
  exit(1);//fork失敗,退出
 //是第一子程序,後臺繼續執行
 setsid();//第一子程序成為新的會話組長和程序組長
 //並與控制終端分離
 if(pid=fork())
  exit(0);//是第一子程序,結束第一子程序
 else if(pid< 0)
  exit(1);//fork失敗,退出
 //是第二子程序,繼續
 //第二子程序不再是會話組長

 for(i=0;i< NOFILE;++i)//關閉開啟的檔案描述符
  close(i);
 chdir("/tmp");//改變工作目錄到/tmp
 umask(0);//重設檔案建立掩模
 return;
}

int main(int argc, char **argv)

 char ch;
 int d = 0;

 //處理argv
 while( ( ch = getopt( argc, argv, "p:t:l:e:d?" ) ) != EOF )
 {
  switch(ch)
  {
   case 'p':
    printf("SMTPD_PORT =>%s ", optarg);
    g_port = atoi(optarg);
    break;
   case 't':
    printf("THREADS_COUNT => %s ", optarg);
    g_th_count = atoi(optarg);
    break;
   case 'l':
    printf("LENGTH_OF_LISTEN_QUEUE => %s. ",optarg);
    g_listen_size = atol(optarg);
    break;
   case 'e':
    printf("MAX_EPOLL_SIZE => %s. ",optarg);
    g_epoll_size = atol(optarg);
    break;
   case 'd':
    printf("RUN AS DAEMON. ");
    d = 1;
   case '?':
    printf("Useage: -p [SMTPD_PORT|8025] -t [THREADS_COUNT|100] -l [LENGTH_OF_LISTEN_QUEUE|1024] -e [MAX_EPOLL_SIZE|1000] -d (RUN AS DAEMON.)/n");
    exit(1);
   default:
    printf("Not support option :%c/n",ch);
    exit(2);
  }
 }

 if(d == 1)
  init_daemon();

 //一天的流水賬要記錄下來啊
 //初始化 syslog
 char *ident = "Smtp Mock";
 int logopt = LOG_PID | LOG_CONS;
 int facility = LOG_USER;
 openlog(ident, logopt, facility);
  setlogmask(LOG_UPTO(LOG_ERR));
   
 syslog(LOG_INFO,"syslog inited.");

 //初始化連結串列
 INIT_LIST_HEAD(&head);

 //生成smtp套接字
 //本店開張了,歡迎訪問
 int server_socket = init_smtp(g_port);
 int n;
 
 if(init_threads() == 0)
  syslog(LOG_NOTICE,"Success full init_threads.");
 
 /*
 下面要把本店加入全球領先的企業管理系統中,
 該系統節省人力資源,
 不需要服務員傻等在門口,
 而是客人到了就會通知服務員出來迎賓.
 */
 kdpfd = epoll_create(g_epoll_size);
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = server_socket;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, server_socket, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d < 0",
                server_socket);
        return -1;
    }
 //老鴇(主執行緒)負責拉客,姑娘(子執行緒)負責接客 
    for(;;) {
  struct sockaddr_in local;
  socklen_t length = sizeof(local);
  int client;

  //epoll_wait實現了阻塞,而不是busy loop
        nfds = epoll_wait(kdpfd, events, g_epoll_size, -1);

        for(n = 0; n < nfds; ++n) {
  //判斷套接字
  //看是熟客還是生客
            if(events[n].data.fd == server_socket) {
  //新新新新,新來的吧
  //你是新新新新新來的吧
                client = accept(server_socket, (struct sockaddr *) &local,&length);
  //是生客就發一個新的id卡(client)
                if(client < 0){
                    syslog(LOG_ERR,"accept - %m");
                    continue;
                }
                setnonblocking(client);
    //先跟大爺打聲招呼,顯得我們姑娘主動些
    smtp_cmd("220",client);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client;
  /*
  再發張VIP卡,
  把大爺加入VIP客戶列表,
  享受天上人間的服務
  */
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%d < 0",
                            client);
                    return -1;
                }
            }
            else
  /*
  這位大爺肯定來過好幾次了,
  否則怎麼連後門都知道.
  */
  /*
  後屋一排姑娘,大爺您慢慢挑
  老鴇就不奉陪了,姑娘們伺候著!
  */
                if(handler((void *)&events[n].data.fd) != 0)
                 syslog(LOG_ERR,"handler ret != 0 - %m");
        }
    }
 //打擊色..情產業,
 //被迫歇業了
    close(server_socket);
    return 0;
}