java Nio 使用 NioSocket 客戶端與服務端互動實現
NioSocket 客戶端與服務端互動實現
java Nio是jdk1.4新增的io方式—–nio(new IO),這種方式在目前來說算不算new,更合適的解釋應該是non-block IO。
non-block是相對於傳統的io方式來講的。傳統的Io方式是阻塞的,我們拿網路io來舉例,傳統的io模型如下:
服務端主執行緒負責不斷地server.accept(),如果沒有客戶端請求主執行緒就會阻塞,當客戶端請求時,主執行緒會通過執行緒池建立一個新的執行緒執行。簡單解釋就是一個執行緒負責一個客戶端的socket,當客戶端因網路等原因傳遞速度慢的時候,服務端對應的客戶端的執行緒就會等待,很浪費資源。同時執行緒過少的話會影響服務的吞吐量,而執行緒過多的話由於上下文切換等原因會導致效率十分低下,傳統的io方式並不適合如今的網路流量。
Nio的模型如下:
nio相比傳統的io模型,最大的特點是優化了執行緒的使用。nio通過selector可以使用一個執行緒去管理多個socket控制代碼,說是管理也不太合適,nio是採用的事件驅動模型,selector負責的是監控各個連線控制代碼的狀態,不是去輪詢每個控制代碼,而是在資料就緒後,將訊息通知給selector,而具體的socket控制代碼管理則是採用多路複用的模型,交由作業系統來完成。selector充當的是一個訊息的監聽者,負責監聽channel在其註冊的事件,這樣就可以通過一個執行緒完成了大量連線的管理,當註冊的事件發生後,再呼叫相應執行緒進行處理。這樣就不需要為每個連線都使用一個執行緒去維持長連線,減少了長連線的開銷,同時減少了上下文的切換提高了系統的吞吐量。
java Nio的組成
java Nio主要由三個核心部分組成:
- Buffer
- Channel
- Selector
所有的io的Nio都是從一個channel開始的,Channel有點類似於流,但是和流不同的是,channel是可以雙向讀寫的。Channel有幾種型別,主要包含檔案io操作和網路io:
- FileChannel (檔案io)
- DatagramChannel (udp資料報)
- SocketChannel (tcp客戶端)
- ServerSocketChannel (tcp服務端)
Buffer是一箇中間快取區,資料可以從channel讀取到buffer,也可以從buffer寫到channel中,在java中,傳統方式與io的互動,需要將資料從堆記憶體讀取到直接記憶體中,然後交由c語言來呼叫系統服務完成io的互動。而使用Buffer可以直接在直接記憶體中開闢記憶體區域,減少了io複製的操作,從而提高了io操作的效率。
#基本資料型別的buffer
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
#檔案記憶體對映buffer
- MappedByteBuffer
#直接記憶體區buffer
- DirectBuffer
Selector允許單個執行緒處理多個channel,可以將多個channel教給selector管理,並註冊相應的事件,而selector則採用事件驅動的方式,當註冊的事件就緒後,呼叫相應的相應的執行緒處理該時間,不用使用執行緒去維持長連線,減少了執行緒的開銷。Selector通過靜態工廠的open方法建立,然後通過channel的register註冊到Channel上。註冊後通過select方法等待請求,select請求有long型別引數,代表等待時間,如果等待時間內接受到操作請求,則返回可以操作請求的數量,否則超時往下走。傳入引數為零或者無參方法,則會採用阻塞模式知道有相應請求。收到請求後呼叫selectedKeys返回SelectionKey的集合。SelectionKey儲存了處理當前請求的Channel和Selector,並且提供了不同的操作型別。SelectionKey的操作有四種:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
下面為一個客戶端與服務端實用NioSocket互動的簡單例子:
//對selectionKey事件的處理
/**
* description:
*
* @author wkGui
*/
interface ServerHandlerBs {
void handleAccept(SelectionKey selectionKey) throws IOException;
String handleRead(SelectionKey selectionKey) throws IOException;
}
/**
* description:
*
* @author wkGui
*/
public class ServerHandlerImpl implements ServerHandlerBs {
private int bufferSize = 1024;
private String localCharset = "UTF-8";
public ServerHandlerImpl() {
}
public ServerHandlerImpl(int bufferSize) {
this(bufferSize, null);
}
public ServerHandlerImpl(String localCharset) {
this(-1, localCharset);
}
public ServerHandlerImpl(int bufferSize, String localCharset) {
this.bufferSize = bufferSize > 0 ? bufferSize : this.bufferSize;
this.localCharset = localCharset == null ? this.localCharset : localCharset;
}
@Override
public void handleAccept(SelectionKey selectionKey) throws IOException {
//獲取channel
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
//非阻塞
socketChannel.configureBlocking(false);
//註冊selector
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
System.out.println("建立請求......");
}
@Override
public String handleRead(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
String receivedStr = "";
if (socketChannel.read(buffer) == -1) {
//沒讀到內容關閉
socketChannel.shutdownOutput();
socketChannel.shutdownInput();
socketChannel.close();
System.out.println("連線斷開......");
} else {
//將channel改為讀取狀態
buffer.flip();
//按照編碼讀取資料
receivedStr = Charset.forName(localCharset).newDecoder().decode(buffer).toString();
buffer.clear();
//返回資料給客戶端
buffer = buffer.put(("received string : " + receivedStr).getBytes(localCharset));
//讀取模式
buffer.flip();
socketChannel.write(buffer);
//註冊selector 繼續讀取資料
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
}
return receivedStr;
}
}
//服務端server類
/**
* description:
*
* @author wkGui
*/
public class NioSocketServer {
private volatile byte flag = 1;
public void setFlag(byte flag) {
this.flag = flag;
}
public void start() {
//建立serverSocketChannel,監聽8888埠
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//設定為非阻塞模式
serverSocketChannel.configureBlocking(false);
//為serverChannel註冊selector
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務端開始工作:");
//建立訊息處理器
ServerHandlerBs handler = new ServerHandlerImpl(1024);
while (flag == 1) {
selector.select();
System.out.println("開始處理請求 : ");
//獲取selectionKeys並處理
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
try {
//連線請求
if (key.isAcceptable()) {
handler.handleAccept(key);
}
//讀請求
if (key.isReadable()) {
System.out.println(handler.handleRead(key));
}
} catch (IOException e) {
e.printStackTrace();
}
//處理完後移除當前使用的key
keyIterator.remove();
}
System.out.println("完成請求處理。");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
//server端啟動類
/**
* description:
*
* @author wkGui
*/
public class ServerMain {
public static void main(String[] args) {
NioSocketServer server = new NioSocketServer();
new Thread(() -> {
try {
Thread.sleep(10*60*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
server.setFlag((byte) 0);
}
}).start();
server.start();
}
}
//客戶端client類
/**
* description:
*
* @author wkGui
*/
public class NioSocketClient {
public void start() {
try (SocketChannel socketChannel = SocketChannel.open()) {
//連線服務端socket
SocketAddress socketAddress = new InetSocketAddress("localhost", 8888);
socketChannel.connect(socketAddress);
int sendCount = 0;
ByteBuffer buffer = ByteBuffer.allocate(1024);
//這裡最好使用selector處理 這裡只是為了寫的簡單
while (sendCount < 10) {
buffer.clear();
//向服務端傳送訊息
buffer.put(("current time : " + System.currentTimeMillis()).getBytes());
//讀取模式
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
//從服務端讀取訊息
int readLenth = socketChannel.read(buffer);
//讀取模式
buffer.flip();
byte[] bytes = new byte[readLenth];
buffer.get(bytes);
System.out.println(new String(bytes, "UTF-8"));
buffer.clear();
sendCount++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
//client啟動類
/**
* description:
*
* @author wkGui
*/
public class ClientMain {
public static void main(String[] args) {
new NioSocketClient().start();
}
}