1. 程式人生 > >《Netty 權威指南》—— 偽非同步IO程式設計

《Netty 權威指南》—— 偽非同步IO程式設計

宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,

為了解決同步阻塞IO面臨的一個鏈路需要一個執行緒處理的問題,後來有人對它的執行緒模型進行了優化,後端通過一個執行緒池來處理多個客戶端的請求接入,形成客戶端個數M:執行緒池最大執行緒數N的比例關係,其中M可以遠遠大於N,通過執行緒池可以靈活的調配執行緒資源,設定執行緒的最大值,防止由於海量併發接入導致執行緒耗盡。 下面,我們結合連線模型圖和原始碼,對偽非同步IO進行分析,看它是否能夠解決同步阻塞IO面臨的問題。

2.1.1.偽非同步IO模型圖

採用執行緒池和任務佇列可以實現一種叫做偽非同步的IO通訊框架,它的模型圖如下:

asyn-io

偽非同步IO服務端通訊模型(M:N)

當有新的客戶端接入的時候,將客戶端的Socket封裝成一個Task(該任務實現java.lang.Runnable介面)投遞到後端的執行緒池中進行處理,JDK的執行緒池維護一個訊息佇列和N個活躍執行緒對訊息佇列中的任務進行處理。由於執行緒池可以設定訊息佇列的大小和最大執行緒數,因此,它的資源佔用是可控的,無論多少個客戶端併發訪問,都不會導致資源的耗盡和宕機。 下面的小節,我們依然採用時間伺服器程式,將其改造成偽非同步IO時間伺服器,然後通過對程式碼進行分析,找出其弊端。

2.1.1.偽非同步式IO建立的TimeServer原始碼分析

我們對服務端程式碼進行一些改造,原始碼如下:

public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
	int port = 8080;
	if (args != null && args.length > 0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 採用預設值
	    }
	}
	ServerSocket server = null;
	try {
	    server = new ServerSocket(port);
	    System.out.println("The time server is start in port : " + port);
	    Socket socket = null;
	    TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(
		    50, 10000);// 建立IO任務執行緒池
	    while (true) {
		socket = server.accept();
		singleExecutor.execute(new TimeServerHandler(socket));
	    }
	} finally {
	    if (server != null) {
		System.out.println("The time server close");
		server.close();
		server = null;
	    }
	}
    }
}

偽非同步IO的主函式程式碼發生了變化,我們首先建立一個時間伺服器處理類的執行緒池,當接收到新的客戶端連線的時候,將請求Socket封裝成一個Task,然後呼叫執行緒池的execute方法執行,從而避免了每個請求接入都建立一個新的執行緒。 偽非同步IO的TimeServerHandlerExecutePool:

public class TimeServerHandlerExecutePool {

    private ExecutorService executor;

    public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
	executor = new ThreadPoolExecutor(Runtime.getRuntime()
		.availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,
		new ArrayBlockingQueue(queueSize));
    }
    public void execute(java.lang.Runnable task) {
	executor.execute(task);
    }
}

由於執行緒池和訊息佇列都是有界的,因此,無論客戶端併發連線數多大,它都不會導致執行緒個數過於膨脹或者記憶體溢位,相比於傳統的一連線一執行緒模型,是一種改良。 由於客戶端程式碼並沒有改變,因此,我們直接執行服務端和客戶端,看執行結果: 服務端執行結果:

1

偽非同步IO時間伺服器服務端執行結果

客戶端執行結果:

2

偽非同步IO時間伺服器客戶端執行結果

偽非同步IO通訊框架採用了執行緒池實現,因此避免了為每個請求都建立一個獨立執行緒造成的執行緒資源耗盡問題。但是由於它底層的通訊依然採用同步阻塞模型,因此無法從根本上解決問題。下個小節我們對偽非同步IO進行深入分析,找到它的弊端,然後看看NIO是如何從根本上解決這個問題的。

2.1.1.偽非同步IO弊端分析

要對偽非同步IO的弊端進行深入分析,首先我們看兩個JAVA同步IO的API說明,隨後我們結合程式碼進行詳細分析。

