1. 程式人生 > >moduo網路庫的reactor模式(下):實現非阻塞TCP網路

moduo網路庫的reactor模式(下):實現非阻塞TCP網路

1、在reactor框架下加入tcp

Unix下的tcp連線也是經由socket檔案描述符(sockfd)實現的。此節只是封裝了listening sockefd進行監聽(accept(2)),得到的新連線(普通sockfd)直接提供給使用者讓使用者自行處理。下一節才進一步地將得到的新連線也封裝起來。

1.1、首先將unix下的socket呼叫api簡易封裝成Socket類,得到wapper。即將api呼叫如socket()、bind()、listen()、accept()等裹上對錯誤返回值的處理。

#ifndef SOCKET_H_
#define SOCKET_H_

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

class Socket
{
public:
  Socket(unsigned short port) : port_(port) 
  {
    sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd_<0)
    {
      perror("socket");
	  exit(-1);
    }
    // non-block
    int flags = ::fcntl(sockfd_, F_GETFL, 0);
    flags |= O_NONBLOCK;
    int ret = ::fcntl(sockfd_, F_SETFL, flags);
    if(ret==-1)
    {
      perror("fcntl");
	  exit(-1);
    }

    // close-on-exec
    flags = ::fcntl(sockfd_, F_GETFD, 0);
    flags |= FD_CLOEXEC;
    ret = ::fcntl(sockfd_, F_SETFD, flags);
    if(ret==-1)
    {
      perror("fcntl");
	  exit(-1);
    }
  }

  void setReuseAddr(bool on)
  {
    int optval = on ? 1 : 0;
    setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR,
                 &optval, sizeof optval);
    // FIXME CHECK
  }

  ~Socket() 
  {
    close(sockfd_); 
  }
  
  int fd() { return sockfd_; }

  void Bind()
  {
    struct sockaddr_in my_addr;
    bzero(&my_addr,sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port_);   //port
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int err_log=bind(sockfd_, (struct sockaddr*)&my_addr, sizeof(my_addr));
    if(err_log!=0)
    {
        perror("bind");
	    close(sockfd_);
	    exit(-1);
    }
  }
  
  void Listen()
  {
    int err_log = listen(sockfd_, 10);
    if(err_log!=0)
    {
        perror("listen");
	    close(sockfd_);
	    exit(-1);
    }
  }

  int Accept(struct sockaddr_in* peeraddr)
  {
    //struct sockaddr_in addr;
    //bzero(&addr, sizeof addr);
    //socklen_t addr_len=sizeof(addr);
    //int connfd = accept(sockfd_, &addr, &addr_len);

    bzero(peeraddr, sizeof(*peeraddr));
    socklen_t peeraddr_len=sizeof(*peeraddr);

    int connfd = accept(sockfd_, (sockaddr*)peeraddr, &peeraddr_len);
    if (connfd < 0)
    {
      perror("accept");  
    }
    return connfd;
  }

private:
  unsigned short port_;
  int sockfd_;
};
    
#endif

1.2、封裝listening sockfd為Acceptor類,封裝方法與muduo手法類似。負責監聽(accept(2))外部是否有新建連線。其中

Acceptor::enableReading()為使用者呼叫函式,使能listening sockfd可讀並加入到poll(2)中進行事件迴圈。若可讀(listen到有新連線),則回撥Acceptor::handleRead()進行accept(2)並回呼叫戶函式cb_(),而cb_是由使用者呼叫setNewConnectionCallbackFunc()設定。cb_()則是使用者自行設計的普通sockfd讀、寫等操作處理。

#ifndef ACCEPTOR_H_
#define ACCEPTOR_H_

#include "Socket.hpp"
#include "Channel.hpp"
#include "Thread.hpp"
#include <iostream>

class EventLoop;

class Acceptor
{
public:
  typedef std::function<void(int, sockaddr_in*)> AcceptCallbackFunc;
  Acceptor(EventLoop* loop, unsigned short port)
   : loop_(loop), socket_(port), socketChannel_(loop_,socket_.fd())
  {}

