1. 程式人生 > >轉 epoll的一個demo(epoll+執行緒池)

轉 epoll的一個demo(epoll+執行緒池)

#include <pthread.h>

#include <string.h>

#include <stdlib.h>

#include <unistd.h>

#include <stdio.h>

#include <fcntl.h>

#include <arpa/inet.h>

#include <sys/epoll.h>

#include <sys/errno.h>

#include <sys/socket.h>

#define THREAD_MAX 20

#define LISTEN_MAX 20

#define SERVER_IP "127.0.0.1"

typedefstruct

{

char ip4[128];

intport;

int fd;

} LISTEN_INFO;

//伺服器引數

staticLISTEN_INFO s_listens[LISTEN_MAX];

//執行緒池引數

staticunsignedint s_thread_para[THREAD_MAX][8];//執行緒引數

staticpthread_ts_tid[THREAD_MAX];//執行緒ID

pthread_mutex_ts_mutex[THREAD_MAX];//執行緒鎖

//私有函式

static int init_thread_pool(void);

//初始化資料

static int init_listen4(char *ip4, int port, int max_link); //初始化監聽

//執行緒函式

void * test_server4(unsigned int thread_para[]);

//設定檔案描述符為NonBlock

bool setNonBlock(int fd)

{

intflags = fcntl(fd, F_GETFL, 0);

    flags|= O_NONBLOCK;

if(-1 == fcntl(fd, F_SETFL, flags))

    {

returnfalse;

    }

returntrue;

}

int main(int argc, char *argv[])//客戶端驅動

{

//臨時變數

int i,j, rc;

intsock_listen; //監聽套接字

intsock_cli; //客戶端連線

intlisten_index;

intepfd;

intnfds;

structepoll_event ev;

structepoll_event events[LISTEN_MAX];

socklen_taddrlen; //地址資訊長度

structsockaddr_in addr4; //IPv4地址結構

//執行緒池初始化

    rc =init_thread_pool();

if (0 != rc) exit(-1);

//初始化服務監聽

for(i = 0; i < LISTEN_MAX; i++)

    {

sprintf(s_listens[i].ip4, "%s",SERVER_IP);

       s_listens[i].port = 40000 + i;

//建立監聽

        rc= init_listen4(s_listens[i].ip4, s_listens[i].port, 64);

if (0 > rc)

        {

fprintf(stderr, "無法建立伺服器監聽於%s:%d\r\n", s_listens[i].ip4, s_listens[i].port);

exit(-1);

        }

else

        {

fprintf(stdout, "已建立伺服器監聽於%s:%d\r\n", s_listens[i].ip4, s_listens[i].port); 

        }

       s_listens[i].fd = rc;

    }

//設定集合

    epfd =epoll_create(8192);

for (i =0; i < LISTEN_MAX; i++)

    {

//加入epoll事件集合

       ev.events = EPOLLIN | EPOLLET;

       ev.data.u32 = i;//記錄listen陣列下標

if(epoll_ctl(epfd, EPOLL_CTL_ADD, s_listens[i].fd, &ev) < 0)

        {

fprintf(stderr, "向epoll集合新增套接字失敗(fd =%d)\r\n",rc);

exit(-1);

        }

    }      

//服務迴圈

for( ; ;)

    {

//等待epoll事件

       nfds = epoll_wait(epfd, events, LISTEN_MAX, -1);

//處理epoll事件

for(i = 0; i < nfds; i++)

        {

//接收客戶端連線

           listen_index = events[i].data.u32;

           sock_listen = s_listens[listen_index].fd;

           addrlen = sizeof(structsockaddr_in);

           bzero(&addr4, addrlen);

           sock_cli = accept(sock_listen, (structsockaddr *)&addr4, &addrlen);

if(0 > sock_cli)

           {

fprintf(stderr, "接收客戶端連線失敗\n");

continue;

           }

else

           {

char*myIP = inet_ntoa(addr4.sin_addr);

printf("accept aconnection from %s...\n", myIP);

           }

           setNonBlock(sock_cli);

//查詢空閒執行緒對

for(j = 0; j < THREAD_MAX; j++)

           {

if (0 == s_thread_para[j][0]) break;

           }

if (j>= THREAD_MAX)

           {

fprintf(stderr, "執行緒池已滿, 連線將被放棄\r\n");

               shutdown(sock_cli, SHUT_RDWR);

               close(sock_cli);

continue;

           }

//複製有關引數

           s_thread_para[j][0] = 1;//設定活動標誌為"活動"

           s_thread_para[j][1] = sock_cli;//客戶端連線

           s_thread_para[j][2] = listen_index;//服務索引

//執行緒解鎖

           pthread_mutex_unlock(s_mutex + j);

        }//end of for(i;;)

    }//end of for(;;)

exit(0);

}

