Java NIO 必知必會(Example)
管道流:
Java IO/">NIO 管道是2個執行緒之間的單向資料連線。Pipe有一個source通道和一個sink通道。資料會被寫到sink通道,從source通道讀取。
1 package base.nio.threaddemo; 2 3 import java.io.IOException; 4 import java.nio.ByteBuffer; 5 import java.nio.channels.Pipe; 6 7 /** 8* @program: Lear-Java 9* @description: 10* @author: Mr.Dai 11* @create: 2018-10-05 20:43 12**/ 13 public class ThreadSend { 14 15private Pipe pipe; 16 17 18private void init() throws Exception { 19this.pipe = Pipe.open(); 20} 21 22 23class SendInner1 extends Thread { 24 25@Override 26public void run() { 27// 單向流 傳送資料 28try { 29Pipe.SinkChannel sink = pipe.sink(); 30sink.configureBlocking(false); 31 32while (true) { 33if (sink.isOpen()) { 34sink.write(ByteBuffer.wrap("abcd".getBytes())); 35} 36Thread.sleep(1000); 37} 38} catch (InterruptedException | IOException e) { 39e.printStackTrace(); 40} 41} 42} 43 44class ReverInner extends Thread { 45@Override 46public void run() { 47try { 48// 單向流 拿到資料 49Pipe.SourceChannel source = pipe.source(); 50 51source.configureBlocking(false); 52 53while (true) { 54if (source.isOpen()) { 55ByteBuffer buffer = ByteBuffer.allocate(10); 56buffer.clear(); 57source.read(buffer); 58// 這裡必須去掉 trim 59if(new String(buffer.array()).trim().equals("")){ 60continue; 61} 62System.out.println(new String(buffer.array()).trim()); 63} 64Thread.sleep(1000); 65} 66} catch (InterruptedException | IOException e) { 67e.printStackTrace(); 68} 69} 70} 71 72public static void main(String[] args) throws Exception { 73ThreadSend send = new ThreadSend(); 74 75send.init(); 76 77SendInner1 sendI = send.new SendInner1(); 78 79ReverInner revI = send.new ReverInner(); 80 81sendI.start(); 82revI.start(); 83} 84 85 86 }
套接字通道流
非阻塞模式
ServerSocketChannel可以設定成非阻塞模式。在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連線,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null。如:
1 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 2 3 serverSocketChannel.socket().bind(new InetSocketAddress(9999)); 4 serverSocketChannel.configureBlocking(false); 5 6while(true){ 7SocketChannel socketChannel = 8serverSocketChannel.accept(); 9 10if(socketChannel != null){ 11//do something with socketChannel... 12} 13 }
server:
1 package base.nio.chatdemo; 2 3 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.ServerSocketChannel; 9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11 import java.util.Set; 12 13 /** 14* @program: Lear-Java 15* @description: Nio 聊天服務端 16* @author: Mr.Dai 17* @create: 2018-10-05 16:31 18**/ 19 public class ChatServer { 20 21/** 22* 通道管理器 23*/ 24private Selector selector; 25 26private void initServer(int port) throws Exception{ 27 28ServerSocketChannel serverChannel= ServerSocketChannel.open(); 29 30serverChannel .socket().bind(new InetSocketAddress(port)); 31// 配置非阻塞 32serverChannel .configureBlocking(false); 33 34 35this.selector=Selector.open(); 36 37/** 38* 將通道管理器和該通道繫結,併為該通道註冊selectionKey.OP_ACCEPT事件 39* 註冊該事件後,當事件到達的時候,selector.select()會返回, 40* 如果事件沒有到達selector.select()會一直阻塞 41* selector.selectNow() 立即返回 無論是否準備好 可能返回0 42*/ 43serverChannel .register(this.selector, SelectionKey.OP_ACCEPT); 44 45} 46 47/** 48* 採用輪訓的方式監聽selector上是否有需要處理的事件,如果有,進行處理 49*/ 50public void listen() throws Exception { 51System.out.println("start------------------->"); 52while (true){ 53// 在沒有註冊事件來到時 將會一直阻塞 54selector.select(); 55Set<SelectionKey> set = selector.selectedKeys(); 56Iterator<SelectionKey> iterator = set.iterator(); 57 58while (iterator.hasNext()){ 59SelectionKey key = iterator.next(); 60// 移除當前阻塞佇列 61iterator.remove(); 62if(key.isAcceptable()){ 63ServerSocketChannel server = (ServerSocketChannel) key.channel(); 64 65SocketChannel channel = server.accept(); 66channel.configureBlocking(false); 67// 服務端傳送資料 68channel.write(ByteBuffer.wrap(new String("hello client").getBytes())); 69// 在客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權 70channel.register(this.selector,SelectionKey.OP_READ); 71 72}else if(key.isReadable()){ 73SocketChannel channel= (SocketChannel) key.channel(); 74 75ByteBuffer buffer = ByteBuffer.allocate(10); 76channel.read(buffer); 77 78String msg = new String(buffer.array()).trim(); 79 80System.out.println("客戶端傳送過來的訊息:"+msg); 81// 在讀取後 將柱塞佇列資料 改變監聽為Accept 82ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); 83channel.write(outBuffer); 84} 85} 86} 87 88} 89 90public static void main(String[] args)throws Exception{ 91ChatServer server = new ChatServer(); 92server.initServer(8989); 93server.listen(); 94} 95 96 }
clien:
1 package base.nio.chatdemo; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.SocketChannel; 9 import java.util.Iterator; 10 11 /** 12* @program: Lear-Java 13* @description: nio 聊天客戶端 14* @author: Mr.Dai 15* @create: 2018-10-05 16:31 16**/ 17 public class ChatClient { 18 19 20/** 21*提供柱阻塞佇列 管理器 22*/ 23private Selector selector; 24 25 26private void ininCliect(String ip,int port) throws Exception{ 27 28SocketChannel channel= SocketChannel.open(); 29 30channel .connect(new InetSocketAddress(ip,port)); 31 32this.selector=Selector.open(); 33 34channel .configureBlocking(false); 35 36 37channel .register(this.selector, SelectionKey.OP_CONNECT); 38 39} 40 41public void listen() throws Exception { 42 43while (true){ 44 45selector.select(); 46 47Iterator<SelectionKey> ite= selector.selectedKeys().iterator(); 48 49while (ite.hasNext()){ 50SelectionKey key = ite .next(); 51ite .remove(); 52if(key.isConnectable()){ 53SocketChannel channel = (SocketChannel) key.channel(); 54// 是否準備好連線 55if(channel.isConnectionPending()){ 56channel.finishConnect(); 57} 58channel.configureBlocking(false); 59// 向server 傳送資料 60channel.write(ByteBuffer.wrap("向server 傳送資料".getBytes())); 61 62channel.register(selector,SelectionKey.OP_READ); 63 64}else if(key.isReadable()){ 65m1(key); 66} 67} 68} 69} 70 71private void m1(SelectionKey key) throws IOException { 72SocketChannel channel = (SocketChannel) key.channel(); 73 74ByteBuffer buffer = ByteBuffer.allocate(10); 75channel.read(buffer); 76System.out.println("服務端的訊息為:"+new String(buffer.array())); 77 78ByteBuffer outBuffer = ByteBuffer.wrap(new String("aaa").getBytes()); 79channel.write(outBuffer); 80} 81 82public static void main(String[] args) throws Exception { 83ChatClient client = new ChatClient(); 84 85client.ininCliect("127.0.0.1",8989); 86client.listen(); 87} 88 89 }