  ~Acceptor() {}

  void setNewConnectionCallbackFunc(AcceptCallbackFunc cb)
  {
    cb_=cb;
  }

  //user used function
  void enableReading()
  {
    socket_.Bind();
    socket_.Listen();
    socketChannel_.setReadCallback(std::bind(&Acceptor::handleRead,this));
    socketChannel_.enableReading();
  }
 
  void handleRead()
  {
    struct sockaddr_in peeraddr;
    int connfd=socket_.Accept(&peeraddr);
    //std::cout<<"tid "<<CurrentThreadtid()<<": server is acceptting in port 6666"<<std::endl;
    if(connfd>=0)
      if(cb_)
        cb_(connfd, &peeraddr);
    else
      ::close(connfd);
  }

  void setReuseAddr(bool on)
  {
    socket_.setReuseAddr(true);
  }

private:
  EventLoop* loop_;
  Socket socket_;
  Channel socketChannel_;
  AcceptCallbackFunc cb_;
};

#endif

1.3、測試

server端:

Acceptor server1(loop, 6666),server2(loop,6688);為兩個不同的listening sockfd封裝類。

Acceptor::setReuseAddr()為該listening sockfd關閉後不必等待則可直接再次使用。

client1()和client2()為使用者回撥函式,此處用於處理普通socketfd的讀操作:當有新建連線時,傳送一句話給連線者然後close。

#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include "Acceptor.hpp"
#include <iostream>
    
using namespace std;
   
void client1(int connfd, sockaddr_in* peeraddr)
{     
  cout<<"tid "<<CurrentThreadtid()<<": server accept a connector"<<endl;
  if(send(connfd, "How are you", 11, 0) < 0)
    cout<<"send error"<<endl;
  
  close(connfd);
} 
  
void client2(int connfd, sockaddr_in* peeraddr)
{ 
  cout<<"tid "<<CurrentThreadtid()<<": server accept a connector"<<endl;
  if(send(connfd, "What's up", 9, 0) < 0)
    cout<<"send error"<<endl;
  
  close(connfd);
} 

int main()
{
  cout<<"Main: pid: "<<getpid()<<" tid: "<<CurrentThreadtid()<<endl;//main thread

  EventLoopThread ELThread1;
  EventLoop* loop = ELThread1.startLoop();//thread 2
  Acceptor server1(loop, 6666);//TCP server create in main thread, but accept in thread 2
  server1.setReuseAddr(true);
  server1.setAcceptCallbackFunc(client1);
  server1.enableReading();

  Acceptor server2(loop, 6688);//TCP server create in main thread, but accept in thread 2
  server2.setReuseAddr(false);
  server2.setAcceptCallbackFunc(client2);
  server2.enableReading();
  //loop->loop(); //test "one thread one loop"
  sleep(20);
  loop->quit();

  sleep(3);
  return 0;
}

