IOCP簡易實現(客戶端與客戶端之間可以聊天)
阿新 • • 發佈:2019-01-23
#include "stdafx.h"
#include <iostream>
#include "List.h"
// Values stored in OVERLAPEDEx::IOType to tell the completion handler which
// kind of overlapped operation a context was last used for.
// RECV: context posted with WSARecv.
#define RECV 1
// SEND: context posted with WSASend.
#define SEND 2
// ACCEPT: context posted with AcceptEx.
#define ACCEPT 0
// DISCONNECT: checked by srvrecv; no visible code assigns this value.
#define DISCONNECT 3
// FREE: initial state of pooled contexts; also set before DisconnectEx is posted.
#define FREE 4
// TCP port the listening socket is bound to.
const int DefaultPort = 8000;
// Number of PERSOCKET slots allocated in _tmain (three I/O contexts are
// allocated per slot).
const int ALLUSERS = 1000;
using namespace std;
// Per-I/O contexts that are currently unused.
List<OVERLAPEDEx*> per_IOpool;
// Per-I/O contexts that have been handed to an overlapped operation.
List<OVERLAPEDEx*> per_IOusing;
// Client socket slots that are not attached to a connection.
List<PERSOCKET*> per_socketpool;
// Client socket slots that currently belong to an accepted connection.
List<PERSOCKET*> per_socketusing;
// Array of ALLUSERS socket slots, allocated in _tmain and indexed by UserID.
PERSOCKET* Userssocket;
// onlineusers: incremented by srvrecv when an accept completes and
// decremented when a client is treated as gone.
int onlineusers = 0,
// acceptedflags: set to 1 by _tmain after posting AcceptEx and cleared by
// srvrecv once that accept has completed.
acceptedflags = 1,
// acceptwillflags: used only by the accept loop in _tmain; non-zero means a
// new AcceptEx may be posted.
acceptwillflags = 1,
// acceptcheck: used only by the accept loop in _tmain; non-zero while it is
// waiting for acceptedflags to be cleared.
acceptcheck = 1;
// GUID / function-pointer pairs for the Winsock extension functions; the
// pointers are filled in by WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) in _tmain.
GUID GuidAcceptEx = WSAID_ACCEPTEX;
LPFN_ACCEPTEX CSY_AcceptEx = NULL;
GUID GuiddisconnectEx = WSAID_DISCONNECTEX;
LPFN_DISCONNECTEX CSY_Disconnect = NULL;
// Process-wide mutex taken by every thread around pool-list updates and
// console output.
HANDLE mutex = CreateMutex(NULL,FALSE,NULL);
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Kernel32.lib")
#pragma comment(lib,"Mswsock.lib")
// NOTE(review): these four functions are declared here but no definition or
// call is visible in this part of the file - confirm whether they are still needed.
void AcceptIO(OVERLAPEDEx*);
void RecvIO(OVERLAPEDEx*);
void SendIO(OVERLAPEDEx*);
void DisconnectIO(OVERLAPEDEx*);
/**
 * Console thread: broadcasts a line typed by the operator to every connected client.
 *
 * Each round waits for a key press (getchar), prompts for a line when clients
 * are online, and then sends one fixed 200-byte frame to every socket in
 * per_socketusing. Byte 0 of the frame is the tag 'f'; the text follows it.
 *
 * Changes from the previous version:
 *  - gets() (unbounded write into a 200-byte stack buffer) is replaced by a
 *    bounded fgets(); the trailing newline is stripped to match gets().
 *  - The frame is zero-filled before each read, so no uninitialized or stale
 *    stack bytes are transmitted after the text.
 *  - The failure message prints the failing client's UserID instead of an
 *    uninitialized local.
 *  - The thread ends when stdin reaches EOF instead of looping forever.
 *
 * @param complt Unused thread argument.
 * @return 0 when stdin is closed; otherwise the loop never returns.
 */
