1. 程式人生 > >分散式專題(七)NIO

分散式專題(七)NIO

netty 和 mina是對NIO進行封裝的框架,所以學習netty之前先了解NIO

具體參考文件

連結:https://pan.baidu.com/s/1sCjacdyC3fYN3SBubgCiuw 
提取碼:osim

 

1)阻塞(Block)和非阻塞(Non-Block):
阻塞和非阻塞是程序在訪問資料的時候, 資料是否準備就緒的一種處理方式,當資料沒
有準備的時候 阻塞: 往往需要等待緩衝區中的資料準備好過後才處理其他的事情, 否則一
直等待在那裡。
非阻塞:當我們的程序訪問我們的資料緩衝區的時候, 如果資料沒有準備好則直接返回,
不會等待。 如果資料已經準備好, 也直接返回。
2) 同步(Synchronization)和非同步(Asynchronous)的方式:
同步和非同步都是基於應用程式和作業系統處理 IO 事件所採用的方式。 比如同步: 是應
用程式要直接參與 IO 讀寫的操作。 非同步: 所有的 IO 讀寫交給作業系統去處理, 應用程式只
需要等待通知。
同步方式在處理 IO 事件的時候, 必須阻塞在某個方法上面等待我們的 IO 事件完成(阻
塞 IO 事件或者通過輪詢 IO 事件的方式),對於非同步來說, 所有的 IO 讀寫都交給了作業系統。
這個時候, 我們可以去做其他的事情, 並不需要去完成真正的 IO 操作, 當操作完成 IO 後,
會給我們的應用程式一個通知。
同步:1)阻塞到 IO 事件, 阻塞到 read 或則 write。 這個時候我們就完全不能做自己的
事情。 讓讀寫方法加入到執行緒裡面, 然後阻塞執行緒來實現, 對執行緒的效能開銷比較大。
 

 

nio:同步非阻塞

 

面向流和緩衝區

Java NIO 和 IO 之間第一個最大的區別是, IO 是面向流的, NIO 是面向緩衝區的

 

阻塞與非阻塞
Java IO 的各種流是阻塞的。 這意味著, 當一個執行緒呼叫 read() 或 write()時, 該執行緒被阻塞, 直到有一些資料被讀取, 或資料完全寫入。該執行緒在此期間不能再幹任何事情了
JavaNIO 的非阻塞模式,一個執行緒請求寫入一些資料到某通道, 但不需要等待它完全寫入, 這個執行緒同時可以去做別的事情。執行緒通常將非阻塞 IO 的空閒時間用於在其它通道上執行 IO 操作, 所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)
 

選擇器

Java NIO 的選擇器允許一個單獨的執行緒來監視多個輸入通道, 你可以註冊多個通道使用一個選擇器, 然後使用一個單獨的執行緒來“選擇” 通道: 這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。 這種選擇機制, 使得一個單獨的執行緒很容易來管理多個通道。
 