client端:

  1 #include <stdio.h>
  2 #include <unistd.h>
  3 #include <string.h>
  4 #include <stdlib.h>
  5 #include <arpa/inet.h>
  6 #include <sys/socket.h>
  7 #include <netinet/in.h>
  8 
  9 int main(int argc, char *argv[])
 10 {
 11     unsigned short port = 6666;
 12     char *server_ip = "127.0.0.1";
 13 
 14     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
 15     if(sockfd<0)
 16     {
 17         perror("socket");
 18         exit(-1);
 19     }
 20 
 21     struct sockaddr_in server_addr;
 22     bzero(&server_addr,sizeof(server_addr));
 23     server_addr.sin_family = AF_INET;
 24     server_addr.sin_port = htons(port);
 25     inet_pton(AF_INET, server_ip, &server_addr.sin_addr.s_addr);
 26 
 27     int err_log = connect(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
 28     if(err_log!=0)
 29     {
 30         perror("connect");
 31         close(sockfd);
 32         exit(-1);
 33     }
 34 
 35     char buff[4096];
 36     int n=recv(sockfd, buff, 4096, 0);
 37     buff[n]='\0';
 38     printf("Port %d: recv msg from server: %s\n", port, buff);
 39     close(sockfd);
 40 
 41     return 0;
 42 }

 測試:

server:端

[email protected]:~/Documents/Reactor/s2.1$ ./testAcceptorDemo 
Main: pid: 56222 tid: 56222
tid 56222: create a new thread
tid 56222: waiting
tid 56223: Thread::func_() started!
tid 56223: notified
tid 56222: received notification
tid 56223: start looping...
tid 56223: server accept a connector
tid 56223: server accept a connector
tid 56223: server accept a connector
tid 56223: end looping...
tid 56223: Thread end!

client端: 

[email protected]:~/Documents$ ./tcp_send
Port 6666: recv msg from server: How are you
[email protected]:~/Documents$ ./tcp_send2
Port 6688: recv msg from server: What's up
[email protected]:~/Documents$ ./tcp_send2
Port 6688: recv msg from server: What's up

2 封裝TCP網路

在上一節的基礎上進一步封裝普通sockfd為TcpConnection類,並將listening sockfd類Acceptor和普通sockfd類TcpConnection封裝到類TcpServer供使用者使用。

封裝方式與muduo手法基本一致:把當前物件(Acceptor物件)封裝到一個新類(TcpServer類)中成為其資料成員。在建構函式中構造該成員(此操作相當於上節中使用者自行構造Acceptor server1(loop,6666)),並在建構函式中使該成員回撥新的回撥函式(把回撥Acceptor::handleRead()改為TcpServer::newConnection())。在新的回撥函式中構造TcpConnection物件(用於封裝accept(2)得到的普通sockfd),當普通sockfd有事件發生時則回撥由使用者傳入的回撥函式(相當於上節中的client1())。

使用者通過設定操作普通sockfd讀、寫等事件的回撥函式,然後呼叫TcpServer::start()使能listening sockfd,進入事件迴圈則可實現非阻塞TCP網路。

  • 2.1 封裝listening sockfd為Acceptor類,封裝方法與muduo手法類似。負責監聽(accept(2))外部是否有新建連線。

    Acceptor用於監聽,關注連線,建立連線後,由TCPConnection來接管處理;

    這個類沒有業務處理,用來處理監聽和連線請求到來後的邏輯; 

    所有與事件迴圈相關的都是Channel,Acceptor不直接和EventLoop打交道,所以在這個類中需要有一個Channel的成員,幷包含將Channel掛到事件迴圈中的邏輯(listen())。

  • 2.2 封裝普通socket為TcpConnection類,封裝方法與muduo手法類似。負責接收新建的普通sockfd。

    TcpConnection處理連線建立後的收發資料;業務處理回撥完成。

  • 2.3 封裝供使用者使用的TcpServer類,封裝負責監聽listening socketfd的Acceptor類和普通sockfd的TcpConnection類。

    作為終端使用者的介面方,和外部打交道通過TCPServer互動,而業務邏輯處理將回調函式傳入到底層,這種傳遞函式的方式猶如資料的傳遞一樣自然和方便;

    作用Acceptor和TcpConnection的粘合劑,呼叫Acceptor開始監聽連線並設定回撥,連線請求到來後,在回撥中新建TcpConnection連線,設定TcpConnection的回撥(將使用者的業務處理回撥函式傳入,包括:連線建立後,讀請求處理、寫完後的處理,連線關閉後的處理),從這裡可以看到,業務邏輯的傳遞就跟資料傳遞一樣,多麼漂亮。

總結:

函數語言程式設計中,類之間的關係主要通過組合來實現,而不是通過派生實現; 這也是函數語言程式設計的一個設計理念,更多的使用組合而不是繼承來實現類之間的關係,而支撐其能夠這樣設計的根源在於function()+bind()帶來的函式自由傳遞,實現回撥非常簡單;  而OO設計中,只能使用基於虛擬函式/多型來實現回撥,不可避免的使用繼承結構。