static int init_thread_pool(void)

{

int i,rc;

//初始化執行緒池引數

for(i = 0; i < THREAD_MAX; i++)

    {

       s_thread_para[i][0] = 0;//設定執行緒佔用標誌為"空閒"

       s_thread_para[i][7] = i;//執行緒池索引

       pthread_mutex_lock(s_mutex + i);// 這個地方為什麼要加鎖?不加鎖建立監聽有時會不成功

    }

//建立執行緒池

for(i = 0; i < THREAD_MAX; i++)

    {

        rc= pthread_create(s_tid + i, 0, (void*(*)(void*))test_server4, (void*)(s_thread_para[i]));

if (0 != rc)

        {

fprintf(stderr, "執行緒建立失敗\n");

return(-1);

        }

    }

//成功返回

return(0);

}

static int init_listen4(char *ip4, int port, int max_link)

{

//臨時變數

intsock_listen4;

structsockaddr_in addr4;

unsignedintoptval;

structlinger optval1;

//初始化資料結構

   bzero(&addr4, sizeof(addr4));

//inet_pton將點分十進位制IP轉換為整數

   inet_pton(AF_INET, ip4, &(addr4.sin_addr));

    addr4.sin_family= AF_INET;

//htons將無符號short從主機位元組序(x86:Big-Endian)轉換為網路位元組序

   addr4.sin_port = htons(port);

//建立流型別的SOCKET

   sock_listen4 = socket(AF_INET, SOCK_STREAM, 0);

if (0 > sock_listen4)

    {

fprintf(stderr, "建立socket異常,sock_listen4:%d\n", sock_listen4);

       perror("建立socket異常");

return(-1);

    }

//設定SO_REUSEADDR選項(伺服器快速重起)

    optval= 0x1;

   setsockopt(sock_listen4, SOL_SOCKET, SO_REUSEADDR, &optval, 4);

//設定SO_LINGER選項(防範CLOSE_WAIT掛住所有套接字)

   optval1.l_onoff = 1;

   optval1.l_linger = 60;

   setsockopt(sock_listen4, SOL_SOCKET, SO_LINGER, &optval1, sizeof(structlinger));

if (0 > bind(sock_listen4, (structsockaddr *)&addr4, sizeof(addr4)))

    {

fprintf(stderr, "bind socket異常,sock_listen4:%d\n", sock_listen4);

       perror("bind socket異常");

       close(sock_listen4);

return(-1);

    }

if (0 > listen(sock_listen4, max_link))

    {

fprintf(stderr, "listen socket異常,sock_listen4:%d\n", sock_listen4);

       perror("listen socket異常");

       close(sock_listen4);

return(-1);

    }

return(sock_listen4);

}

void * test_server4(unsigned int thread_para[])