1.BIO寫法

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BIOServer {

    ServerSocket server;
	//伺服器
	public BIOServer(int port){
		try {
			//把Socket服務端啟動
			server = new ServerSocket(port);
			System.out.println("BIO服務已啟動,監聽埠是:" + port);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 開始監聽,並處理邏輯
	 * @throws IOException 
	 */
	public void listener() throws IOException{
		//死迴圈監聽
		while(true){
			//雖然寫了一個死迴圈,如果一直沒有客戶端連線的話,這裡一直不會往下執行
			Socket client = server.accept();//等待客戶端連線,阻塞方法
			
			//拿到輸入流,也就是鄉村公路
			InputStream is = client.getInputStream();
			
			//緩衝區,陣列而已
			byte [] buff = new byte[1024];
			int len = is.read(buff);
			//只要一直有資料寫入,len就會一直大於0
			if(len > 0){
				String msg = new String(buff,0,len);
				System.out.println("收到" + msg);
			}
		}
	}
	
	
	public static void main(String[] args) throws IOException {
		new BIOServer(8080).listener();
	}
	
}
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

public class BIOClient2 {

	public static void main(String[] args) throws UnknownHostException, IOException {

		try{


            //開一條鄉村公路
			Socket client = new Socket("localhost", 8080);
			//輸出流通道開啟
			OutputStream os = client.getOutputStream();
			//產生一個隨機的字串,UUID
			String name = UUID.randomUUID().toString();
			
			//傳送給服務端
			os.write(name.getBytes());
			os.close();
			client.close();
			
		}catch(Exception e){
			
		}
	}
		
	
}

2.NIO寫法

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * 網路多客戶端聊天室
 * 功能1: 客戶端通過Java NIO連線到服務端,支援多客戶端的連線
 * 功能2:客戶端初次連線時,服務端提示輸入暱稱,如果暱稱已經有人使用,提示重新輸入,如果暱稱唯一,則登入成功,之後傳送訊息都需要按照規定格式帶著暱稱傳送訊息
 * 功能3:客戶端登入後,傳送已經設定好的歡迎資訊和線上人數給客戶端,並且通知其他客戶端該客戶端上線
 * 功能4:伺服器收到已登入客戶端輸入內容,轉發至其他登入客戶端。
 * 
 * TODO 客戶端下線檢測
 */
public class NIOServer {

    private int port = 8080;
    private Charset charset = Charset.forName("UTF-8");
    //用來記錄線上人數,以及暱稱
    private static HashSet<String> users = new HashSet<String>();
    
    private static String USER_EXIST = "系統提示:該暱稱已經存在,請換一個暱稱";
    //相當於自定義協議格式,與客戶端協商好
    private static String USER_CONTENT_SPILIT = "#@#";
    
    private Selector selector = null;
    
    
    public NIOServer(int port) throws IOException{

        this.port = port;
		//要想富,先修路
		//先把通道開啟
		ServerSocketChannel server = ServerSocketChannel.open();
		
		//設定高速公路的關卡
		server.bind(new InetSocketAddress(this.port));
		server.configureBlocking(false);
		
		
		//開門迎客,排隊叫號大廳開始工作
		selector = Selector.open();
		
		//告訴服務叫號大廳的工作人員,你可以接待了(事件)
		server.register(selector, SelectionKey.OP_ACCEPT);
		
		System.out.println("服務已啟動,監聽埠是:" + this.port);
	}
    
    
    public void listener() throws IOException{
    	
    	//死迴圈,這裡不會阻塞
    	//CPU工作頻率可控了,是可控的固定值
    	while(true) {
    		
    		//在輪詢,我們服務大廳中,到底有多少個人正在排隊
            int wait = selector.select();
            if(wait == 0) continue; //如果沒有人排隊,進入下一次輪詢
            
            //取號,預設給他分配個號碼(排隊號碼)
            Set<SelectionKey> keys = selector.selectedKeys();  //可以通過這個方法,知道可用通道的集合
            Iterator<SelectionKey> iterator = keys.iterator();
            while(iterator.hasNext()) {
				SelectionKey key = (SelectionKey) iterator.next();
				//處理一個,號碼就要被消除,打發他走人(別在服務大廳佔著茅坑不拉屎了)
				//過號不候
				iterator.remove();
				//處理邏輯
				process(key);
            }
        }
		
	}
    
    
    public void process(SelectionKey key) throws IOException {
    	//判斷客戶端確定已經進入服務大廳並且已經可以實現互動了
        if(key.isAcceptable()){
        	ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel client = server.accept();
            //非阻塞模式
            client.configureBlocking(false);
            //註冊選擇器,並設定為讀取模式,收到一個連線請求,然後起一個SocketChannel,並註冊到selector上,之後這個連線的資料,就由這個SocketChannel處理
            client.register(selector, SelectionKey.OP_READ);
            
            //將此對應的channel設定為準備接受其他客戶端請求
            key.interestOps(SelectionKey.OP_ACCEPT);
//            System.out.println("有客戶端連線,IP地址為 :" + sc.getRemoteAddress());
            client.write(charset.encode("請輸入你的暱稱"));
        }
        //處理來自客戶端的資料讀取請求
        if(key.isReadable()){
            //返回該SelectionKey對應的 Channel,其中有資料需要讀取
            SocketChannel client = (SocketChannel)key.channel(); 
            
            //往緩衝區讀資料
            ByteBuffer buff = ByteBuffer.allocate(1024);
            StringBuilder content = new StringBuilder();
            try{
                while(client.read(buff) > 0)
                {
                    buff.flip();
                    content.append(charset.decode(buff));
                    
                }
//                System.out.println("從IP地址為:" + sc.getRemoteAddress() + "的獲取到訊息: " + content);
                //將此對應的channel設定為準備下一次接受資料
                key.interestOps(SelectionKey.OP_READ);
            }catch (IOException io){
            	key.cancel();
                if(key.channel() != null)
                {
                	key.channel().close();
                }
            }
            if(content.length() > 0) {
                String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT);
                //註冊使用者
                if(arrayContent != null && arrayContent.length == 1) {
                    String nickName = arrayContent[0];
                    if(users.contains(nickName)) {
                    	client.write(charset.encode(USER_EXIST));
                    } else {
                        users.add(nickName);
                        int onlineCount = onlineCount();
                        String message = "歡迎 " + nickName + " 進入聊天室! 當前線上人數:" + onlineCount;
                        broadCast(null, message);
                    }
                } 
                //註冊完了,傳送訊息
                else if(arrayContent != null && arrayContent.length > 1) {
                    String nickName = arrayContent[0];
                    String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length());
                    message = nickName + " 說 " + message;
                    if(users.contains(nickName)) {
                        //不回發給傳送此內容的客戶端
                    	broadCast(client, message);
                    }
                }
            }
            
        }
    }
    
    //TODO 要是能檢測下線,就不用這麼統計了
    public int onlineCount() {
        int res = 0;
        for(SelectionKey key : selector.keys()){
            Channel target = key.channel();
            
            if(target instanceof SocketChannel){
                res++;
            }
        }
        return res;
    }
    
    
    public void broadCast(SocketChannel client, String content) throws IOException {
        //廣播資料到所有的SocketChannel中
        for(SelectionKey key : selector.keys()) {
            Channel targetchannel = key.channel();
            //如果client不為空,不回發給傳送此內容的客戶端
            if(targetchannel instanceof SocketChannel && targetchannel != client) {
                SocketChannel target = (SocketChannel)targetchannel;
                target.write(charset.encode(content));
            }
        }
    }
    
    
    public static void main(String[] args) throws IOException {
        new NIOServer(8080).listener();
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class NIOClient {

	private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080);
    private Selector selector = null;
    private SocketChannel client = null;
    
    private String nickName = "";
    private Charset charset = Charset.forName("UTF-8");
    private static String USER_EXIST = "系統提示:該暱稱已經存在,請換一個暱稱";
    private static String USER_CONTENT_SPILIT = "#@#";
    
    
    public NIOClient() throws IOException{

        //不管三七二十一,先把路修好,把關卡開放
        //連線遠端主機的IP和埠
        client = SocketChannel.open(serverAdrress);
        client.configureBlocking(false);
        
        //開門接客
        selector = Selector.open();
        client.register(selector, SelectionKey.OP_READ);
    }
    
    public void session(){
    	//開闢一個新執行緒從伺服器端讀資料
        new Reader().start();
        //開闢一個新執行緒往伺服器端寫資料
        new Writer().start();
	}
    
    private class Writer extends Thread{

		@Override
		public void run() {
			try{
				//在主執行緒中 從鍵盤讀取資料輸入到伺服器端
		        Scanner scan = new Scanner(System.in);
		        while(scan.hasNextLine()){
		            String line = scan.nextLine();
		            if("".equals(line)) continue; //不允許發空訊息
		            if("".equals(nickName)) {
		            	nickName = line;
		                line = nickName + USER_CONTENT_SPILIT;
		            } else {
		                line = nickName + USER_CONTENT_SPILIT + line;
		            }
//		            client.register(selector, SelectionKey.OP_WRITE);
		            client.write(charset.encode(line));//client既能寫也能讀,這邊是寫
		        }
		        scan.close();
			}catch(Exception e){
				
			}
		}
    	
    }
    
    
    private class Reader extends Thread {
        public void run() {
            try {
            	
            	//輪詢
                while(true) {
                    int readyChannels = selector.select();
                    if(readyChannels == 0) continue;
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();  //可以通過這個方法,知道可用通道的集合
                    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                    while(keyIterator.hasNext()) {
                         SelectionKey key = (SelectionKey) keyIterator.next();
                         keyIterator.remove();
                         process(key);
                    }
                }
            }
            catch (IOException io){
            	
            }
        }

        private void process(SelectionKey key) throws IOException {
            if(key.isReadable()){
                //使用 NIO 讀取 Channel中的資料,這個和全域性變數client是一樣的,因為只註冊了一個SocketChannel
                //client既能寫也能讀,這邊是讀
                SocketChannel sc = (SocketChannel)key.channel();
                
                ByteBuffer buff = ByteBuffer.allocate(1024);
                String content = "";
                while(sc.read(buff) > 0)
                {
                    buff.flip();
                    content += charset.decode(buff);
                }
                //若系統傳送通知名字已經存在,則需要換個暱稱
                if(USER_EXIST.equals(content)) {
                	nickName = "";
                }
                System.out.println(content);
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }
    
    
    
    public static void main(String[] args) throws IOException
    {
        new NIOClient().session();
    }
}

緩衝區 Buffer
 

緩衝區實際上是一個容器物件, 更直接的說, 其實就是一個數組, 在 NIO 庫中, 所有資料都
是用緩衝區處理的。 在讀取資料時, 它是直接讀到緩衝區中的; 在寫入資料時, 它也是寫
入到緩衝區中的; 任何時候訪問 NIO 中的資料, 都是將它放到緩衝區中。 而在面向流 I/O
系統中, 所有資料都是直接寫入或者直接將資料讀取到 Stream 物件中。

在 NIO 中, 所有的緩衝區型別都繼承於抽象類 Buffer, 最常用的就是 ByteBuffer, 對於 Java
中的基本型別, 基本都有一個具體 Buffer 型別與之相對應, 它們之間的繼承關係如下圖所
示:

下面是一個簡單的使用 IntBuffer 的例子:
 

import java.nio.IntBuffer;
public class TestIntBuffer {
public static void main(String[] args) {
// 分配新的 int 緩衝區, 引數為緩衝區容量
// 新緩衝區的當前位置將為零, 其界限(限制位置)將為其容量。 它將具有一個底層實現陣列,
//其陣列偏移量將為零。
IntBuffer buffer = IntBuffer.allocate(8);
for (int i = 0; i < buffer.capacity(); ++i) {
int j = 2 * (i + 1);
// 將給定整數寫入此緩衝區的當前位置, 當前位置遞增
buffer.put(j);
}
// 重設此緩衝區, 將限制設定為當前位置, 然後將當前位置設定為 0
buffer.flip();
// 檢視在當前位置和限制位置之間是否有元素
while (buffer.hasRemaining()) {
// 讀取此緩衝區當前位置的整數, 然後當前位置遞增
int j = buffer.get();
System.out.print(j + " ");
}
}
}

執行後可以看到: