1. 程式人生 > >Java NIO理解與使用

Java NIO理解與使用

Netty的使用或許我們看著官網user guide還是很容易入門的。因為java nio使用非常的繁瑣,netty對java nio進行了大量的封裝。對於Netty的理解,我們首先需要了解NIO的原理和使用。所以,我也特別渴望去了解NIO這種通訊模式。

官方的定義是:nio 是non-blocking的簡稱,在jdk1.4 裡提供的新api 。Sun 官方標榜的特性如下: 為所有的原始型別提供(Buffer)快取支援。字符集編碼解碼解決方案。 Channel :一個新的原始I/O 抽象。 支援鎖和記憶體對映檔案的檔案訪問介面。 提供多路(non-bloking) 非阻塞式的高伸縮性網路I/O 。是不是很抽象?

在閱讀《NIO入門》這篇技術文件之後,收穫了很多。包括對Java NIO的理解和使用,所以也特別的感謝作者。

首先,還是來回顧以下從這篇文件中學到的要點。

為什麼要使用 NIO?
NIO 的建立目的是為了讓 Java 程式設計師可以實現高速 I/O 而無需編寫自定義的本機程式碼。NIO 將最耗時的 I/O 操作(即填充和提取緩衝區)轉移回作業系統,因而可以極大地提高速度。

NIO最重要的組成部分

通道 Channels
緩衝區 Buffers
選擇器 Selectors

Buffer 是一個物件, 它包含一些要寫入或者剛讀出的資料。

在 NIO 庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的。在寫入資料時,它是寫入到緩衝區中的。任何時候訪問 NIO 中的資料,您都是將它放到緩衝區中。
緩衝區實質上是一個數組。通常它是一個位元組陣列,但是也可以使用其他種類的陣列。但是一個緩衝區不 僅僅 是一個數組。緩衝區提供了對資料的結構化訪問,而且還可以跟蹤系統的讀/寫程序。

Channel是一個物件,可以通過它讀取和寫入資料

看完下面這個例子,基本上就理解buffer和channel的作用了

package yyf.java.nio.ibm;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

public class CopyFile {
	static public void main(String args[]) throws Exception {

		String infile = "c://test/nio_copy.txt";
		String outfile = "c://test/result.txt";

		FileInputStream fin = new FileInputStream(infile);
		FileOutputStream fout = new FileOutputStream(outfile);
		// 獲取讀的通道
		FileChannel fcin = fin.getChannel();
		// 獲取寫的通道
		FileChannel fcout = fout.getChannel();
		// 定義緩衝區,並指定大小
		ByteBuffer buffer = ByteBuffer.allocate(1024);

		while (true) {
			// 清空緩衝區
			buffer.clear();
			//從通道讀取一個數據到緩衝區
			int r = fcin.read(buffer);
			//判斷是否有從通道讀到資料
			if (r == -1) {
				break;
			}
			//將buffer指標指向頭部
			buffer.flip();
			//把緩衝區資料寫入通道
			fcout.write(buffer);
		}
	}
}

緩衝區主要是三個變數

position
limit
capacity
這三個變數一起可以跟蹤緩衝區的狀態和它所包含的資料。我們將在下面的小節中詳細分析每一個變數,還要介紹它們如何適應典型的讀/寫(輸入/輸出)程序。在這個例子中,我們假定要將資料從一個輸入通道拷貝到一個輸出通道。
Position
您可以回想一下,緩衝區實際上就是美化了的陣列。在從通道讀取時,您將所讀取的資料放到底層的陣列中。 position 變數跟蹤已經寫了多少資料。更準確地說,它指定了下一個位元組將放到陣列的哪一個元素中。因此,如果您從通道中讀三個位元組到緩衝區中,那麼緩衝區的 position 將會設定為3,指向陣列中第四個元素。
同樣,在寫入通道時,您是從緩衝區中獲取資料。 position 值跟蹤從緩衝區中獲取了多少資料。更準確地說,它指定下一個位元組來自陣列的哪一個元素。因此如果從緩衝區寫了5個位元組到通道中,那麼緩衝區的 position 將被設定為5,指向陣列的第六個元素。
Limit
limit 變量表明還有多少資料需要取出(在從緩衝區寫入通道時),或者還有多少空間可以放入資料(在從通道讀入緩衝區時)。
position 總是小於或者等於 limit。
Capacity
緩衝區的 capacity 表明可以儲存在緩衝區中的最大資料容量。實際上,它指定了底層陣列的大小 ― 或者至少是指定了准許我們使用的底層陣列的容量。
limit 決不能大於 capacity。

