1. 程式人生 > >《Netty 權威指南》—— NIO建立的TimeServer原始碼分析

《Netty 權威指南》—— NIO建立的TimeServer原始碼分析

宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,

我們將在TimeServer例程中給出完整的NIO建立的時間伺服器原始碼:

public class TimeServer {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
	int port = 8080;
	if (args != null && args.length > 0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 採用預設值
	    }
	}
	MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
	New Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

我們對NIO建立的TimeServer進行下簡單分析,8-15行跟之前的一樣,設定監聽埠。16-17行建立了一個被稱為MultiplexerTimeServer的多路複用類,它是個一個獨立的執行緒,負責輪詢多路複用器Selctor,可以處理多個客戶端的併發接入,現在我們繼續看MultiplexerTimeServer的原始碼:

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * 初始化多路複用器、繫結監聽埠
     * 
     * @param port
     */
    public MultiplexerTimeServer(int port) {
	try {
	    selector = Selector.open();
	    servChannel = ServerSocketChannel.open();
	    servChannel.configureBlocking(false);
	    servChannel.socket().bind(new InetSocketAddress(port), 1024);
	    servChannel.register(selector, SelectionKey.OP_ACCEPT);
	    System.out.println("The time server is start in port : " + port);
	} catch (IOException e) {
	    e.printStackTrace();
	    System.exit(1);
	}
    }

    public void stop() {
	this.stop = true;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
	while (!stop) {
	    try {
		selector.select(1000);
		Set<SelectionKey> selectedKeys = selector.selectedKeys();
		Iterator<SelectionKey> it = selectedKeys.iterator();
		SelectionKey key = null;
		while (it.hasNext()) {
		    key = it.next();
		    it.remove();
		    try {
			handleInput(key);
		    } catch (Exception e) {
			if (key != null) {
			    key.cancel();
			    if (key.channel() != null)
				key.channel().close();
			}
		    }
		}
	    } catch (Throwable t) {
		t.printStackTrace();
	    }
	}

	// 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源
	if (selector != null)
	    try {
		selector.close();
	    } catch (IOException e) {
		e.printStackTrace();
	    }
    }

    private void handleInput(SelectionKey key) throws IOException {

	if (key.isValid()) {
	    // 處理新接入的請求訊息
	    if (key.isAcceptable()) {
		// Accept the new connection
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		SocketChannel sc = ssc.accept();
		sc.configureBlocking(false);
		// Add the new connection to the selector
		sc.register(selector, SelectionKey.OP_READ);
	    }
	    if (key.isReadable()) {
		// Read the data
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		int readBytes = sc.read(readBuffer);
		if (readBytes > 0) {
		    readBuffer.flip();
		    byte[] bytes = new byte[readBuffer.remaining()];
		    readBuffer.get(bytes);
		    String body = new String(bytes, "UTF-8");
		    System.out.println("The time server receive order : "
			    + body);
		    String currentTime = "QUERY TIME ORDER"
			    .equalsIgnoreCase(body) ? new java.util.Date(
			    System.currentTimeMillis()).toString()
			    : "BAD ORDER";
		    doWrite(sc, currentTime);
		} else if (readBytes < 0) {
		    // 對端鏈路關閉
		    key.cancel();
		    sc.close();
		} else
		    ; // 讀到0位元組,忽略
	    }
	}
    }

    private void doWrite(SocketChannel channel, String response)
	    throws IOException {
	if (response != null && response.trim().length() > 0) {
	    byte[] bytes = response.getBytes();
	    ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
	    writeBuffer.put(bytes);
	    writeBuffer.flip();
	    channel.write(writeBuffer);
	}
    }
}

由於這個類相比於傳統的Socket程式設計稍微複雜一些,在此我們進行詳細分析,我們從如下幾個關鍵步驟講解多路複用處理類:

14-26行為構造方法,在構造方法中進行資源初始化,建立多路複用器Selector、ServerSocketChannel,對Channel和TCP引數進行配置,例如將ServerSocketChannel設定為非同步非阻塞模式,它的backlog設定為1024。系統資源初始化成功後將ServerSocketChannel註冊到Selector,監聽SelectionKey.OP_ACCEPT操作位;如果資源初始化失敗,例如埠被佔用則退出

39-61行線上程的run方法的while迴圈體中迴圈遍歷selector,它的休眠時間為1S,無論是否有讀寫等事件發生,selector每隔1S都被喚醒一次,selector也提供了一個無參的select方法。當有處於就緒狀態的Channel時,selector將返回就緒狀態的Channel的SelectionKey集合,我們通過對就緒狀態的Channel集合進行迭代,就可以進行網路的非同步讀寫操作

76-83行處理新接入的客戶端請求訊息,根據SelectionKey的操作位進行判斷即可獲知網路事件的型別,通過ServerSocketChannel的accept接收客戶端的連線請求並建立SocketChannel例項,完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。注意,我們需要將新建立的SocketChannel設定為非同步非阻塞,同時也可以對其TCP引數進行設定,例如TCP接收和傳送緩衝區的大小等,作為入門的例子,例程沒有進行額外的引數設定

84-109行用於讀取客戶端的請求訊息,首先建立一個ByteBuffer,由於我們事先無法得知客戶端傳送的碼流大小,作為例程,我們開闢一個1M的緩衝區。然後呼叫SocketChannel的read方法讀取請求碼流,注意,由於我們已經將SocketChannel設定為非同步非阻塞模式,因此它的read是非阻塞的。使用返回值進行判斷,看讀取到的位元組數,返回值有三種可能的結果:

1)      返回值大於0:讀到了位元組,對位元組進行編解碼;

2)      返回值等於0:沒有讀取到位元組,屬於正常場景,忽略;

3)      返回值為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源。

當讀取到碼流以後,我們進行解碼,首先對readBuffer進行flip操作,它的作用是將緩衝區當前的limit設定為position,position設定為0,用於後續對緩衝區的讀取操作。然後根據緩衝區可讀的位元組個數建立位元組陣列,呼叫ByteBuffer的get操作將緩衝區可讀的位元組陣列拷貝到新建立的位元組陣列中,最後呼叫字串的建構函式建立請求訊息體並列印。如果請求指令是”QUERY TIME ORDER”則把伺服器的當前時間編碼後返回給客戶端,下面我們看看如果非同步傳送應答訊息給客戶端。

111-119行將應答訊息非同步傳送給客戶端,我們看下關鍵程式碼,首先將字串編碼成位元組陣列,根據位元組陣列的容量建立ByteBuffer,呼叫ByteBuffer的put操作將位元組陣列拷貝到緩衝區中,然後對緩衝區進行flip操作,最後呼叫SocketChannel的write方法將緩衝區中的位元組陣列傳送出去。需要指出的是,由於SocketChannel是非同步非阻塞的,它並不保證一次能夠把需要傳送的位元組陣列傳送完,此時會出現“寫半包”問題,我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的ByteBuffer傳送完畢,可以通過ByteBuffer的hasRemain()方法判斷訊息是否傳送完成。此處僅僅是個簡單的入門級例程,沒有演示如何處理“寫半包”場景,後續的章節會有詳細說明。

使用NIO建立TimeServer伺服器完成之後,我們繼續學習如何建立NIO客戶端。首先還是通過時序圖瞭解關鍵步驟和過程,然後結合程式碼進行詳細分析。