分散式專題(七)NIO
netty 和 mina是對NIO進行封裝的框架,所以學習netty之前先了解NIO
具體參考文件
連結:https://pan.baidu.com/s/1sCjacdyC3fYN3SBubgCiuw
提取碼:osim
1)阻塞(Block)和非阻塞(Non-Block):
阻塞和非阻塞是程序在訪問資料的時候, 資料是否準備就緒的一種處理方式,當資料沒
有準備的時候 阻塞: 往往需要等待緩衝區中的資料準備好過後才處理其他的事情, 否則一
直等待在那裡。
非阻塞:當我們的程序訪問我們的資料緩衝區的時候, 如果資料沒有準備好則直接返回,
不會等待。 如果資料已經準備好, 也直接返回。
2) 同步(Synchronization)和非同步(Asynchronous)的方式:
同步和非同步都是基於應用程式和作業系統處理 IO 事件所採用的方式。 比如同步: 是應
用程式要直接參與 IO 讀寫的操作。 非同步: 所有的 IO 讀寫交給作業系統去處理, 應用程式只
需要等待通知。
同步方式在處理 IO 事件的時候, 必須阻塞在某個方法上面等待我們的 IO 事件完成(阻
塞 IO 事件或者通過輪詢 IO 事件的方式),對於非同步來說, 所有的 IO 讀寫都交給了作業系統。
這個時候, 我們可以去做其他的事情, 並不需要去完成真正的 IO 操作, 當操作完成 IO 後,
會給我們的應用程式一個通知。
同步:1)阻塞到 IO 事件, 阻塞到 read 或則 write。 這個時候我們就完全不能做自己的
事情。 讓讀寫方法加入到執行緒裡面, 然後阻塞執行緒來實現, 對執行緒的效能開銷比較大。
nio:同步非阻塞
面向流和緩衝區
Java NIO 和 IO 之間第一個最大的區別是, IO 是面向流的, NIO 是面向緩衝區的
阻塞與非阻塞
Java IO 的各種流是阻塞的。 這意味著, 當一個執行緒呼叫 read() 或 write()時, 該執行緒被阻塞, 直到有一些資料被讀取, 或資料完全寫入。該執行緒在此期間不能再幹任何事情了
JavaNIO 的非阻塞模式,一個執行緒請求寫入一些資料到某通道, 但不需要等待它完全寫入, 這個執行緒同時可以去做別的事情。執行緒通常將非阻塞 IO 的空閒時間用於在其它通道上執行 IO 操作, 所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)
選擇器
Java NIO 的選擇器允許一個單獨的執行緒來監視多個輸入通道, 你可以註冊多個通道使用一個選擇器, 然後使用一個單獨的執行緒來“選擇” 通道: 這些通道里已經有可以處理的輸入,或者選擇已準備寫入的通道。 這種選擇機制, 使得一個單獨的執行緒很容易來管理多個通道。
1.BIO寫法
import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; public class BIOServer { ServerSocket server; //伺服器 public BIOServer(int port){ try { //把Socket服務端啟動 server = new ServerSocket(port); System.out.println("BIO服務已啟動,監聽埠是:" + port); } catch (IOException e) { e.printStackTrace(); } } /** * 開始監聽,並處理邏輯 * @throws IOException */ public void listener() throws IOException{ //死迴圈監聽 while(true){ //雖然寫了一個死迴圈,如果一直沒有客戶端連線的話,這裡一直不會往下執行 Socket client = server.accept();//等待客戶端連線,阻塞方法 //拿到輸入流,也就是鄉村公路 InputStream is = client.getInputStream(); //緩衝區,陣列而已 byte [] buff = new byte[1024]; int len = is.read(buff); //只要一直有資料寫入,len就會一直大於0 if(len > 0){ String msg = new String(buff,0,len); System.out.println("收到" + msg); } } } public static void main(String[] args) throws IOException { new BIOServer(8080).listener(); } }
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
public class BIOClient2 {
public static void main(String[] args) throws UnknownHostException, IOException {
try{
//開一條鄉村公路
Socket client = new Socket("localhost", 8080);
//輸出流通道開啟
OutputStream os = client.getOutputStream();
//產生一個隨機的字串,UUID
String name = UUID.randomUUID().toString();
//傳送給服務端
os.write(name.getBytes());
os.close();
client.close();
}catch(Exception e){
}
}
}
2.NIO寫法
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* 網路多客戶端聊天室
* 功能1: 客戶端通過Java NIO連線到服務端,支援多客戶端的連線
* 功能2:客戶端初次連線時,服務端提示輸入暱稱,如果暱稱已經有人使用,提示重新輸入,如果暱稱唯一,則登入成功,之後傳送訊息都需要按照規定格式帶著暱稱傳送訊息
* 功能3:客戶端登入後,傳送已經設定好的歡迎資訊和線上人數給客戶端,並且通知其他客戶端該客戶端上線
* 功能4:伺服器收到已登入客戶端輸入內容,轉發至其他登入客戶端。
*
* TODO 客戶端下線檢測
*/
public class NIOServer {
private int port = 8080;
private Charset charset = Charset.forName("UTF-8");
//用來記錄線上人數,以及暱稱
private static HashSet<String> users = new HashSet<String>();
private static String USER_EXIST = "系統提示:該暱稱已經存在,請換一個暱稱";
//相當於自定義協議格式,與客戶端協商好
private static String USER_CONTENT_SPILIT = "#@#";
private Selector selector = null;
public NIOServer(int port) throws IOException{
this.port = port;
//要想富,先修路
//先把通道開啟
ServerSocketChannel server = ServerSocketChannel.open();
//設定高速公路的關卡
server.bind(new InetSocketAddress(this.port));
server.configureBlocking(false);
//開門迎客,排隊叫號大廳開始工作
selector = Selector.open();
//告訴服務叫號大廳的工作人員,你可以接待了(事件)
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務已啟動,監聽埠是:" + this.port);
}
public void listener() throws IOException{
//死迴圈,這裡不會阻塞
//CPU工作頻率可控了,是可控的固定值
while(true) {
//在輪詢,我們服務大廳中,到底有多少個人正在排隊
int wait = selector.select();
if(wait == 0) continue; //如果沒有人排隊,進入下一次輪詢
//取號,預設給他分配個號碼(排隊號碼)
Set<SelectionKey> keys = selector.selectedKeys(); //可以通過這個方法,知道可用通道的集合
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
//處理一個,號碼就要被消除,打發他走人(別在服務大廳佔著茅坑不拉屎了)
//過號不候
iterator.remove();
//處理邏輯
process(key);
}
}
}
public void process(SelectionKey key) throws IOException {
//判斷客戶端確定已經進入服務大廳並且已經可以實現互動了
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
//非阻塞模式
client.configureBlocking(false);
//註冊選擇器,並設定為讀取模式,收到一個連線請求,然後起一個SocketChannel,並註冊到selector上,之後這個連線的資料,就由這個SocketChannel處理
client.register(selector, SelectionKey.OP_READ);
//將此對應的channel設定為準備接受其他客戶端請求
key.interestOps(SelectionKey.OP_ACCEPT);
// System.out.println("有客戶端連線,IP地址為 :" + sc.getRemoteAddress());
client.write(charset.encode("請輸入你的暱稱"));
}
//處理來自客戶端的資料讀取請求
if(key.isReadable()){
//返回該SelectionKey對應的 Channel,其中有資料需要讀取
SocketChannel client = (SocketChannel)key.channel();
//往緩衝區讀資料
ByteBuffer buff = ByteBuffer.allocate(1024);
StringBuilder content = new StringBuilder();
try{
while(client.read(buff) > 0)
{
buff.flip();
content.append(charset.decode(buff));
}
// System.out.println("從IP地址為:" + sc.getRemoteAddress() + "的獲取到訊息: " + content);
//將此對應的channel設定為準備下一次接受資料
key.interestOps(SelectionKey.OP_READ);
}catch (IOException io){
key.cancel();
if(key.channel() != null)
{
key.channel().close();
}
}
if(content.length() > 0) {
String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT);
//註冊使用者
if(arrayContent != null && arrayContent.length == 1) {
String nickName = arrayContent[0];
if(users.contains(nickName)) {
client.write(charset.encode(USER_EXIST));
} else {
users.add(nickName);
int onlineCount = onlineCount();
String message = "歡迎 " + nickName + " 進入聊天室! 當前線上人數:" + onlineCount;
broadCast(null, message);
}
}
//註冊完了,傳送訊息
else if(arrayContent != null && arrayContent.length > 1) {
String nickName = arrayContent[0];
String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length());
message = nickName + " 說 " + message;
if(users.contains(nickName)) {
//不回發給傳送此內容的客戶端
broadCast(client, message);
}
}
}
}
}
//TODO 要是能檢測下線,就不用這麼統計了
public int onlineCount() {
int res = 0;
for(SelectionKey key : selector.keys()){
Channel target = key.channel();
if(target instanceof SocketChannel){
res++;
}
}
return res;
}
public void broadCast(SocketChannel client, String content) throws IOException {
//廣播資料到所有的SocketChannel中
for(SelectionKey key : selector.keys()) {
Channel targetchannel = key.channel();
//如果client不為空,不回發給傳送此內容的客戶端
if(targetchannel instanceof SocketChannel && targetchannel != client) {
SocketChannel target = (SocketChannel)targetchannel;
target.write(charset.encode(content));
}
}
}
public static void main(String[] args) throws IOException {
new NIOServer(8080).listener();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class NIOClient {
private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080);
private Selector selector = null;
private SocketChannel client = null;
private String nickName = "";
private Charset charset = Charset.forName("UTF-8");
private static String USER_EXIST = "系統提示:該暱稱已經存在,請換一個暱稱";
private static String USER_CONTENT_SPILIT = "#@#";
public NIOClient() throws IOException{
//不管三七二十一,先把路修好,把關卡開放
//連線遠端主機的IP和埠
client = SocketChannel.open(serverAdrress);
client.configureBlocking(false);
//開門接客
selector = Selector.open();
client.register(selector, SelectionKey.OP_READ);
}
public void session(){
//開闢一個新執行緒從伺服器端讀資料
new Reader().start();
//開闢一個新執行緒往伺服器端寫資料
new Writer().start();
}
private class Writer extends Thread{
@Override
public void run() {
try{
//在主執行緒中 從鍵盤讀取資料輸入到伺服器端
Scanner scan = new Scanner(System.in);
while(scan.hasNextLine()){
String line = scan.nextLine();
if("".equals(line)) continue; //不允許發空訊息
if("".equals(nickName)) {
nickName = line;
line = nickName + USER_CONTENT_SPILIT;
} else {
line = nickName + USER_CONTENT_SPILIT + line;
}
// client.register(selector, SelectionKey.OP_WRITE);
client.write(charset.encode(line));//client既能寫也能讀,這邊是寫
}
scan.close();
}catch(Exception e){
}
}
}
private class Reader extends Thread {
public void run() {
try {
//輪詢
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys(); //可以通過這個方法,知道可用通道的集合
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
keyIterator.remove();
process(key);
}
}
}
catch (IOException io){
}
}
private void process(SelectionKey key) throws IOException {
if(key.isReadable()){
//使用 NIO 讀取 Channel中的資料,這個和全域性變數client是一樣的,因為只註冊了一個SocketChannel
//client既能寫也能讀,這邊是讀
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
while(sc.read(buff) > 0)
{
buff.flip();
content += charset.decode(buff);
}
//若系統傳送通知名字已經存在,則需要換個暱稱
if(USER_EXIST.equals(content)) {
nickName = "";
}
System.out.println(content);
key.interestOps(SelectionKey.OP_READ);
}
}
}
public static void main(String[] args) throws IOException
{
new NIOClient().session();
}
}
緩衝區 Buffer
緩衝區實際上是一個容器物件, 更直接的說, 其實就是一個數組, 在 NIO 庫中, 所有資料都
是用緩衝區處理的。 在讀取資料時, 它是直接讀到緩衝區中的; 在寫入資料時, 它也是寫
入到緩衝區中的; 任何時候訪問 NIO 中的資料, 都是將它放到緩衝區中。 而在面向流 I/O
系統中, 所有資料都是直接寫入或者直接將資料讀取到 Stream 物件中。
在 NIO 中, 所有的緩衝區型別都繼承於抽象類 Buffer, 最常用的就是 ByteBuffer, 對於 Java
中的基本型別, 基本都有一個具體 Buffer 型別與之相對應, 它們之間的繼承關係如下圖所
示:
下面是一個簡單的使用 IntBuffer 的例子:
import java.nio.IntBuffer;
public class TestIntBuffer {
public static void main(String[] args) {
// 分配新的 int 緩衝區, 引數為緩衝區容量
// 新緩衝區的當前位置將為零, 其界限(限制位置)將為其容量。 它將具有一個底層實現陣列,
//其陣列偏移量將為零。
IntBuffer buffer = IntBuffer.allocate(8);
for (int i = 0; i < buffer.capacity(); ++i) {
int j = 2 * (i + 1);
// 將給定整數寫入此緩衝區的當前位置, 當前位置遞增
buffer.put(j);
}
// 重設此緩衝區, 將限制設定為當前位置, 然後將當前位置設定為 0
buffer.flip();
// 檢視在當前位置和限制位置之間是否有元素
while (buffer.hasRemaining()) {
// 讀取此緩衝區當前位置的整數, 然後當前位置遞增
int j = buffer.get();
System.out.print(j + " ");
}
}
}
執行後可以看到: