1. 程式人生 > >Socket程式設計(二)(NIO)

Socket程式設計(二)(NIO)

    與Socket和ServerSocket類相對應,NIO也提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。這兩種新增的通道都支援阻塞和非阻塞兩種模式。

1.緩衝區Buffer

    Buffer是一個物件,它包含一些要寫入或讀出的資料。在NIO類庫加入Buffer物件,體現了新庫與原I/O的一個重要區別。在面向流的I/O中,可以將資料直接寫入或者將資料直接讀到Stream物件中。

    在NIO庫中所有資料都用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中。在寫入資料時,寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。

    緩衝區實質上是一個數組。通常它是一個位元組陣列(ByteBuffer),也可以使用其他種類的陣列。但是一個緩衝區不僅僅是一個數組,緩衝區提供了對資料的結構化訪問以及維護讀寫位置等資訊。

    最常用的緩衝區是ByteBuffer,一個ByteBuffer提供了一組功能用於操作byte陣列。除了ByteBuffer還有其他緩衝區,除了Boolean型別每一種Java基本型別都對應一種緩衝區。

2.通道Channel

    Channel是一個通道,網路資料通過Channel讀取和寫入。通道與流不同之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或OutputStream的子類),而通道可以用於讀、寫或者兩者同時進行。因為Channel是雙全工的,所以它可以比流更好的對映底層作業系統的API。

    從類圖中可以看出Channel可以分為兩大類:用於網路讀寫的SelectableChannel和用於檔案操作的FileChannel。

3.多路複用器Selector

    多路複用器提供選擇已經就緒的任務的能力。簡單來說Selector會不斷地輪詢註冊其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

    一個多路複用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,所以它並沒有最大連線控制代碼1024/2048的限制。這也就意味著只需要一個執行緒負責Selector的輪詢就可以接入成千上萬的客戶端。

服務端程式碼:

