1. 程式人生 > >一個高可擴充套件的基於非阻塞IO的伺服器架構

一個高可擴充套件的基於非阻塞IO的伺服器架構

  • 目錄
  • 執行緒體系結構
  • 反應堆模式
  • 元件架構
  • 接收器
  • 分配器
  • 分配器級別事件處理器
  • 應用程式級別事件處理器
  • 總結
  • 參考資料

如果你被要求去寫一個高可擴充套件性的基於JAVA的伺服器,你很快就會決定使用JAVA NIO包。為了讓伺服器跑起來,你可能會花很多時間閱讀部落格和教程來了解執行緒同步需要NIO SELECTOR類以及處理一些常見的陷阱。本文描述了一個面向連線基於NIO的伺服器的基本架構。本文會先看一下一個首選的執行緒模型然後討論伺服器的一些基本元件。
 

Threading Architecture執行緒體系結構

第一種也是最直觀的方式去實現一個多執行緒的伺服器是每個連線一個執行緒的方式。這是JAVA1.4以前的解決方案,由於老版本的JAVA缺少非阻塞的I/O支援。每個連線一個執行緒的方法分配一個獨家的工作執行緒給每個連線。在處理迴圈中,工作執行緒等待新進入的資料,處理這個請求,返回響應資料,然後再呼叫阻塞socket的read方法。

public class Server {
  private ExecutorService executors = Executors.newFixedThreadPool(10);
  private boolean isRunning = true;

  public static void main(String... args) throws ... {
    new Server().launch(Integer.parseInt(args[0]));
  } 

  public void launch(int port) throws ... {
    ServerSocket sso = new ServerSocket(port);
    while (isRunning) {
      Socket s = sso.accept();
      executors.execute(new Worker(s));
    }
  }

  private class Worker implements Runnable {
    private LineNumberReader in = null;
    ...

    Worker(Socket s) throws ... {
      in = new LineNumberReader(new InputStreamReader(...));
      out = ...
    }

    public void run() {
      while (isRunning) {
        try {
          // blocking read of a request (line)
          String request = in.readLine();

          // processing the request
          ...
          String response = ...

          // return the response
          out.write(resonse);
          out.flush();
        } catch (Exception e ) {
          ...
        }
      }
      in.close();
      ...
    }
  }
}

在同時發生的客戶端連線和多個同步工作執行緒之間通常有一個單對單的關係。因為每個連線都有一個相關聯的服務端等待執行緒,因此可以有很好的響應時間。然而,高負載需要更多的同步執行的執行緒,這些限制了可擴充套件性。尤其是,長時間存活的連線像持久化的HTTP連線導致大量的同步工作執行緒存在,有浪費時間等待新的客戶端請求的趨勢。此外,成百上千的同步執行緒會浪費大量的棧空間。注意,舉例來說,Solaris/Sparc預設的JAVA棧空間是512KB.

如果server不得不處理大量同時發生的客戶端,並且能容忍慢,無反應的客戶端,就需要一種供替代的執行緒架構。每個事件一個執行緒的方式通過一種非常高效地方式實現了這樣的需求。工作執行緒和連線獨立,僅被用來處理特定的事件。舉例來說,如果一個數據接收事件發生了,一個工作執行緒將會用來處理特定於應用程式的編碼和服務任務(或至少啟動這些任務)。任務一結束,工作執行緒就會回到執行緒池中。這種方式需要無阻塞的處理socket的I/O。呼叫socket的read或write方法需要時無阻塞的。此外,一個事件系統是必須的;它會發訊號表明是否有新資料,輪流發起socket的read方法。這種方式移除了等待執行緒和工作執行緒之間的一對一關係。這樣一個事件驅動的I/0系統的設計將會在反應堆模式中描述。

The Reactor Pattern反應堆模式

反應堆模式,如圖1所示,把事件的檢測例如準備就緒讀或者準備就緒接受資料和事件的處理分離。如果一個準備就緒的事件發生了,專用工作執行緒內的一個事件處理器就會被通知去執行適當的處理。