緩衝區作為一個數組,這三個變數就是其中資料的標記,也很好理解。

Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好準備的元件。這樣,一個單獨的執行緒可以管理多個channel,從而管理多個網路連線。

接下來來看看具體的使用把,我建立了一個直接收訊息的伺服器(一邊接收一邊寫資料可能對於新手不好理解)

服務端:

package yyf.java.nio.test;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioReceiver {
	@SuppressWarnings("null")
	public static void main(String[] args) throws Exception {
		ByteBuffer echoBuffer = ByteBuffer.allocate(8);
		ServerSocketChannel ssc = ServerSocketChannel.open();
		Selector selector = Selector.open();
		ssc.configureBlocking(false);
		ServerSocket ss = ssc.socket();
		InetSocketAddress address = new InetSocketAddress(8080);
		ss.bind(address);
		SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
		System.out.println("開始監聽……");
		while (true) {
			int num = selector.select();
			Set selectedKeys = selector.selectedKeys();
			Iterator it = selectedKeys.iterator();
			while (it.hasNext()) {
				SelectionKey sKey = (SelectionKey) it.next();
				SocketChannel channel = null;
				if (sKey.isAcceptable()) {
					ServerSocketChannel sc = (ServerSocketChannel) key.channel();
					channel = sc.accept();// 接受連線請求
					channel.configureBlocking(false);
					channel.register(selector, SelectionKey.OP_READ);
					it.remove();
				} else if (sKey.isReadable()) {
					channel = (SocketChannel) sKey.channel();
					while (true) {
						echoBuffer.clear();
						int r = channel.read(echoBuffer);
						if (r <= 0) {
							channel.close();
							System.out.println("接收完畢,斷開連線");
							break;
						}
						System.out.println("##" + r + " " + new String(echoBuffer.array(), 0, echoBuffer.position()));
						echoBuffer.flip();
					}
					it.remove();
				} else {
					channel.close();
				}
			}
		}

	}

}

客戶端(NIO):
package yyf.java.nio.test;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioTest {
	public static void main(String[] args) throws Exception {
		ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
		SocketChannel channel = null;
		Selector selector = null;
		channel = SocketChannel.open();
		channel.configureBlocking(false);
		// 請求連線
		channel.connect(new InetSocketAddress("localhost", 8080));
		selector = Selector.open();
		channel.register(selector, SelectionKey.OP_CONNECT);
		int num = selector.select();
		Set selectedKeys = selector.selectedKeys();
		Iterator it = selectedKeys.iterator();
		while (it.hasNext()) {
			SelectionKey key = (SelectionKey) it.next();
			it.remove();
			if (key.isConnectable()) {
				if (channel.isConnectionPending()) {
					if (channel.finishConnect()) {
						// 只有當連線成功後才能註冊OP_READ事件
						key.interestOps(SelectionKey.OP_READ);
						echoBuffer.put("123456789abcdefghijklmnopq".getBytes());
						echoBuffer.flip();
						System.out.println("##" + new String(echoBuffer.array()));
						channel.write(echoBuffer);
						System.out.println("寫入完畢");
					} else {
						key.cancel();
					}
				}
			}
		}

	}
}

執行結果:

開始監聽……
##8 12345678
##8 9abcdefg
##8 hijklmno
##2 pq
接收完畢,斷開連線


當然,BIO的客戶端也可以,開啟10個BIO客戶端執行緒

package yyf.java.nio.test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;

import yyf.java.test.Main;

public class BioClientTest {
	public static void main(String[] args) throws Exception {
		BioClient n = new BioClient();
		for (int i = 0; i < 10; i++) {
			Thread t1 = new Thread(n);
			t1.start();
		}
	}
}