unsigned int __stdcall srvsend(void *complt)
{
    char chatting[200] = {0};
    while (1)
    {
        // Wait for the operator to press a key before starting a round.
        if (getchar()==EOF)
        {
            return 0;
        }
        if ((per_socketusing.size)>0)
        {
            if (onlineusers>0)
            {
                WaitForSingleObject(mutex,INFINITE);
                cout<<endl<<"請輸入內容:"<<flush;
                ReleaseMutex(mutex);
                // Read outside the mutex so worker threads are not blocked
                // while the operator is typing.
                memset(chatting,0,sizeof(chatting));
                if (fgets(chatting+1,(int)sizeof(chatting)-1,stdin)==NULL)
                {
                    return 0;
                }
                // fgets keeps the line terminator; gets() did not.
                chatting[1+strcspn(chatting+1,"\r\n")] = '\0';
            }
            WaitForSingleObject(mutex,INFINITE);
            chatting[0] = 'f';
            node<PERSOCKET*>* temp = per_socketusing.head;
            for (int i=0;i<per_socketusing.size;i++)
            {
                if (send(temp->data->socket,chatting,(int)sizeof(chatting),0)<0)
                {
                    cout<<"No."<<temp->data->UserID<<" can't connect server"<<endl;
                }
                else
                {
                    cout<<"回執已傳送。"<<endl;
                }
                temp = temp->next;
            }
            ReleaseMutex(mutex);
        }
        else
        {
            WaitForSingleObject(mutex,INFINITE);
            cout<<"There is no client link~"<<endl;
            ReleaseMutex(mutex);
        }
    }
}
unsigned int __stdcall srvrecv(void *complt)
{
HANDLE completion = (HANDLE)complt;
DWORD transbytes;
DWORD recvbytes;
DWORD Recvbytes;
DWORD flags = 0;
DWORD Flags = 0;
LPOVERLAPPED theoverlaped = NULL;
OVERLAPEDEx *peroverlapedex = NULL;
PERSOCKET *thesocket = NULL;
int gqc = 1;
while (1)
{
gqc = GetQueuedCompletionStatus(completion,
&transbytes,
(PULONG_PTR)&thesocket,
(LPOVERLAPPED*)&theoverlaped,
INFINITE);
peroverlapedex = (OVERLAPEDEx*)theoverlaped;
if (gqc==0&&transbytes==0)
{
DWORD a = GetLastError();
WaitForSingleObject(mutex,INFINITE);
onlineusers--;
cout<<"No."<<peroverlapedex->UserID<<"client exit,error="<<a<<endl;
per_socketpool.add(&Userssocket[peroverlapedex->UserID]);
per_socketusing.remove(&Userssocket[peroverlapedex->UserID]);
cout<<"回收Socket..."<<endl;
cout<<"剩餘Socket:"<<per_socketpool.size;
cout<<" 正在使用Socket:"<<per_socketusing.size<<endl;
peroverlapedex->IOType = FREE;
cout<<"投遞Disconnect..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
CSY_Disconnect(thesocket->socket,
&peroverlapedex->overlaped,
TF_REUSE_SOCKET,
0);
node<PERSOCKET*>* temp = temp = per_socketusing.head;
for (int i=0;i<per_socketusing.size;i++)
{
ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));
per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;
sprintf(per_IOpool.head->data->buffer,"f%d號客戶端下線了,當前有%d人線上",thesocket->UserID,per_socketusing.size);
per_IOpool.head->data->databuff.len = 32;
per_IOpool.head->data->IOType = SEND;
per_IOpool.head->data->UserID = per_socketusing.tail->data->UserID;
per_IOpool.head->data->socket = (HANDLE)per_socketusing.tail->data->socket;
WSASend(temp->data->socket,
&per_IOpool.head->data->databuff,
1,
&Recvbytes,
flags,
&per_IOpool.head->data->overlaped,
0);
per_IOusing.add(per_IOpool.head->data);
per_IOpool.removehead();
cout<<"投遞WSASend..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
temp = temp->next;
}
ReleaseMutex(mutex);
continue;
}
if (peroverlapedex->IOType==ACCEPT)
{
WaitForSingleObject(mutex,INFINITE);
per_socketusing.add(per_socketpool.head->data);
per_socketpool.removehead();
cout<<"獲取Socket..."<<endl;
cout<<"剩餘Socket:"<<per_socketpool.size;
cout<<" 正在使用Socket:"<<per_socketusing.size<<endl;
onlineusers++;
acceptedflags = 0;
CreateIoCompletionPort((HANDLE)per_socketusing.tail->data->socket,
completion,
(DWORD)&per_socketusing.tail->data->socket,
0);
char ip[4] = {0};
for (int i=2026,n=0;i<2030;++i,++n)
{
ip[n] = peroverlapedex->buffer[i];
}
int IP[4];
for (int i=0;i<4;++i)
{
IP[i] = ((int)ip[i]+256)%256;
}
cout<<"IP:"<<IP[0]<<"."<<IP[1]<<"."<<IP[2]<<"."<<IP[3]<<":"<<peroverlapedex->databuff.buf<<endl;
ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));
per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;
sprintf(per_IOpool.head->data->buffer,"f登入成功,當前有%d人線上",per_socketusing.size);
per_IOpool.head->data->databuff.len = 25;
per_IOpool.head->data->IOType = SEND;
per_IOpool.head->data->UserID = per_socketusing.tail->data->UserID;
per_IOpool.head->data->socket = (HANDLE)per_socketusing.tail->data->socket;
WSASend(per_socketusing.tail->data->socket,
&per_IOpool.head->data->databuff,
1,
&Recvbytes,
flags,
&per_IOpool.head->data->overlaped,
0);
per_IOusing.add(per_IOpool.head->data);
per_IOpool.removehead();
cout<<"投遞WSASend..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
node<PERSOCKET*>* temp = temp = per_socketusing.head;
for (int i=0;i<per_socketusing.size-1;i++)
{
ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));
per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;
sprintf(per_IOpool.head->data->buffer,"f%d號客戶端上線了,當前有%d人線上",per_socketusing.tail->data->UserID,per_socketusing.size);
per_IOpool.head->data->databuff.len = 32;
per_IOpool.head->data->IOType = SEND;
per_IOpool.head->data->UserID = per_socketusing.tail->data->UserID;
per_IOpool.head->data->socket = (HANDLE)per_socketusing.tail->data->socket;
WSASend(temp->data->socket,
&per_IOpool.head->data->databuff,
1,
&Recvbytes,
flags,
&per_IOpool.head->data->overlaped,
0);
per_IOusing.add(per_IOpool.head->data);
per_IOpool.removehead();
cout<<"投遞WSASend..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
temp = temp->next;
}
ZeroMemory(&(peroverlapedex->overlaped), sizeof(OVERLAPPED));
peroverlapedex->databuff.buf = peroverlapedex->buffer;
peroverlapedex->databuff.len = 2048;
peroverlapedex->IOType = RECV;
peroverlapedex->UserID = per_socketusing.tail->data->UserID;
peroverlapedex->socket = (HANDLE)per_socketusing.tail->data->socket;
WSARecv(per_socketusing.tail->data->socket,
&peroverlapedex->databuff,
1,
&recvbytes,
&Flags,
&peroverlapedex->overlaped,
0);
ReleaseMutex(mutex);
}
else
{
if (peroverlapedex->IOType==RECV)
{
WaitForSingleObject(mutex,INFINITE);
cout<<"客戶端"<<peroverlapedex->UserID<<"號:"<<peroverlapedex->databuff.buf+4<<endl;
ZeroMemory(&(per_IOpool.head->data->overlaped), sizeof(OVERLAPPED));
per_IOpool.head->data->databuff.buf = per_IOpool.head->data->buffer;
strcpy(per_IOpool.head->data->databuff.buf+4,peroverlapedex->databuff.buf+4);
memcpy(per_IOpool.head->data->databuff.buf,&peroverlapedex->UserID,4);
per_IOpool.head->data->databuff.len = strlen(peroverlapedex->databuff.buf+4)+5;
per_IOpool.head->data->IOType = SEND;
per_IOpool.head->data->UserID = *(int*)(peroverlapedex->buffer);
per_IOpool.head->data->socket = (HANDLE)Userssocket[*(int*)peroverlapedex->buffer].socket;
WSASend(Userssocket[*(int*)peroverlapedex->buffer].socket,
&per_IOpool.head->data->databuff,
1,
&Recvbytes,
flags,
&per_IOpool.head->data->overlaped,
0);
per_IOusing.add(per_IOpool.head->data);
per_IOpool.removehead();
cout<<"投遞WSASend..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
ZeroMemory(peroverlapedex, sizeof(OVERLAPPED));
peroverlapedex->databuff.buf = peroverlapedex->buffer;
peroverlapedex->databuff.len = 2048;
peroverlapedex->IOType = RECV;
WSARecv(Userssocket[peroverlapedex->UserID].socket,
&peroverlapedex->databuff,
1,
&recvbytes,
&flags,
&peroverlapedex->overlaped,
0);
cout<<"投遞WSARecv..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
ReleaseMutex(mutex);
}
else
{
if (peroverlapedex->IOType==SEND)
{
WaitForSingleObject(mutex,INFINITE);
per_IOpool.add(peroverlapedex);
per_IOusing.remove(peroverlapedex);
cout<<"回收WSASend..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
ReleaseMutex(mutex);
}
else
{
if (peroverlapedex->IOType==FREE)
{
WaitForSingleObject(mutex,INFINITE);
per_IOpool.add(peroverlapedex);
per_IOusing.remove(peroverlapedex);
cout<<"回收WSARecv..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
ReleaseMutex(mutex);
}
else
{
if (peroverlapedex->IOType==DISCONNECT)
{
WaitForSingleObject(mutex,INFINITE);
per_IOpool.add(peroverlapedex);
per_IOusing.remove(peroverlapedex);
cout<<"回收Disconnect..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
ReleaseMutex(mutex);
}
}
}
}
}
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
HANDLE servesend;
WORD winsock = MAKEWORD(2,2);
WSADATA wsaData;
DWORD err = WSAStartup(winsock,&wsaData);
if (err!=0)
{
cerr<<"載入socket動態連結庫失敗"<<endl;
system("pause");
}
if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
{
cerr<<"申請windows socket vision 2.2 錯誤"<<endl;
system("pause");
}
HANDLE completionPort = CreateIoCompletionPort( (HANDLE)-1, NULL, 0, 0);
if (NULL == completionPort)
{
cerr << "CreateIoCompletionPort failed. Error:" << GetLastError() << endl;
system("pause");
return -1;
}
SYSTEM_INFO mySysInfo;
GetSystemInfo(&mySysInfo);
for(DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2); ++i){
HANDLE ThreadHandle = (HANDLE)_beginthreadex(NULL,0,srvrecv, completionPort, 0, NULL);
if(NULL == ThreadHandle){
cerr << "Create Thread Handle failed. Error:" << GetLastError() << endl;
system("pause");
return -1;
}
CloseHandle(ThreadHandle);
}
PERSOCKET* socket_a = new PERSOCKET;
socket_a->socket = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
SOCKADDR_IN serveaddr;
serveaddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveaddr.sin_family = AF_INET;
serveaddr.sin_port = htons(DefaultPort);
int ibind = bind(socket_a->socket,(SOCKADDR*)&serveaddr,sizeof(SOCKADDR));
if (ibind==-1)
{
cerr<<"socket bind error"<<GetLastError()<<endl;
}
CreateIoCompletionPort((HANDLE)socket_a->socket,
completionPort,
(ULONG_PTR)socket_a,
0);
int listento = listen(socket_a->socket,10);
if (listento==-1)
{
cerr<<"listen error"<<GetLastError()<<endl;
system("pause");
}
Userssocket = new PERSOCKET [ALLUSERS];
for (int i=0;i<ALLUSERS;++i) //socket池
{
SOCKET socket_s = socket(AF_INET,SOCK_STREAM,0);
Userssocket[i].UserID = i;
Userssocket[i].socket = socket_s;
per_socketpool.add(&Userssocket[i]);
}
OVERLAPEDEx* AllIO[ALLUSERS*3]; //IO池
for (int i=0;i<ALLUSERS*3;++i)
{
AllIO[i] = new OVERLAPEDEx;
AllIO[i]->IOType = FREE;
per_IOpool.add(AllIO[i]);
}
servesend = (HANDLE)_beginthreadex(NULL,0,srvsend,&per_socketusing,0,NULL);
DWORD dwBytes;
WSAIoctl(socket_a->socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuiddisconnectEx,
sizeof(GuiddisconnectEx),
&CSY_Disconnect,
sizeof(LPFN_DISCONNECTEX),
&dwBytes,
NULL,
NULL);
WSAIoctl(socket_a->socket,
SIO_GET_EXTENSION_FUNCTION_POINTER, //操作指標
&GuidAcceptEx, //輸入緩衝區地址
sizeof(GuidAcceptEx),
&CSY_AcceptEx, //輸出緩衝區地址
sizeof(LPFN_ACCEPTEX),
&dwBytes, //輸出位元組的地址
NULL,
NULL);
WaitForSingleObject(mutex,INFINITE);
cout<<"Ready~~GO!!"<<endl;
ReleaseMutex(mutex);
while (1)
{
while (acceptwillflags&&(onlineusers<(ALLUSERS)))
{
WaitForSingleObject(mutex,INFINITE);
per_IOusing.add(per_IOpool.head->data);
per_IOpool.removehead();
cout<<"投遞AcceptEx..."<<endl;
cout<<"剩餘IO:"<<per_IOpool.size;
cout<<" 正在使用IO:"<<per_IOusing.size<<endl;
ReleaseMutex(mutex);
memset(per_IOusing.tail->data,0,size_t(sizeof(OVERLAPEDEx)));
per_IOusing.tail->data->databuff.len = 2048;
per_IOusing.tail->data->databuff.buf = per_IOusing.tail->data->buffer;
per_IOusing.tail->data->IOType = ACCEPT;
per_IOusing.tail->data->UserID = per_socketpool.head->data->UserID;
per_IOusing.tail->data->socket = (HANDLE)&per_socketpool.head->data->socket;
DWORD i_dwBytes;
int ca = sizeof(IN_ADDR);
int cf = sizeof(SOCKADDR_IN);
WaitForSingleObject(mutex,INFINITE);
int i_acc = AcceptEx(socket_a->socket,
per_socketpool.head->data->socket,
per_IOusing.tail->data->databuff.buf,
per_IOusing.tail->data->databuff.len-(sizeof(SOCKADDR_IN)+16)*2,
sizeof(SOCKADDR_IN)+16,
sizeof(SOCKADDR_IN)+16,
&i_dwBytes,
&(per_IOusing.tail->data->overlaped));
if (i_acc==0&&GetLastError()!=997)
{
cout<<"accept socket error"<<GetLastError()<<endl;
system("pause");
}
acceptedflags = 1;
acceptcheck = 1;
acceptwillflags = 0;
ReleaseMutex(mutex);
}
while(acceptcheck)
{
if (acceptedflags==0)
{
acceptwillflags = 1;
acceptcheck = 0;
}
}
}
return 0;
}