Figure1

Figure 1. A NIO-based Reactor pattern implementation

連線通道需要先在Selector類中註冊才能參與事件的架構。這可以通過呼叫regisster()方法來實現。雖然這個方法是SocketChannel的一部分,這個通道將會在Selector中註冊,沒有其它的方法。

...
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);

// register the connection
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
...

為了檢測新的事件,Selector類提供了請求已註冊的通道就緒事件的能力。通過呼叫select方法 ,Selector收集已註冊通道的就緒事件。這個方法的呼叫會阻塞,直到至少一個事件已經發生。在這種情況下,方法返回了自上次呼叫之後就緒的I/O操作的連線數。所選的連線可以通過呼叫Selector的selectedkey方法來檢測。這個方法返回一個Selectionkey物件集合,裡面存放了IO事件的狀態和連線通道的引用。

一個Selector存在於Dispatcher中。這是一個單執行緒的活動類圍饒著Selector類。Dispatcher類的職責是檢測事件然後分發消費事件的處理給EventHandler類。在這個分發迴圈中,Dispatcher類呼叫Selector類的select方法等待新的事件。如果至少一個事件發生了,這個方法就返回,每個事件相關的通道可以通過呼叫selectedkeys方法獲得。

...
while (isRunning) {
  // blocking call, to wait for new readiness events
  int eventCount = selector.select(); 

  // get the events
  Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();

    // readable event?
    if (key.isValid() && key.isReadable()) {
      eventHandler.onReadableEvent(key.channel());
    }

    // writable event?
    if (key.isValid() && key.isWritable()) {
      key.interestOps(SelectionKey.OP_READ); // reset to read only
      eventHandler.onWriteableEvent(key.channel());
    }
    ...
  }
  ...
}

基於一個事件,類似於就緒讀或就緒寫,EventHandler會被Dispatcher呼叫來處理這個事件。EventHandler解碼請求資料,處理必須的服務活動,編碼響應資料。由於工作執行緒沒有被強制去浪費時間等待新的請求然後建立一個連線,這種方式的可擴充套件性和吞吐量理論上只限制於系統資源像CPU和記憶體。這既便是說,響應時間將沒有每個連線一個執行緒的方式快,由於參與執行緒間的切換和同步。事件驅動方法的挑戰因此是最少化同步和優化執行緒管理,以致於這些影響可以被忽略。

Component Architecture元件架構

大多數具有高可擴充套件性的JAVA伺服器都是建立在反應堆模式上的。這樣做,反應堆模式中的類將會被增強,因為需要額外的類來連線管理,緩衝區管理,以及負載均衡。這個服用器的入口類是一個Acceptor。這個安排如圖2所示。

Figure2

Figure 2. Major components of a connection-oriented server

Acceptor接收器

一個伺服器每個新的客戶端連線將會被單個Acceptor所接收,Acceptor與伺服器的埠繫結。接收器是一個單執行緒的活動類。由於Acceptor僅負責處理歷時非常短的客戶端連線請求,經常只要用阻塞I/0模式實現Acceptor就足夠了。Acceptor通過呼叫Serversocketchannel的阻塞accept方法來處理新請求。新請求將會註冊到Dispatcher,這之後,請求就可以參與到事件處理中了。

由於一個Dispatcher的可擴充套件性非常有限,通常都會使用一個小的Dispatchers的池。這個限制當中的一個原因是特定的作業系統實現的Selector。大多數的作業系統一對一的對映SocketChannel和檔案處理。取決於具體的系統,每個Selector的最大檔案處理數的限制也是不同的。

class Acceptor implements Runnable {
  ...
  void init() {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(true);
    serverChannel.socket().bind(new InetSocketAddress(serverPort));
  }