{

//臨時變數

int sock_cli;//客戶端連線

intpool_index; //執行緒池索引

intlisten_index; //監聽索引

charbuff[32768]; //傳輸緩衝區

int i,j, len;

char *p;

//執行緒脫離建立者

   pthread_detach(pthread_self());

   pool_index = thread_para[7];

wait_unlock:

    pthread_mutex_lock(s_mutex+ pool_index);//等待執行緒解鎖

//執行緒變數內容複製

   sock_cli = thread_para[1];//客戶端連線

   listen_index = thread_para[2];//監聽索引

//接收請求

    len =recv(sock_cli, buff, sizeof(buff),MSG_NOSIGNAL);

printf("%s\n",buff);

//構造響應

    p =buff;

//HTTP頭

    p += sprintf(p, "HTTP/1.1 200OK\r\n");

    p += sprintf(p, "Content-Type:text/html\r\n");

    p += sprintf(p, "Connection:closed\r\n\r\n");

//頁面

    p += sprintf(p, "<html>\r\n<head>\r\n");

    p += sprintf(p, "<metacontent=\"text/html; charset=UTF-8\"http-equiv=\"Content-Type\">\r\n");

    p += sprintf(p, "</head>\r\n");

    p += sprintf(p, "<bodystyle=\"background-color: rgb(229, 229, 229);\">\r\n");

    p += sprintf(p, "<center>\r\n");

    p += sprintf(p, "<H3>連線狀態</H3>\r\n");

    p += sprintf(p, "<p>伺服器地址 %s:%d</p>\r\n",s_listens[listen_index].ip4, s_listens[listen_index].port);

    j = 0;

for(i = 0; i < THREAD_MAX; i++)

    {

if (0 != s_thread_para[i][0]) j++;

    }

    p += sprintf(p, "<H3>執行緒池狀態</H3>\r\n");

    p += sprintf(p, "<p>執行緒池總數 %d 活動執行緒總數%d</p>\r\n", THREAD_MAX, j);

    p += sprintf(p, "</center></body></html>\r\n");

    len = p- buff;

//傳送響應

   send(sock_cli, buff, len, MSG_NOSIGNAL);

memset(buff, 0, 32768);

//釋放連線

   shutdown(sock_cli, SHUT_RDWR);

   close(sock_cli);

//執行緒任務結束

   thread_para[0] = 0;//設定執行緒佔用標誌為"空閒"

gotowait_unlock;

   pthread_exit(NULL);

}


相關推薦

執行之旅ThreadPool 執行

一、什麼是ThreadPool 執行緒池(原始碼)       1.執行緒池顧名思義,有我們的系統建立一個容器裝載著我們的執行緒,由CLR控制的所有AppDomain共享。執行緒池可用於執行任務、傳送工作項、處理非同步 I/O、代表其他執行緒等待以及處理計時器。所以使用執行緒池不

epoll一個demoepoll+執行

#include <pthread.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> #includ

Java之TCP傳輸小Demo執行:服務端

import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; public c

Java之TCP傳輸小Demo執行:客戶端