//nio 時間伺服器 
public class TimeServer {
	public static void main(String[] args) {
		int port = 9999;
		MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
		new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
	}
}

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {
	private Selector selector;
	private ServerSocketChannel servChannel;
	private volatile boolean stop;

	public MultiplexerTimeServer(int port) {
		try {
			selector = Selector.open();
			servChannel = ServerSocketChannel.open();
			servChannel.configureBlocking(false);
			servChannel.socket().bind(new InetSocketAddress(port));
			servChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("時間伺服器在埠啟動:" + port);
		} catch (IOException e) {
			e.printStackTrace();
			System.exit(1);
		}
	}

	public void stop() {
		this.stop = true;
	}

	@Override
	public void run() {
		while (!stop) {
			try {
				selector.select(1000);
				Set<SelectionKey> selectedKeys = selector.selectedKeys();
				Iterator<SelectionKey> it = selectedKeys.iterator();
				SelectionKey key = null;
				while (it.hasNext()) {
					key = it.next();
					it.remove();
					try {
						handleInput(key);
					} catch (Exception e) {
						if (key != null) {
							key.cancel();
							if (key.channel() != null) {
								key.channel().close();
							}
						}
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if (selector != null) {
			try {
				System.out.println("選擇器不為空");
				selector.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private void handleInput(SelectionKey key) throws IOException {
		if (key.isValid()) {
			// 判斷是否是個有效的控制代碼
			if (key.isAcceptable()) { 
				// 接受請求
				ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
				SocketChannel sc = ssc.accept();
				sc.configureBlocking(false); 
				// 新增新連線到 selector
				sc.register(selector, SelectionKey.OP_READ);
			}
			if (key.isReadable()) { 
				// 讀取資料
				SocketChannel sc = (SocketChannel) key.channel();
				ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 為ByteBuffer分配空間大小(位於 jvm) 
				//ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);// 為ByteBuffer分配空間大小(基於作業系統)
				int readBytes = sc.read(readBuffer);
				if (readBytes > 0) {
					readBuffer.flip();
					// 將快取位元組陣列的指標設定為陣列的開始序列即陣列下標0
					byte[] bytes = new byte[readBuffer.remaining()];
					// 返回剩餘的可用長度,此長度為實際讀取的資料長度,最大自然是底層陣列的長度
					readBuffer.get(bytes);
					String body = new String(bytes, "UTF-8");
					System.out.println("時間伺服器接收資料 : " + body);
					String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
							? new Date(System.currentTimeMillis()).toString()
							: "BAD ORDER";
					doWrite(sc, currentTime);
				} else if (readBytes < 0) {
					// 對端鏈路關閉
					key.cancel();
					sc.close();
				} else { 
					// 讀取到 0 位元組,忽略
				}
			}
		}
	}

	private void doWrite(SocketChannel channel, String response) throws IOException {
		if (response != null && response.trim().length() > 0) {
			byte[] bytes = response.getBytes();
			ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
			writeBuffer.put(bytes);
			writeBuffer.flip();
			// 將position 設定為0
			channel.write(writeBuffer);
			writeBuffer.clear();
		}
	}
}

程式碼分析:

1.首先會在構造方法中進行資源初始化,建立多路複用器Selector、ServerSocketChannel,對Channel和TCP引數進行配置。如,將ServerSocketChannel設定為非同步非阻塞模式,它的backlog設為1024,系統資源初始化成功後,將ServerSocketChannel註冊到Selector,監聽SelectionKey.OP_ACCEPT操作位。如果資源初始化失敗,則退出。

2.在run方法的while迴圈體中迴圈遍歷selector,設定休眠時間為1s。無論是否有讀寫事件發生,selector每隔1s都被喚醒一次。selector也有一個無參的select方法:當有處於就緒狀態的Channel時,selector將返回該Channel的SelectionKey集合。通過對就緒狀態的Channel集合進行迭代,可以進行網路的非同步讀寫操作。

3.當有客戶端接入時,根據SelectionKey的操作位進行判斷獲知網路事件的型別,通過ServerSocketChannel的accept接受客戶端的連線請求並建立SocketChannel例項。完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。然後將新建立的SocketChannel設定成非同步非阻塞。

4.然後就是讀取客戶端請求,首先建立一個ByteBuffer,然後呼叫SocketChannel的read方法讀取請求碼流。因為此時已經把SocketChannel設定為非同步非阻塞模式,因此它的read是非阻塞的。所以要將返回值進行判斷:

1.返回值大於0:讀取到位元組,對位元組進行編解碼。

2.返回值等於0:沒有讀取到位元組,忽略;

3.返回值小於0:鏈路已經關閉,需要關閉SocketChannel釋放資源。

當讀取到碼流後,進行解碼。首先對readBuffer進行flip操作,它的作用是將緩衝區當前的limit設定為position,position設定為0,用於後續對緩衝區進行讀取操作。然後根據緩衝區可讀位元組個數建立位元組陣列,呼叫ByteBuffer的get操作將緩衝區可讀的位元組陣列複製到新建立的位元組陣列中,最後呼叫字串構造將其打印出來。如果請求的字串是“QUERY TIME ORDER”則把伺服器的當前時間編碼後返回給客戶端。

5.在讀取完資料後會呼叫doWrite方法,首先他會將字串編碼成位元組陣列,根據位元組陣列的容量建立ByteBuffer,呼叫ByteBuffer的put操作將位元組陣列複製到緩衝區中,然後對緩衝區進行flip操作,最後呼叫SocketChannel的write方法將緩衝區中的位元組陣列傳送出去。由於SocketChannel是非同步非阻塞的,他並不保證一次能夠把需要傳送的位元組陣列傳送完,此時會出現“寫半包”問題。我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的ByteBuffer傳送完畢,然後通過ByteBuffer的hasRemain()方法判斷訊息是否傳送完成。

客戶端程式碼:,

public class TimeClient {
	public static void main(String[] args) {
		int port = 9999;
		new Thread(new TimeClientHandler("127.0.0.1", port)).start();
	}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandler implements Runnable {
	private int port;
	private String host;
	private Selector selector;
	private SocketChannel channel;
	private volatile boolean stop;

	public TimeClientHandler(String host, int port) {
		this.host = host == null ? "127.0.0.1" : host;
		this.port = port;
		try {
			selector = Selector.open();
			channel = SocketChannel.open();
			channel.configureBlocking(false);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		try {
			doConnect();
		} catch (IOException e1) {
			e1.printStackTrace();
			System.exit(1);
		}
		while (!stop) {
			try {
				// selector每一秒被喚醒一次
				selector.select(1000);
				// 還回就緒狀態的chanel的 selectedKeys
				Set<SelectionKey> selectedKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectedKeys.iterator();
				SelectionKey key = null;
				while (iterator.hasNext()) {
					key = iterator.next();
					iterator.remove();
					try {
						handleInput(key);
					} catch (Exception e) {
						if (key != null) {
							key.cancel();
							if (key.channel() != null)
								key.channel().close();
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		if (selector != null) {
			try {
				selector.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			selector = null;
		}
	}

	public void handleInput(SelectionKey key) throws IOException {
		if (key.isValid()) {
			SocketChannel sc = (SocketChannel) key.channel();
			if (key.isConnectable()) {
				if (sc.finishConnect()) {
					sc.register(selector, SelectionKey.OP_READ);
					doWrite(sc);
				} else {
					System.exit(1);// 連線失敗,程序退出
				}
			}
			if (key.isReadable()) {
				ByteBuffer readBuffer = ByteBuffer.allocate(1024);
				int readBytes = sc.read(readBuffer);
				if (readBytes > 0) {
					readBuffer.flip();
					byte[] bytes = new byte[readBuffer.remaining()];
					readBuffer.get(bytes);
					String body = new String(bytes, "UTF-8");
					System.out.println("當前時間 : " + body);
					this.stop = true;
				} else if (readBytes < 0) { // 對端鏈路關閉
					key.cancel();
					sc.close();
				} else
					; // 讀到0位元組,忽略
			}
		}
	}

	private void doConnect() throws IOException {
		if (channel.connect(new InetSocketAddress(host, port))) {
			channel.register(selector, SelectionKey.OP_READ);
			doWrite(channel);
		} else {
			channel.register(selector, SelectionKey.OP_CONNECT);
		}
	}

	private void doWrite(SocketChannel schannel) throws IOException {
		byte[] bytes = "QUERY TIME ORDER".getBytes();
		ByteBuffer buff = ByteBuffer.allocate(bytes.length);
		buff.put(bytes);
		buff.flip();
		schannel.write(buff); // 判斷是否傳送完畢
		if (!buff.hasRemaining()) {
			System.out.println("傳送成功!");
		}
	}
}

1.首先初始化NIO的多路複用器和SocketChannel物件。需要注意的是建立SocketChannel之後,需要將其設定為非同步非阻塞模式。

2.doConnect()函式用於傳送連線請求,首先對SocketChannel的connect()操作進行判斷,如果成功則將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_READ;如果沒有沒有成功則說明服務端沒有返回TCP握手應答訊息,但所以此時需要將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_CONNECT,當服務端返回TCP syn-ack訊息後,Selector就能輪詢到這個SocketChannel處於連線就緒狀態。

3.在執行完doConnect()後,通過迴圈不斷輪詢多路複用器Selector。當有就緒的Channel時,執行handleInput()方法。

4.首先對SelectionKey進行判斷,判斷其屬於什麼狀態,如果處於連線則說明服務端已返回ACK應答訊息。這時我們需要對連線結果進行判斷,呼叫SocketChannel的finishConnect()方法。如果返回值為true,說明客戶端連線成功;如果返回false或者丟擲IOException說明連線失敗。在這裡返回值為true,說明連線成功。將SocketChannel註冊到多路複用器上,註冊SelectionKey.OP_READ操作位,監聽網路讀操作,然後將請求訊息給服務端。

此時會呼叫doWrite()方法,這裡構造訊息體然後將其編碼寫入到傳送緩衝區中,最後呼叫SocketChannel的write方法進行傳送。

5.如果客戶端接收到了服務端的應答訊息,則SocketChannel是可讀的,這裡我分配1M進行讀取應答訊息,呼叫SocketChannel的read()方法進行非同步讀取操作。如果讀取到了訊息則對訊息進行解碼最後輸出,然後將stop設為true,執行緒退出迴圈。

6.執行緒退出迴圈後,需要對資源進行釋放。

NIO優點總結:

1.客戶端發起的連線操作是非同步的,可以通過在多路複用器註冊OP_CONNECT等待後續結果,不需要像之前的客戶端那樣被同步阻塞。

2.SocketChannel的讀寫操作都是非同步的,如果沒有可讀寫的資料它不會同步等待,直接返回,這樣I/O通訊執行緒就可以處理其他的鏈路,不需要同步等待這個鏈路可用。

3.執行緒模型的優化:由於JDK的Selector在Linux等主流作業系統上通過epoll實現,它沒有連線控制代碼數的限制,這意味著一個Selector執行緒可以同時處理成千上萬個客戶端連線,而且效能不會隨著客戶端的增加而線性下降。因此它非常適合做高效能、高負載的網路伺服器。