  public void run() {
    while (isRunning) {
      try {
        SocketChannel channel = serverChannel.accept(); 

        Connection con = new Connection(channel, appHandler);
        dispatcherPool.nextDispatcher().register(con);
      } catch (...) {
        ...
      }
    }
  }
}

在示例程式碼中,一個連線物件持有SocketChannel和應用級別的事件處理器。我們將會在下面描述這些類。

Dispatcher分配器

通過呼叫Dispatcher的register方法,SocketChannel將會註冊到相關的Selector上。這裡就是問題的來源。Selector在內部使用key集合來管理註冊的通道。這意味著每次註冊一個通道,一個相關連的SelectionKey會被建立並被加入到Selector的註冊key集合。同時,併發的分發執行緒可以呼叫Selector的select方法,也會訪問這個key集合。由於key集合是非執行緒安全的,一個非同步的Acceptor上下文註冊會導致死鎖和競爭。這個可以通過實現selector guard object idiom來解決,它允許暫時的掛起分配執行緒。參考”“http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf”> How to Build a Scalable Multiplexed Server with NIO” (PDF)來檢視這個方法的解釋。

class Dispatcher implements Runnable {
  private Object guard = new Object();
  …

  void register(Connection con) {
    // retrieve the guard lock and wake up the dispatcher thread
    // to register the connection's channel
    synchronized (guard) {
      selector.wakeup();
      con.getChannel().register(selector, SelectionKey.OP_READ, con);
    }

    // notify the application EventHandler about the new connection
    …
  }

  void announceWriteNeed(Connection con) {
    SelectionKey key = con.getChannel().keyFor(selector);
    synchronized (guard) {
      selector.wakeup();
      key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
  }

  public void run() {
    while (isRunning) {
      synchronized (guard) {
        // suspend the dispatcher thead if guard is locked
      }
      int eventCount = selector.select();

      Iterator<SelectionKey> it = selector.selectedKeys().iterator();
      while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();

        // read event?
        if (key.isValid() && key.isReadable()) {
          Connection con = (Connection) key.attachment();
          disptacherEventHandler.onReadableEvent(con);
        }

        // write event?
        …
      }
    }
  }
}

在這個連線註冊之後,Selector監聽這個連線的就緒事件。如果一個事件發生了,通過傳遞相關的連線,這個Dispatcher的事件處理類的合適的回撥方法將會被呼叫。

分配器級別事件處理器

處理一個就緒讀事件的第一個行為是呼叫通道的讀方法。與流介面相反,通道介面需要忽略讀緩衝介面。通常會使用直接分配的ByteBuffer。直接緩衝區存在於本地記憶體,繞過JAVA堆記憶體。通過使用直接緩衝,socket的IO操作不再需要建立內部中間緩衝器。

通常情況下,讀請求會被非常快的執行。Socket的讀操作通常只是把一份接收到的資料從核心記憶體空間拷貝到讀緩衝區,這個資料會存在於使用者控制的記憶體空間。這些接收的資料將會被新增到連線的執行緒安全的讀佇列作進一步的處理。基於I/O操作的結果,特定於應用程式的任務會被執行。這些任務會被分配的應用級別的事件處理器處理。這類處理器通常被稱為工作執行緒。

class DispatcherEventHandler {
  ...

  void onReadableEvent(final Connection con) {
    // get the received data
    ByteBuffer readBuffer = allocateMemory();
    con.getChannel().read(readBuffer);
    ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);

    // append it to read queue
    con.getReadQueue().add(data);
    ...

    // perform further operations (encode, process, decode)
    // by a worker thread
    if (con.getReadQueue().getSize() > 0) {
      workerPool.execute(new Runnable() {
        public void run() {
          synchronized (con) {
            con.getAppHandler().onData(con);
          }
        }
      });
    }
  }

  void onWriteableEvent(Connection con) {
    ByteBuffer[] data = con.getWriteQueue().drain();
    con.getChannel().write(data); // write the data
    ...

    if (con.getWriteQueue().isEmpty()) {
      if (con.isClosed()) {
        dispatcher.deregister(con);
      }

    } else {
       // there is remaining data to write
       dispatcher.announceWriteNeed(con);
    }
  }
}

