1. 程式人生 > >IOCP簡易實現(客戶端與客戶端之間可以聊天)

IOCP簡易實現(客戶端與客戶端之間可以聊天)

#include "stdafx.h" #include <iostream> #include "List.h" #define  RECV 1 #define  SEND 2 #define  ACCEPT 0 #define  DISCONNECT 3 #define  FREE 4 const int DefaultPort = 8000; const int ALLUSERS = 1000; using namespace std; List<OVERLAPEDEx*> per_IOpool; List<OVERLAPEDEx*> per_IOusing; List<PERSOCKET*> per_socketpool; List<PERSOCKET*> per_socketusing; PERSOCKET* Userssocket; int onlineusers = 0,  acceptedflags = 1,  acceptwillflags = 1,  acceptcheck = 1; GUID GuidAcceptEx =  WSAID_ACCEPTEX; LPFN_ACCEPTEX CSY_AcceptEx = NULL; GUID GuiddisconnectEx = WSAID_DISCONNECTEX; LPFN_DISCONNECTEX CSY_Disconnect = NULL; HANDLE mutex = CreateMutex(NULL,FALSE,NULL); #pragma comment(lib, "Ws2_32.lib") #pragma comment(lib, "Kernel32.lib") #pragma comment(lib,"Mswsock.lib") void AcceptIO(OVERLAPEDEx*); void RecvIO(OVERLAPEDEx*); void SendIO(OVERLAPEDEx*); void DisconnectIO(OVERLAPEDEx*); unsigned int __stdcall srvsend(void *complt) {  char chatting[200];  int a;  while (1)  {   getchar();   if ((per_socketusing.size)>0)   {    if (onlineusers>0)    {     WaitForSingleObject(mutex,INFINITE);     cout<<endl<<"請輸入內容:";     ReleaseMutex(mutex);     gets(chatting+1);    }    WaitForSingleObject(mutex,INFINITE);    chatting[0] = 'f';    node<PERSOCKET*>* temp = temp = per_socketusing.head;    for (int i=0;i<per_socketusing.size;i++)    {     if (send(Userssocket[temp->data->UserID].socket,chatting,200,0)<0)     {      cout<<"No."<<a<<" can't connect server"<<endl;     }     else     {      cout<<"回執已傳送。"<<endl;     }     temp = temp->next;    }    ReleaseMutex(mutex);   }   else   {    WaitForSingleObject(mutex,INFINITE);    cout<<"There is no client link~"<<endl;    ReleaseMutex(mutex);   }  } } unsigned int __stdcall srvrecv(void *complt) {  HANDLE completion = (HANDLE)complt;  DWORD transbytes;  DWORD recvbytes;  DWORD Recvbytes;  DWORD flags = 0;  DWORD Flags = 0;  LPOVERLAPPED theoverlaped = NULL;  OVERLAPEDEx *peroverlapedex = NULL;  PERSOCKET *thesocket = NULL;  int gqc = 1;  while (1)  {   gqc = GetQueuedCompletionStatus(completion,    &transbytes,    (PULONG_PTR)&thesocket,    (LPOVERLAPPED*)&theoverlaped,    INFINITE);   peroverlapedex = (OVERLAPEDEx*)theoverlaped;   if (gqc==0&&transbytes==0)   {    DWORD a = GetLastError();    WaitForSingleObject(mutex,INFINITE);    onlineusers--;    cout<<"No."<<peroverlapedex->UserID<<"client exit,error="<<a<<endl;    per_socketpool.add(&Userssocket[peroverlapedex->UserID]);    per_socketusing.remove(&Userssocket[peroverlapedex->UserID]);    cout<<"回收Socket..."<<endl;    cout<<"剩餘Socket:"<<per_socketpool.size;    cout<<" 正在使用Socket:"<<per_socketusing.size<<endl;    peroverlapedex->IOType = FREE;    cout<<"投遞Disconnect..."<<endl;    cout<<"剩餘IO:"<<per_IOpool.size;    cout<<" 正在使用IO:"<<per_IOusing.size<<endl;    CSY_Disconnect(thesocket->socket,     &peroverlapedex->overlaped,     TF_REUSE_SOCKET,     0);    node<PERSOCKET*>* temp = temp = per_socketusing.head;    for (int i=0;i<per_socketusing.size;i++)    {     ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));     per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;     sprintf(per_IOpool.head->data->buffer,"f%d號客戶端下線了,當前有%d人線上",thesocket->UserID,per_socketusing.size);     per_IOpool.head->data->databuff.len = 32;     per_IOpool.head->data->IOType = SEND;     per_IOpool.head->data->UserID = per_socketusing.tail->data->UserID;     per_IOpool.head->data->socket = (HANDLE)per_socketusing.tail->data->socket;     WSASend(temp->data->socket,      &per_IOpool.head->data->databuff,      1,      &Recvbytes,      flags,      &per_IOpool.head->data->overlaped,      0);     per_IOusing.add(per_IOpool.head->data);     per_IOpool.removehead();     cout<<"投遞WSASend..."<<endl;     cout<<"剩餘IO:"<<per_IOpool.size;     cout<<" 正在使用IO:"<<per_IOusing.size<<endl;     temp = temp->next;    }    ReleaseMutex(mutex);    continue;   }   if (peroverlapedex->IOType==ACCEPT)   {    WaitForSingleObject(mutex,INFINITE);    per_socketusing.add(per_socketpool.head->data);    per_socketpool.removehead();      cout<<"獲取Socket..."<<endl;    cout<<"剩餘Socket:"<<per_socketpool.size;    cout<<" 正在使用Socket:"<<per_socketusing.size<<endl;    onlineusers++;    acceptedflags = 0;    CreateIoCompletionPort((HANDLE)per_socketusing.tail->data->socket,     completion,     (DWORD)&per_socketusing.tail->data->socket,     0);    char ip[4] = {0};    for (int i=2026,n=0;i<2030;++i,++n)    {     ip[n] = peroverlapedex->buffer[i];    }    int IP[4];    for (int i=0;i<4;++i)    {     IP[i] = ((int)ip[i]+256)%256;    }    cout<<"IP:"<<IP[0]<<"."<<IP[1]<<"."<<IP[2]<<"."<<IP[3]<<":"<<peroverlapedex->databuff.buf<<endl;    ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));    per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;    sprintf(per_IOpool.head->data->buffer,"f登入成功,當前有%d人線上",per_socketusing.size);    per_IOpool.head->data->databuff.len = 25;    per_IOpool.head->data->IOType = SEND;    per_IOpool.head->data->UserID = per_socketusing.tail->data->UserID;    per_IOpool.head->data->socket = (HANDLE)per_socketusing.tail->data->socket;    WSASend(per_socketusing.tail->data->socket,     &per_IOpool.head->data->databuff,     1,     &Recvbytes,     flags,     &per_IOpool.head->data->overlaped,     0);    per_IOusing.add(per_IOpool.head->data);    per_IOpool.removehead();    cout<<"投遞WSASend..."<<endl;    cout<<"剩餘IO:"<<per_IOpool.size;    cout<<" 正在使用IO:"<<per_IOusing.size<<endl;    node<PERSOCKET*>* temp = temp = per_socketusing.head;    for (int i=0;i<per_socketusing.size-1;i++)    {     ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));     per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;     sprintf(per_IOpool.head->data->buffer,"f%d號客戶端上線了,當前有%d人線上",per_socketusing.tail->data->UserID,per_socketusing.size);     per_IOpool.head->data->databuff.len = 32;     per_IOpool.head->data->IOType = SEND;     per_IOpool.head->data->UserID = per_socketusing.tail->data->UserID;     per_IOpool.head->data->socket = (HANDLE)per_socketusing.tail->data->socket;     WSASend(temp->data->socket,      &per_IOpool.head->data->databuff,      1,      &Recvbytes,      flags,      &per_IOpool.head->data->overlaped,      0);     per_IOusing.add(per_IOpool.head->data);     per_IOpool.removehead();     cout<<"投遞WSASend..."<<endl;     cout<<"剩餘IO:"<<per_IOpool.size;     cout<<" 正在使用IO:"<<per_IOusing.size<<endl;     temp = temp->next;    }    ZeroMemory(&(peroverlapedex->overlaped), sizeof(OVERLAPPED));    peroverlapedex->databuff.buf = peroverlapedex->buffer;    peroverlapedex->databuff.len = 2048;    peroverlapedex->IOType = RECV;    peroverlapedex->UserID = per_socketusing.tail->data->UserID;    peroverlapedex->socket = (HANDLE)per_socketusing.tail->data->socket;    WSARecv(per_socketusing.tail->data->socket,     &peroverlapedex->databuff,     1,     &recvbytes,     &Flags,     &peroverlapedex->overlaped,     0);    ReleaseMutex(mutex);   }   else   {    if (peroverlapedex->IOType==RECV)    {     WaitForSingleObject(mutex,INFINITE);     cout<<"客戶端"<<peroverlapedex->UserID<<"號:"<<peroverlapedex->databuff.buf+4<<endl;     ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));     per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;     strcpy(per_IOpool.head->data->databuff.buf+4,peroverlapedex->databuff.buf+4);     memcpy(per_IOpool.head->data->databuff.buf,&peroverlapedex->UserID,4);     per_IOpool.head->data->databuff.len = strlen(peroverlapedex->databuff.buf+4)+5;     per_IOpool.head->data->IOType = SEND;     per_IOpool.head->data->UserID = *(int*)(peroverlapedex->buffer);     per_IOpool.head->data->socket = (HANDLE)Userssocket[*(int*)peroverlapedex->buffer].socket;     WSASend(Userssocket[*(int*)peroverlapedex->buffer].socket,      &per_IOpool.head->data->databuff,      1,      &Recvbytes,      flags,      &per_IOpool.head->data->overlaped,      0);     per_IOusing.add(per_IOpool.head->data);     per_IOpool.removehead();     cout<<"投遞WSASend..."<<endl;     cout<<"剩餘IO:"<<per_IOpool.size;     cout<<" 正在使用IO:"<<per_IOusing.size<<endl;     ZeroMemory(peroverlapedex, sizeof(OVERLAPPED));     peroverlapedex->databuff.buf = peroverlapedex->buffer;     peroverlapedex->databuff.len = 2048;     peroverlapedex->IOType = RECV;     WSARecv(Userssocket[peroverlapedex->UserID].socket,      &peroverlapedex->databuff,      1,      &recvbytes,      &flags,      &peroverlapedex->overlaped,      0);     cout<<"投遞WSARecv..."<<endl;     cout<<"剩餘IO:"<<per_IOpool.size;     cout<<" 正在使用IO:"<<per_IOusing.size<<endl;     ReleaseMutex(mutex);    }    else    {     if (peroverlapedex->IOType==SEND)     {      WaitForSingleObject(mutex,INFINITE);      per_IOpool.add(peroverlapedex);      per_IOusing.remove(peroverlapedex);      cout<<"回收WSASend..."<<endl;      cout<<"剩餘IO:"<<per_IOpool.size;      cout<<" 正在使用IO:"<<per_IOusing.size<<endl;      ReleaseMutex(mutex);     }     else     {      if (peroverlapedex->IOType==FREE)      {       WaitForSingleObject(mutex,INFINITE);       per_IOpool.add(peroverlapedex);       per_IOusing.remove(peroverlapedex);       cout<<"回收WSARecv..."<<endl;       cout<<"剩餘IO:"<<per_IOpool.size;       cout<<" 正在使用IO:"<<per_IOusing.size<<endl;       ReleaseMutex(mutex);      }      else      {       if (peroverlapedex->IOType==DISCONNECT)       {        WaitForSingleObject(mutex,INFINITE);        per_IOpool.add(peroverlapedex);        per_IOusing.remove(peroverlapedex);        cout<<"回收Disconnect..."<<endl;        cout<<"剩餘IO:"<<per_IOpool.size;        cout<<" 正在使用IO:"<<per_IOusing.size<<endl;        ReleaseMutex(mutex);       }      }     }    }   }  }  return 0; } int _tmain(int argc, _TCHAR* argv[]) {  HANDLE servesend;  WORD winsock =  MAKEWORD(2,2);  WSADATA wsaData;  DWORD err = WSAStartup(winsock,&wsaData);  if (err!=0)  {   cerr<<"載入socket動態連結庫失敗"<<endl;   system("pause");  }  if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)  {   cerr<<"申請windows socket vision 2.2 錯誤"<<endl;   system("pause");  }  HANDLE completionPort = CreateIoCompletionPort( (HANDLE)-1, NULL, 0, 0);    if (NULL == completionPort)  {   cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;     system("pause");     return -1;    }    SYSTEM_INFO mySysInfo;    GetSystemInfo(&mySysInfo);    for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i){     HANDLE ThreadHandle = (HANDLE)_beginthreadex(NULL,0,srvrecv, completionPort, 0, NULL);   if(NULL == ThreadHandle){      cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl;      system("pause");      return -1;     }     CloseHandle(ThreadHandle);    }  PERSOCKET* socket_a = new PERSOCKET;  socket_a->socket = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);  SOCKADDR_IN serveaddr;  serveaddr.sin_addr.s_addr = htonl(INADDR_ANY);  serveaddr.sin_family = AF_INET;  serveaddr.sin_port = htons(DefaultPort);  int ibind = bind(socket_a->socket,(SOCKADDR*)&serveaddr,sizeof(SOCKADDR));  if (ibind==-1)  {   cerr<<"socket bind error"<<GetLastError()<<endl;  }  CreateIoCompletionPort((HANDLE)socket_a->socket,   completionPort,   (ULONG_PTR)socket_a,   0);  int listento = listen(socket_a->socket,10);  if (listento==-1)  {   cerr<<"listen error"<<GetLastError()<<endl;   system("pause");  }  Userssocket = new PERSOCKET [ALLUSERS];  for (int i=0;i<ALLUSERS;++i) //socket池  {   SOCKET socket_s = socket(AF_INET,SOCK_STREAM,0);   Userssocket[i].UserID = i;   Userssocket[i].socket = socket_s;   per_socketpool.add(&Userssocket[i]);  }  OVERLAPEDEx* AllIO[ALLUSERS*3]; //IO池  for (int i=0;i<ALLUSERS*3;++i)  {   AllIO[i] = new OVERLAPEDEx;   AllIO[i]->IOType = FREE;   per_IOpool.add(AllIO[i]);  }  servesend = (HANDLE)_beginthreadex(NULL,0,srvsend,&per_socketusing,0,NULL);  DWORD dwBytes;  WSAIoctl(socket_a->socket,   SIO_GET_EXTENSION_FUNCTION_POINTER,   &GuiddisconnectEx,   sizeof(GuiddisconnectEx),   &CSY_Disconnect,   sizeof(LPFN_DISCONNECTEX),   &dwBytes,   NULL,   NULL);  WSAIoctl(socket_a->socket,   SIO_GET_EXTENSION_FUNCTION_POINTER, //操作指標   &GuidAcceptEx, //輸入緩衝區地址   sizeof(GuidAcceptEx),   &CSY_AcceptEx, //輸出緩衝區地址   sizeof(LPFN_ACCEPTEX),   &dwBytes, //輸出位元組的地址   NULL,   NULL);  WaitForSingleObject(mutex,INFINITE);  cout<<"Ready~~GO!!"<<endl;  ReleaseMutex(mutex);  while (1)  {   while (acceptwillflags&&(onlineusers<(ALLUSERS)))   {    WaitForSingleObject(mutex,INFINITE);    per_IOusing.add(per_IOpool.head->data);    per_IOpool.removehead();    cout<<"投遞AcceptEx..."<<endl;    cout<<"剩餘IO:"<<per_IOpool.size;    cout<<" 正在使用IO:"<<per_IOusing.size<<endl;    ReleaseMutex(mutex);     memset(per_IOusing.tail->data,0,size_t(sizeof(OVERLAPEDEx)));     per_IOusing.tail->data->databuff.len = 2048;     per_IOusing.tail->data->databuff.buf = per_IOusing.tail->data->buffer;     per_IOusing.tail->data->IOType = ACCEPT;     per_IOusing.tail->data->UserID = per_socketpool.head->data->UserID;     per_IOusing.tail->data->socket = (HANDLE)&per_socketpool.head->data->socket;     DWORD i_dwBytes;     int ca = sizeof(IN_ADDR);     int cf = sizeof(SOCKADDR_IN);     WaitForSingleObject(mutex,INFINITE);     int i_acc = AcceptEx(socket_a->socket,      per_socketpool.head->data->socket,      per_IOusing.tail->data->databuff.buf,      per_IOusing.tail->data->databuff.len-(sizeof(SOCKADDR_IN)+16)*2,      sizeof(SOCKADDR_IN)+16,      sizeof(SOCKADDR_IN)+16,      &i_dwBytes,      &(per_IOusing.tail->data->overlaped));     if (i_acc==0&&GetLastError()!=997)     {      cout<<"accept socket error"<<GetLastError()<<endl;      system("pause");     }     acceptedflags = 1;     acceptcheck = 1;     acceptwillflags = 0;     ReleaseMutex(mutex);   }   while(acceptcheck)   {    if (acceptedflags==0)    {     acceptwillflags = 1;     acceptcheck = 0;    }   }  }  return 0; }


List.h