moduo網路庫的reactor模式(下):實現非阻塞TCP網路
1、在reactor框架下加入tcp
Unix下的tcp連線也是經由socket檔案描述符(sockfd)實現的。此節只是封裝了listening sockefd進行監聽(accept(2)),得到的新連線(普通sockfd)直接提供給使用者讓使用者自行處理。下一節才進一步地將得到的新連線也封裝起來。
1.1、首先將unix下的socket呼叫api簡易封裝成Socket類,得到wapper。即將api呼叫如socket()、bind()、listen()、accept()等裹上對錯誤返回值的處理。
#ifndef SOCKET_H_ #define SOCKET_H_ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <fcntl.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> class Socket { public: Socket(unsigned short port) : port_(port) { sockfd_ = socket(AF_INET, SOCK_STREAM, 0); if(sockfd_<0) { perror("socket"); exit(-1); } // non-block int flags = ::fcntl(sockfd_, F_GETFL, 0); flags |= O_NONBLOCK; int ret = ::fcntl(sockfd_, F_SETFL, flags); if(ret==-1) { perror("fcntl"); exit(-1); } // close-on-exec flags = ::fcntl(sockfd_, F_GETFD, 0); flags |= FD_CLOEXEC; ret = ::fcntl(sockfd_, F_SETFD, flags); if(ret==-1) { perror("fcntl"); exit(-1); } } void setReuseAddr(bool on) { int optval = on ? 1 : 0; setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval); // FIXME CHECK } ~Socket() { close(sockfd_); } int fd() { return sockfd_; } void Bind() { struct sockaddr_in my_addr; bzero(&my_addr,sizeof(my_addr)); my_addr.sin_family = AF_INET; my_addr.sin_port = htons(port_); //port my_addr.sin_addr.s_addr = htonl(INADDR_ANY); int err_log=bind(sockfd_, (struct sockaddr*)&my_addr, sizeof(my_addr)); if(err_log!=0) { perror("bind"); close(sockfd_); exit(-1); } } void Listen() { int err_log = listen(sockfd_, 10); if(err_log!=0) { perror("listen"); close(sockfd_); exit(-1); } } int Accept(struct sockaddr_in* peeraddr) { //struct sockaddr_in addr; //bzero(&addr, sizeof addr); //socklen_t addr_len=sizeof(addr); //int connfd = accept(sockfd_, &addr, &addr_len); bzero(peeraddr, sizeof(*peeraddr)); socklen_t peeraddr_len=sizeof(*peeraddr); int connfd = accept(sockfd_, (sockaddr*)peeraddr, &peeraddr_len); if (connfd < 0) { perror("accept"); } return connfd; } private: unsigned short port_; int sockfd_; }; #endif
1.2、封裝listening sockfd為Acceptor類,封裝方法與muduo手法類似。負責監聽(accept(2))外部是否有新建連線。其中
Acceptor::enableReading()為使用者呼叫函式,使能listening sockfd可讀並加入到poll(2)中進行事件迴圈。若可讀(listen到有新連線),則回撥Acceptor::handleRead()進行accept(2)並回呼叫戶函式cb_(),而cb_是由使用者呼叫setNewConnectionCallbackFunc()設定。cb_()則是使用者自行設計的普通sockfd讀、寫等操作處理。
#ifndef ACCEPTOR_H_ #define ACCEPTOR_H_ #include "Socket.hpp" #include "Channel.hpp" #include "Thread.hpp" #include <iostream> class EventLoop; class Acceptor { public: typedef std::function<void(int, sockaddr_in*)> AcceptCallbackFunc; Acceptor(EventLoop* loop, unsigned short port) : loop_(loop), socket_(port), socketChannel_(loop_,socket_.fd()) {} ~Acceptor() {} void setNewConnectionCallbackFunc(AcceptCallbackFunc cb) { cb_=cb; } //user used function void enableReading() { socket_.Bind(); socket_.Listen(); socketChannel_.setReadCallback(std::bind(&Acceptor::handleRead,this)); socketChannel_.enableReading(); } void handleRead() { struct sockaddr_in peeraddr; int connfd=socket_.Accept(&peeraddr); //std::cout<<"tid "<<CurrentThreadtid()<<": server is acceptting in port 6666"<<std::endl; if(connfd>=0) if(cb_) cb_(connfd, &peeraddr); else ::close(connfd); } void setReuseAddr(bool on) { socket_.setReuseAddr(true); } private: EventLoop* loop_; Socket socket_; Channel socketChannel_; AcceptCallbackFunc cb_; }; #endif
1.3、測試
server端:
Acceptor server1(loop, 6666),server2(loop,6688);為兩個不同的listening sockfd封裝類。
Acceptor::setReuseAddr()為該listening sockfd關閉後不必等待則可直接再次使用。
client1()和client2()為使用者回撥函式,此處用於處理普通socketfd的讀操作:當有新建連線時,傳送一句話給連線者然後close。
#include "EventLoopThread.hpp" #include "EventLoop.hpp" #include "Thread.hpp" #include "Acceptor.hpp" #include <iostream> using namespace std; void client1(int connfd, sockaddr_in* peeraddr) { cout<<"tid "<<CurrentThreadtid()<<": server accept a connector"<<endl; if(send(connfd, "How are you", 11, 0) < 0) cout<<"send error"<<endl; close(connfd); } void client2(int connfd, sockaddr_in* peeraddr) { cout<<"tid "<<CurrentThreadtid()<<": server accept a connector"<<endl; if(send(connfd, "What's up", 9, 0) < 0) cout<<"send error"<<endl; close(connfd); } int main() { cout<<"Main: pid: "<<getpid()<<" tid: "<<CurrentThreadtid()<<endl;//main thread EventLoopThread ELThread1; EventLoop* loop = ELThread1.startLoop();//thread 2 Acceptor server1(loop, 6666);//TCP server create in main thread, but accept in thread 2 server1.setReuseAddr(true); server1.setAcceptCallbackFunc(client1); server1.enableReading(); Acceptor server2(loop, 6688);//TCP server create in main thread, but accept in thread 2 server2.setReuseAddr(false); server2.setAcceptCallbackFunc(client2); server2.enableReading(); //loop->loop(); //test "one thread one loop" sleep(20); loop->quit(); sleep(3); return 0; }
client端:
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <string.h>
4 #include <stdlib.h>
5 #include <arpa/inet.h>
6 #include <sys/socket.h>
7 #include <netinet/in.h>
8
9 int main(int argc, char *argv[])
10 {
11 unsigned short port = 6666;
12 char *server_ip = "127.0.0.1";
13
14 int sockfd = socket(AF_INET, SOCK_STREAM, 0);
15 if(sockfd<0)
16 {
17 perror("socket");
18 exit(-1);
19 }
20
21 struct sockaddr_in server_addr;
22 bzero(&server_addr,sizeof(server_addr));
23 server_addr.sin_family = AF_INET;
24 server_addr.sin_port = htons(port);
25 inet_pton(AF_INET, server_ip, &server_addr.sin_addr.s_addr);
26
27 int err_log = connect(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
28 if(err_log!=0)
29 {
30 perror("connect");
31 close(sockfd);
32 exit(-1);
33 }
34
35 char buff[4096];
36 int n=recv(sockfd, buff, 4096, 0);
37 buff[n]='\0';
38 printf("Port %d: recv msg from server: %s\n", port, buff);
39 close(sockfd);
40
41 return 0;
42 }
測試:
server:端
[email protected]:~/Documents/Reactor/s2.1$ ./testAcceptorDemo
Main: pid: 56222 tid: 56222
tid 56222: create a new thread
tid 56222: waiting
tid 56223: Thread::func_() started!
tid 56223: notified
tid 56222: received notification
tid 56223: start looping...
tid 56223: server accept a connector
tid 56223: server accept a connector
tid 56223: server accept a connector
tid 56223: end looping...
tid 56223: Thread end!
client端:
[email protected]:~/Documents$ ./tcp_send
Port 6666: recv msg from server: How are you
[email protected]:~/Documents$ ./tcp_send2
Port 6688: recv msg from server: What's up
[email protected]:~/Documents$ ./tcp_send2
Port 6688: recv msg from server: What's up
2 封裝TCP網路
在上一節的基礎上進一步封裝普通sockfd為TcpConnection類,並將listening sockfd類Acceptor和普通sockfd類TcpConnection封裝到類TcpServer供使用者使用。
封裝方式與muduo手法基本一致:把當前物件(Acceptor物件)封裝到一個新類(TcpServer類)中成為其資料成員。在建構函式中構造該成員(此操作相當於上節中使用者自行構造Acceptor server1(loop,6666)),並在建構函式中使該成員回撥新的回撥函式(把回撥Acceptor::handleRead()改為TcpServer::newConnection())。在新的回撥函式中構造TcpConnection物件(用於封裝accept(2)得到的普通sockfd),當普通sockfd有事件發生時則回撥由使用者傳入的回撥函式(相當於上節中的client1())。
使用者通過設定操作普通sockfd讀、寫等事件的回撥函式,然後呼叫TcpServer::start()使能listening sockfd,進入事件迴圈則可實現非阻塞TCP網路。
-
2.1 封裝listening sockfd為Acceptor類,封裝方法與muduo手法類似。負責監聽(accept(2))外部是否有新建連線。
Acceptor用於監聽,關注連線,建立連線後,由TCPConnection來接管處理;
這個類沒有業務處理,用來處理監聽和連線請求到來後的邏輯;
所有與事件迴圈相關的都是Channel,Acceptor不直接和EventLoop打交道,所以在這個類中需要有一個Channel的成員,幷包含將Channel掛到事件迴圈中的邏輯(listen())。
-
2.2 封裝普通socket為TcpConnection類,封裝方法與muduo手法類似。負責接收新建的普通sockfd。
TcpConnection處理連線建立後的收發資料;業務處理回撥完成。
-
2.3 封裝供使用者使用的TcpServer類,封裝負責監聽listening socketfd的Acceptor類和普通sockfd的TcpConnection類。
作為終端使用者的介面方,和外部打交道通過TCPServer互動,而業務邏輯處理將回調函式傳入到底層,這種傳遞函式的方式猶如資料的傳遞一樣自然和方便;
作用Acceptor和TcpConnection的粘合劑,呼叫Acceptor開始監聽連線並設定回撥,連線請求到來後,在回撥中新建TcpConnection連線,設定TcpConnection的回撥(將使用者的業務處理回撥函式傳入,包括:連線建立後,讀請求處理、寫完後的處理,連線關閉後的處理),從這裡可以看到,業務邏輯的傳遞就跟資料傳遞一樣,多麼漂亮。
總結:
函數語言程式設計中,類之間的關係主要通過組合來實現,而不是通過派生實現; 這也是函數語言程式設計的一個設計理念,更多的使用組合而不是繼承來實現類之間的關係,而支撐其能夠這樣設計的根源在於function()+bind()帶來的函式自由傳遞,實現回撥非常簡單; 而OO設計中,只能使用基於虛擬函式/多型來實現回撥,不可避免的使用繼承結構。