在特定於應用程式的任務中,資料會被編碼,服務會被執行,資料會被寫入。在寫資料的時候,要被髮送的資料會加入到寫佇列,然後呼叫Dispatcher類的announceWriteNeed方法。這個方法讓Selector開始監聽就緒讀事件。如果這種事件發生,分配器級別的事件處理器就會執行onWriteableEvent方法。這從通道的寫佇列獲取資料然後執行必要的寫I/O操作。試圖直接寫資料,通過這種方法,將會導致死鎖和競爭。

應用級別事件處理器

與分配器事件處理器相比,特定於應用的事件處理器監聽高級別的面向連線的事件,例如建立連線,資料接收或者是關閉連線。具體的事件處理設計是NIO伺服器框架像SEDA,MINA還有emberIO之間最大的不同。這些框架通常實現了多級的架構,這樣事件處理鏈就可以使用。它允許增加像SSLHandler或DelayerWriteHandler之類可以攔截請求/響應處理的處理器。下面的例子展示了一個基於xSocket框架的應用級別的處理器。xScoket框架支援不同的處理器介面,這些接口裡面定義了需要被實現的特定於應用的回撥方法程式碼。

class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
  private static final String DELIMITER = ...
  private Mailbox mailbox = ...

  public static void main(String... args) throws ... {
    new MultithreadedServer(110, new POP3ProtocolHandler()).run();
  }

  public boolean onConnect(INonBlockingConnection con) throws ... {
    if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
      con.setWriteTransferRate(5);  // reduce transfer: 5byte/sec
    }

    con.write("+OK My POP3-Server" + DELIMITER);
    return true;
  }

  public boolean onData(INonBlockingConnection con) throws ... {
    String request = con.readStringByDelimiter(DELIMITER);

    if (request.startsWith("QUIT")) {
      mailbox.close();
      con.write("+OK POP3 server signing off" + DELIMITER);
      con.close();

    } else if (request.startsWith("USER")) {
      this.user = request.substring(4, request.length());
      con.write("+OK enter password" + DELIMITER);

    } else if (request.startsWith("PASS")) {
      String pwd = request.substring(4, request.length());
      boolean isAuthenticated = authenticator.check(user, pwd);
      if (isAuthenticated) {
        mailbox = MailBox.openAndLock(user);
        con.write("+OK mailbox locked and ready" + DELIMITER);
      } else {
        ...
      }
    } else if (...) {
      ...
    }
    return true;
  }
}

為了更簡便的訪問底層的讀寫佇列,Connection物件提供了一些便利的面向流和通道的讀寫方法。

通過關閉連線,底層實現初始化一個可寫事件往返的重新整理寫佇列。連線會在遺留的資料被寫完之後終止。除了這樣一個控制終端,連線還能因為其它的原因關閉。例如,硬體故障可能導致基於TCP的連線中斷。這樣的情況只有在socket上執行讀寫操作或空閒超時的時候檢測到。大多數的NIO框架提供一個內建的程式來處理這些不受控制的中斷。

Conclusion總結

一個事件驅動的非阻塞架構是實現高效,高擴充套件性和高穩定性伺服器的一個基本的層。其中的挑戰就是最小化執行緒同步開銷和優化連線和緩衝區的管理。這會是程式設計中最困難的部分。

但是沒有必要重複發明輪子。一些框架像xSocket,emberIO,SEDA或MINA都抽象了低層次的事件處理和執行緒管理來簡化建立高可擴充套件性的伺服器。以上大部分的框架都支援SSL和UDP,本文中未提及這兩點。

Resources參考資料

作者:

Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.


對新技術都感興趣,喜歡技術鑽研,可是沒鑽研深的,英語還不錯,喜歡分享和交流。