class BioClient implements Runnable {
	@Override
	public void run() {

		try {
			Socket socket = new Socket("127.0.0.1", 8080);
			OutputStream os = socket.getOutputStream();
			InputStream is = socket.getInputStream();
			ByteArrayOutputStream bos = new ByteArrayOutputStream();
			String str = Thread.currentThread().getName() + "...........sadsadasJava";
			os.write(str.getBytes());
			StringBuffer sb = new StringBuffer();
			byte[] b = new byte[1024];
			int len;
			while ((len = is.read(b)) != -1) {
				bos.write(b, 0, len);
			}
			is.close();
			os.close();
			socket.close();
			System.out.println(Thread.currentThread().getName() + " 寫入完畢 " + new String(bos.toByteArray()));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
執行結果:
##8 Thread-4
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-3
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-9
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-7
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-0
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-5
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-2
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-8
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-1
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線
##8 Thread-6
##8 ........
##8 ...sadsa
##7 dasJava
接收完畢,斷開連線

當然,這只是一個測試,對於一個伺服器,是有讀取,也有寫出的,這是文件給的一個服務端例子
package yyf.java.nio.ibm;

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class MultiPortEcho {
	private int ports[];
	private ByteBuffer echoBuffer = ByteBuffer.allocate(5);

	public MultiPortEcho(int ports[]) throws IOException {
		this.ports = ports;
		go();
	}

	private void go() throws IOException {
		Selector selector = Selector.open();
		for (int i = 0; i < ports.length; ++i) {
			ServerSocketChannel ssc = ServerSocketChannel.open();
			ssc.configureBlocking(false);
			ServerSocket ss = ssc.socket();
			InetSocketAddress address = new InetSocketAddress(ports[i]);
			ss.bind(address);
			SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("Going to listen on " + ports[i]);
		}

		while (true) {
			int num = selector.select();
			Set selectedKeys = selector.selectedKeys();
			Iterator it = selectedKeys.iterator();
			while (it.hasNext()) {
				SelectionKey key = (SelectionKey) it.next();
				if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
					ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
					SocketChannel sc = ssc.accept();
					sc.configureBlocking(false);
					SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
					it.remove();
					System.out.println("Got connection from " + sc);
				} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
					SocketChannel sc = (SocketChannel) key.channel();
					int bytesEchoed = 0;
					while (true) {
						echoBuffer.clear();
						int r = sc.read(echoBuffer);
						if (r <= 0) {
							sc.close();
							break;
						}
						echoBuffer.flip();
						sc.write(echoBuffer);
						bytesEchoed += r;
					}
					System.out.println("Echoed " + bytesEchoed + " from " + sc);
					it.remove();
				}

			}
			// System.out.println( "going to clear" );
			// selectedKeys.clear();
			// System.out.println( "cleared" );
		}
	}

	static public void main(String args[]) throws Exception {
		int ports[] = new int[] { 8080 };
		for (int i = 0; i < args.length; ++i) {
			ports[i] = Integer.parseInt(args[i]);
		}
		new MultiPortEcho(ports);
	}
}

現在,我們就寫個客戶端去跟伺服器通訊,把發過去的返回來:
package yyf.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import javax.swing.ButtonGroup;

public class NioClient {
	public static void main(String[] args) throws IOException {
		SocketChannel socketChannel = SocketChannel.open();
		SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8080);
		socketChannel.connect(socketAddress);
		String str = "你好a";
		ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
		socketChannel.write(buffer);
		socketChannel.socket().shutdownOutput();
	
		buffer.clear();
		byte[] bytes;
		int count = 0;
		while ((count = socketChannel.read(buffer)) > 0) {
			buffer.flip();
			bytes = new byte[count];
			buffer.get(bytes);
			System.out.println(new String(buffer.array()));
			buffer.clear();
		}
		socketChannel.socket().shutdownInput();
		socketChannel.socket().close();
		socketChannel.close();
	}
}


執行結果

server:

Going to listen on 8080
Got connection from java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:63584]
Echoed 7 from java.nio.channels.SocketChannel[closed]

client:
你好a

對於NIO的入門就先到這裡。