/**
     * Reads some number of bytes from the input stream and stores them into
     * the buffer array <code>b</code>. The number of bytes actually read is
     * returned as an integer.  This method blocks until input data is
     * available, end of file is detected, or an exception is thrown.
     *
     *
 If the length of <code>b</code> is zero, then no bytes are read and
     * <code>0</code> is returned; otherwise, there is an attempt to read at
     * least one byte. If no byte is available because the stream is at the
     * end of the file, the value <code>-1</code> is returned; otherwise, at
     * least one byte is read and stored into <code>b</code>.
     *
     *
 The first byte read is stored into element <code>b[0]</code>, the
     * next one into <code>b[1]</code>, and so on. The number of bytes read is,
     * at most, equal to the length of <code>b</code>. Let <i>k</i> be the
     * number of bytes actually read; these bytes will be stored in elements
     * <code>b[0]</code> through <code>b[</code><i>k</i><code>-1]</code>,
     * leaving elements <code>b[</code><i>k</i><code>]</code> through
     * <code>b[b.length-1]</code> unaffected.
     *
     * @param      b   the buffer into which the data is read.
     * @return     the total number of bytes read into the buffer, or
     *             <code>-1</code> if there is no more data because the end of
     *             the stream has been reached.
     * @exception  IOException  If the first byte cannot be read for any reason
     * other than the end of the file, if the input stream has been closed, or
     * if some other I/O error occurs.
     * @exception  NullPointerException  if <code>b</code> is <code>null</code>.
     */
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
}

請注意加粗斜體字部分的API說明,當對Socket的輸入流進行讀取操作的時候,它會一直阻塞下去,直到發生如下三種事件: 1)   有資料可讀 2)   可用資料已經讀取完畢 3)   發生空指標或者IO異常 這意味著當對方傳送請求或者應答訊息比較緩慢、或者網路傳輸較慢時,讀取輸入流一方的通訊執行緒將被長時間阻塞,如果對方60S才能夠將資料傳送完成,讀取一方的IO執行緒也將會被同步阻塞60S,在此期間,其它接入訊息只能在訊息佇列中排隊。 下面我們接著對輸出流進行分析,還是看JDK IO類庫輸出流的API文件,然後結合文件說明進行故障分析。 Java 輸入流OutputStream:

public void write(byte b[]) throws IOException
*Writes an array of bytes. This method will block until the bytes are *actually written.
Parameters:
b - the data to be written
Throws: IOException
If an I/O error has occurred.

當呼叫OutputStream的write方法寫輸出流的時候,它將會被阻塞直到所有要傳送的位元組全部寫入完畢,或者發生異常。學習過TCP/IP相關知識的都知道,當訊息的接收方處理緩慢的時候,將不能及時的從TCP緩衝區讀取資料,這將會導致傳送方的TCP window size不斷減小,直到為0,雙方處於Keep-Alive狀態,訊息傳送方將不能再向TCP緩衝區寫入訊息,這時如果採用的是同步阻塞IO,write操作將會被無限期阻塞,直到TCP window size大於0或者發生IO異常。
通過對輸入和輸出流的API文件進行分析,我們瞭解到讀和寫操作都是同步阻塞的,阻塞的時間取決於對方IO執行緒的處理速度和網路IO的傳輸速度。本質上來講,我們無法保證生產環境的網路狀況和對端的應用程式能夠足夠快,如果我們的應用程式依賴對方的處理速度,它的可靠性就非常差。也許在實驗室進行的效能測試結果令大家滿意,但是一旦上線執行,面對惡劣的網路環境和良莠不齊的第三方系統,問題就會如火山一樣噴發。
偽非同步IO實際上僅僅只是對之前IO執行緒模型的一個簡單優化,它無法從根本上解決同步IO導致的通訊執行緒阻塞問題。下面我們就簡單分析下如果通訊對方返回應答時間過長引起的級聯故障:

  1. 服務端處理緩慢,返回應答訊息耗費60S,平時只需要10MS;
  2. 採用偽非同步IO的執行緒正在讀取故障服務節點的響應,由於讀取輸入流是阻塞的,因此,它將會被同步阻塞60S;
  3. 假如所有的可用執行緒都被故障伺服器阻塞,那後續所有的IO訊息都將在佇列中排隊;
  4. 由於執行緒池採用阻塞佇列實現,當佇列積滿之後,後續入佇列的操作將被阻塞;
  5. 由於前端只有一個Accptor執行緒接收客戶端接入,它被阻塞線上程池的同步阻塞佇列之後,新的客戶端請求訊息將被拒絕,客戶端會發生大量的連線超時;
  6. 由於幾乎所有的連線都超時,呼叫者會認為系統已經崩潰,無法接收新的請求訊息。

如何破解這個難題?下個章節的NIO將給出答案。