import java.io.*; import java.net.Socket; public class TCPClientDemo { public static void main(String[] args) throws IOException {

Java之UDP傳輸聊天程式小Demo執行

ChatDemo.java import java.net.DatagramSocket; public class ChatDemo { public static void main(String[] args) { try {

Java之UDP傳輸小Demo執行即傳送端和接收端為兩個獨立程序:傳送端

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import j

Linux + C + Epoll實現高併發伺服器(執行 + 資料庫連線)

一, 背景        先說下我要實現的功能,server端一直在linux平臺下面跑,當客戶端有請求過來的時候server端接受到請求,拿到客戶端的資料,根據拿到的資料做出相應的處理,得到處理的結果直接把結果資料傳送給客戶端。這樣一個連線的請求結束,我的不

2.3四種執行連線的配置和使用和自定義執行

四種執行緒連線池的配置和使用 最終呼叫類和方法 {引數有 核心執行緒數目,最大執行緒數目,存活時間(當前執行緒執行完這個任務之後,等待下一個任務到來的最長等待時間。如果在這個時間內沒有新的任務來到,那當前執行緒就會退出),時間單位,等待佇列(用於存放待執行的任務)} public

【小家java】Java中的執行,你真的用對了嗎?教你用正確的姿勢使用執行

相關閱讀 【小家java】java5新特性(簡述十大新特性) 重要一躍 【小家java】java6新特性(簡述十大新特性) 雞肋升級 【小家java】java7新特性(簡述八大新特性) 不溫不火 【小家java】java8新特性(簡述十大新特性) 飽受讚譽 【小家java】java9

利用threading模組和queue模組在python3直譯器上建立一個簡單的python執行

python直譯器沒提供執行緒池模組,故在python3上自定義python執行緒池簡單版本,程式碼如下 #用threading queue 做執行緒池 import queueimport threadingclass ThreadPool(): def __init__(self,arg):#建

實戰Java高併發程式設計3.2 執行

1.Executor jdk提供了一套Executor框架,本質上是一個執行緒池。 newFixedThreadPool()方法:該方法返回一個固定數量的執行緒池。該執行緒池中的執行緒數量始終不變,當有一個新任務提交時,執行緒池中若有空閒執行緒,則立即執行,若沒有,則任務會暫存在一個任

1.JVM之對Vector執行安全的測試相對執行安全

import java.util.Vector; public class vector { private static Vector<Integer> vector=new Vector<>(); public static void main(Stri

《Java多執行程式設計實戰》—— 第9章 Thread Pool執行模式

一個系統中的執行緒相對於其所要處理的任務而言,是一種非常有限的資源。執行緒不僅在執行任務時需要消耗CPU時間和記憶體等資源,執行緒物件(Thread例項)本身以及執行緒所需的呼叫棧(Call Stack)也佔用記憶體,並且Java中建立一個執行緒往往意味著JVM會建立相應的依賴於宿主機作業系

Java-時間等待執行睡眠

  Java-時間等待 1 TimeUnit.DAYS.sleep(1);//天 2 TimeUnit.HOURS.sleep(1);//小時 3 TimeUnit.MINUTES.sleep(1);//分 4 TimeUnit.SECONDS.sleep(1);//秒

24執行3

1 單例設計模式 保證類在記憶體中只有一個物件。有三種寫法,下面分別介紹 (1)餓漢式 為什麼叫它餓漢式寫法呢,因為它在建立類的時候,不管三七二十一就直接建立了s物件,也不管s會不會被使用,相反,還有一種寫法叫懶漢式寫法。 (2)懶漢式(單例延遲載入模式) 多執行

23執行2

1 休眠執行緒 2 守護執行緒 這個概念需要解釋一下,以象棋為例,非守護執行緒相當於帥,守護執行緒相當於車馬相士,當帥都沒了,車馬相士就失去了它的意義了,就是非守護執行緒停止了,守護執行緒也就gg了。 結果就是aaaa不會輸出50次了。 3 加入執行緒(插隊執行緒)

22執行1

1 多執行緒 2 多執行緒的原理 3 並行和併發的區別 4 java是多執行緒嗎 5 多執行緒的實現方式一(繼承Thread) 6 多執行緒的實現方式二(實現Runnable) 7 兩種方式的區別 8 匿名內部類實現執行

Java之多執行安全屌絲版,兩大解決思路,要麼不去競爭開闢執行副本、要麼有順序的競爭資源用鎖規定執行秩序

0、多執行緒安全,如果多個執行緒操作一個變數,每次都能達到預期的結果,那麼說明當前這個類起碼是執行緒安全的,我這白話的,可能有點噁心。   1、看看牛人是怎麼說的,為什麼多執行緒併發是不安全的? 在作業系統中,執行緒是不再擁有資源的,程序是擁有資源的。而執行緒是由程序建立的

python只使用Queue和Thread自己實現一個最簡單的執行

        我的思路就是就是寫一個TifCutting類繼承自Thread,這個類裡有個屬性Queue;有一個addTask新增任務的方法,這個方法是把需要執行的函式放到Queue裡;因為繼承自Thread類,一定有一個重寫的run方法,這個方法是從自己的Queue屬性裡

Python 實用程式設計技巧執行

一、GIL(global_interpreter_lock) 1.概念: Python 一開始為了簡單,在多執行緒程式設計的時候會在我們的直譯器上加一個非常大的鎖,也就是允許我們一次只有一個執行緒執行在一個CPU上,gil 就能實現在同一時刻只有一個執行緒在CPU上執行位