基於Java NIO2實現的非同步非阻塞訊息通訊框架
基於Java NIO2實現的非同步非阻塞訊息通訊框架
前奏
因為NIO並不容易掌握,所以這注定會是一篇長文,而且即便篇幅很大,亦難以把很多細節解釋清楚,只能側重於從整體上進行把握,並實現一個簡單的客戶端服務端訊息通訊框架作為例子,以便有需要的開發人員參考之。借用淘寶伯巖給出的忠告就是
- 儘量不要嘗試實現自己的NIO框架,除非有經驗豐富的工程師
- 儘量使用經過廣泛實踐的開源NIO框架Mina/Netty/xSocket
- 儘量使用最新版穩定版JDK
- 遇到問題的時候,可以先看下Java的Bug Database
Asynchronous I/O
是在JDK7中提出的非同步非阻塞I/O,習慣上稱之為NIO2
,也叫AIO
,AIO
是對JDK1.4中提出的同步非阻塞I/O的進一步增強,主要包括
- 更新的
Path
類,該類在NIO裡對檔案系統進行了進一步的抽象,用來替換原來的java.io.File
,可以通過File.toPath()
和Path.toFile()
File
和Path
進行相互轉換 File Attributes
,java.nio.file.attribute
針對檔案屬性提供了各種使用者所需的元資料,不同作業系統使用的類不太一樣,支援的屬性分類有:BasicFileAttributeView DosFileAttributeView PosixFileAttributeView FileOwnerAttributeView AclFileAttributeView UserDefinedFileAttributeView
Symbolic and Hard Links
,相當於用Java程式實現Linux中的ln命令Watch Service API
Random Access Files
主要提供了一個SeekableByteChannel
介面,配合ByteBuffer
使得隨機訪問檔案更加方便Sockets API
主要是NIO1中的Selector
模式實現同步非阻塞Asynchronous Channel API
由NIO1中的Selector
模式變成方法回撥模式,使用更加方便,主要是可以非同步實現檔案的讀寫了
AIO應用開發
Future方式
Future
是在JDK1.5
中加入Java併發包的,該介面提供get()
方法用於獲取任務完成之後的處理結果。在AIO
中,可以接受一個I/O連線請求,返回一個Future
物件,然後可以基於該返回物件進行後續的操作,包括使其阻塞、檢視是否完成、超時異常,使用方式如下。
服務端程式碼
import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* <p>
* Created with IntelliJ IDEA. 16/2/24 16:57
* </p>
* <p>
* ClassName:ServerOnFuture
* </p>
* <p>
* Description:基於Future的NIO2服務端實現,此時的服務端還無法實現多客戶端併發,如果有多個客戶端併發連線該服務端的話,
* 客戶端會出現阻塞,待前一個客戶端處理完畢,服務端才會接受下一個客戶端的連線並處理
* </P>
*
* @author Wang Xu
* @version V1.0.0
* @since V1.0.0
* WebSite: http://codepub.cn
* Licence: Apache v2 License
*/
@Log4j2
public class ServerOnFuture {
static final int DEFAULT_PORT = 7777;
static final String IP = "127.0.0.1";
static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
public static void main(String[] args) {
try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
if (serverSocketChannel.isOpen()) {
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverSocketChannel.bind(new InetSocketAddress(IP, DEFAULT_PORT));
log.info("Waiting for connections...");
while (true) {
Future<AsynchronousSocketChannel> channelFuture = serverSocketChannel.accept();
try (AsynchronousSocketChannel socketChannel = channelFuture.get()) {
log.info("Incoming connection from : " + socketChannel.getRemoteAddress());
while (socketChannel.read(buffer).get() != -1) {
buffer.flip();
// Java NIO2或者Java AIO報: java.util.concurrent.ExecutionException: java.io.IOException: 指定的網路名不再可用。
// 此處要注意,千萬不能直接操作buffer,否則客戶端會阻塞並報錯,“java.util.concurrent.ExecutionException: java.io.IOException: 指定的網路名不再可用。”
ByteBuffer duplicate = buffer.duplicate();
showMessage(duplicate);
socketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
log.info(socketChannel.getRemoteAddress() + " was successfully served!");
} catch (InterruptedException | ExecutionException e) {
log.error(e);
}
}
} else {
log.warn("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException e) {
log.error(e);
}
}
protected static void showMessage(ByteBuffer buffer) {
CharBuffer decode = Charset.defaultCharset().decode(buffer);
log.info(decode.toString());
}
}
客戶端程式碼
import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* <p>
* Created with IntelliJ IDEA. 16/2/24 17:48
* </p>
* <p>
* ClassName:ClientOnFuture
* </p>
* <p>
* Description:基於Future的NIO2客戶端實現
* </P>
*
* @author Wang Xu
* @version V1.0.0
* @since V1.0.0
* WebSite: http://codepub.cn
* Licence: Apache v2 License
*/
@Log4j2
public class ClientOnFuture {
static final int DEFAULT_PORT = 7777;
static final String IP = "127.0.0.1";
static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
public static void main(String[] args) {
try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
if (socketChannel.isOpen()) {
//設定一些選項,非必選項,可使用預設設定
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);
socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
Void aVoid = socketChannel.connect(new InetSocketAddress(IP, DEFAULT_PORT)).get();
//返回null表示連線成功
if (aVoid == null) {
Integer messageLength = socketChannel.write(ByteBuffer.wrap("Hello Server!".getBytes())).get();
log.info(messageLength);
while (socketChannel.read(buffer).get() != -1) {
buffer.flip();//寫入buffer之後,翻轉,之後可以從buffer讀取,或者將buffer內容寫入通道
CharBuffer decode = Charset.defaultCharset().decode(buffer);
log.info(decode.toString());
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
int r = new Random().nextInt(1000);
if (r == 50) {
log.info("Client closed!");
break;
} else {
// Java NIO2或者Java AIO報: Exception in thread "main" java.nio.channels.WritePendingException
// 此處注意,如果在頻繁呼叫write()的時候,在上一個操作沒有寫完的情況下,呼叫write會觸發WritePendingException異常,
// 所以此處最好在呼叫write()之後呼叫get()以便明確等到有返回結果
socketChannel.write(ByteBuffer.wrap("Random number : ".concat(String.valueOf(r)).getBytes())).get();
}
}
} else {
log.warn("The connection cannot be established!");
}
} else {
log.warn("The asynchronous socket-channel cannot be opened!");
}
} catch (IOException | InterruptedException | ExecutionException e) {
log.error(e);
}
}
}
Future方式實現為多客戶端併發服務
如何讓服務端同時可以接受多個客戶端的連線呢?一個簡單的處理方法就是使用ExecutorService
。每次新建一個連線,並且獲得返回值之後,這個返回值就是一個AsynchronousSocketChannel
的通道,將其提交給執行緒池,由一個工作執行緒進行後續處理。然後一個新的執行緒準備好在等待接受下一個連線。程式碼示例如下。
import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.*;
/**
* <p>
* Created with IntelliJ IDEA. 16/2/24 17:38
* </p>
* <p>
* ClassName:ServerOnFutureForMultiClients
* </p>
* <p>
* Description:基於Future實現的可以接受多客戶端併發的Java NIO2服務端實現
* </P>
*
* @author Wang Xu
* @version V1.0.0
* @since V1.0.0
* WebSite: http://codepub.cn
* Licence: Apache v2 License
*/
@Log4j2
public class ServerOnFutureForMultiClients {
static final int DEFAULT_PORT = 7777;
static final String IP = "127.0.0.1";
static ExecutorService taskExecutorService = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
public static void main(String[] args) {
try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
if (serverSocketChannel.isOpen()) {
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverSocketChannel.bind(new InetSocketAddress(IP, DEFAULT_PORT));
log.info("Waiting for connections...");
while (true) {
Future<AsynchronousSocketChannel> socketChannelFuture = serverSocketChannel.accept();
try {
final AsynchronousSocketChannel socketChannel = socketChannelFuture.get();
Callable<String> worker = new Callable<String>() {
@Override
public String call() throws Exception {
String s = socketChannel.getRemoteAddress().toString();
log.info("Incoming connection from : " + s);
while (socketChannel.read(buffer).get() != -1) {
buffer.flip();
ByteBuffer duplicate = buffer.duplicate();
showMessage(duplicate);
socketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
socketChannel.close();
log.info(s + " was successfully served!");
return s;
}
};
taskExecutorService.submit(worker);
} catch (InterruptedException | ExecutionException e) {
log.error(e);
taskExecutorService.shutdown();
while (!taskExecutorService.isTerminated()) {
}
break;
}
}
} else {
log.warn("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException e) {
log.error(e);
}
}
protected static void showMessage(ByteBuffer buffer) {
CharBuffer decode = Charset.defaultCharset().decode(buffer);
log.info(decode.toString());
}
}
Callback方式
方法回撥模式,即提交一個I/O操作請求,並且指定一個CompletionHandler
。當非同步操作完成時,便會發一個通知,此時該CompletionHandler
物件覆寫的方法將被呼叫,如果成功呼叫completed
方法,如果失敗呼叫failed
方法,首先看下Java API
。
public interface CompletionHandler<V,A> {
/**
* Invoked when an operation has completed.
*
* @param result 操作結果
* The result of the I/O operation.
* @param attachment 提交請求時的引數,通常會封裝一個連線環境
* The object attached to the I/O operation when it was initiated.
*/
void completed(V result, A attachment);
/**
* Invoked when an operation fails.
*
* @param exc
* The exception to indicate why the I/O operation failed
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
}
AIO提供了四種類型的非同步通道以及不同的I/O操作可以接收一個CompletionHandler
物件,分別是:
- AsynchronousSocketChannel:connect,read,write
- AsynchronousFileChannel:lock,read,write
- AsynchronousServerSocketChannel:accept
- AsynchronousDatagramChannel:read,write,send,receive
服務端示例程式碼如下
import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
/**
* <p>
* Created with IntelliJ IDEA. 16/2/24 20:14
* </p>
* <p>
* ClassName:ServerOnCompletionHandler
* </p>
* <p>
* Description:基於CompletionHandler實現NIO2的服務端
* </P>
*
* @author Wang Xu
* @version V1.0.0
* @since V1.0.0
* WebSite: http://codepub.cn
* Licence: Apache v2 License
*/
@Log4j2
public class ServerOnCompletionHandler {
static final int DEFAULT_PORT = 7777;
static final String IP = "127.0.0.1";
public static void main(String[] args) {
try (final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
if (serverSocketChannel.isOpen()) {
serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverSocketChannel.bind(new InetSocketAddress(IP, DEFAULT_PORT));
log.info("Waiting for connections...");
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
@Override
public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
//注意接收一個連線之後,緊接著可以接收下一個連線,所以必須再次呼叫accept方法
serverSocketChannel.accept(null, this);
try {
log.info("Incoming connection from : " + socketChannel.getRemoteAddress());
while (socketChannel.read(buffer).get() != -1) {
buffer.flip();
final ByteBuffer duplicate = buffer.duplicate();
final CharBuffer decode = Charset.
相關推薦
基於Java NIO2實現的非同步非阻塞訊息通訊框架
原文傳送門
基於Java NIO2實現的非同步非阻塞訊息通訊框架
前奏
AIO應用開發
Future方式
Callback方式
Reader/Writer方式實現
執行緒池和Group
PendingExceptio
【Flask】Flask實現非同步非阻塞請求功能
前言
最近做物聯網專案的時候需要搭建一個非同步非阻塞的HTTP伺服器,經過查詢資料,發現可以使用gevent包。
關於gevent
Gevent 是一個 Python 併發網路庫,它使用了基於 libevent 事件迴圈的 greenlet 來提供一個高階
利用tornado使請求實現非同步非阻塞
基本IO模型
網上搜了很多關於同步非同步,阻塞非阻塞的說法,理解還是不能很透徹,有必要買書看下。 參考:使用非同步 I/O 大大提高應用程式的效能 怎樣理解阻塞非阻塞與同步非同步的區別?
同步和非同步:主要關注訊息通訊機制(重點在B?)。 同步:A呼叫B,B處理直到獲得結
Java簡單實現Socket非阻塞通訊
用java實現socket C/S通訊很簡單,很多教科書上都有。但是這些通訊模型大都是阻塞式的,其弊端也很明顯:一方必須要接收的到對方的訊息後,才能編輯自己的訊息發出。同樣對方也要一直等待這條訊息收到後才能傳送新的訊息。用網路通訊的知識講,大概就是半雙工通訊
Guava ListenableFuture實現非同步非阻塞呼叫
為了保證系統響應迅速,需要尋找一種方法能夠使調取介面能夠非同步執行,而Java正好提供了類似的方法,在java.util.concurrent中包含了Future相關的類,運用其中的一些類可以進行非同步計算,以減少主執行緒的等待時間。比如啟動一個main方
java NIO實現同步非阻塞伺服器
server
package net.smgui.util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.Byt
Java入門系列-25-NIO(實現非阻塞網路通訊)
還記得之前介紹NIO時對比傳統IO的一大特點嗎?就是NIO是非阻塞式的,這篇文章帶大家來看一下非阻塞的網路操作。
補充:以陣列的形式使用緩衝區
package testnio;
import java.io.IOException;
import java.io.RandomAccessFile;
impo
java高併發系統之非同步非阻塞
在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統
聊聊java高併發系統之非同步非阻塞
幾種呼叫方式
同步阻塞呼叫
即序列呼叫,響應時間為所有服務的響應時間總和;
半非同步(非同步Future)
執行緒池,非同步Future,使用場景:併發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求執行緒,高併發時依然會造成執行緒數過多,CPU上下文切換;
全非同步(Cal
java同步,非同步和阻塞,非阻塞的聯絡和區別
所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。而非同步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工作,依賴
Java網路程式設計——使用NIO實現非阻塞Socket通訊
除了普通的Socket與ServerSocket實現的阻塞式通訊外,java提供了非阻塞式通訊的NIO API。先看一下NIO的實現原理。
從圖中可以看出,伺服器上所有Channel(包括ServerSocketChannel和Socket
java NIO 實現非阻塞socket通訊
java的nio為非阻塞式socket通訊提供瞭如下幾個類:
Selector : 它是SelectableChannel物件的多路複用器,所有希望採用非阻塞方式進行通訊的channel都應該註冊到Selector物件。可以通過呼叫此類的open()
基於Socket的多執行緒和非同步非阻塞模式程式設計
剛開始接觸socket的程式設計的時候,遇到了很多的問題,費了很大勁搞懂。其實往往都是一些比較基本的知識,但是都是很重要的,只要對其熟練的掌握後,相信對基於網路的程式設計會有很大的提高,呵呵。
就拿基於C/S結構的例子來說,我們先看看伺服器和客戶端的流
Thinking in Java--使用NIO實現非阻塞Socket通訊
Java1.4提供了一種新的IO讀取方式,稱為NIO。NIO中使用了通道和緩衝器的概念,並且以塊的形式操作資料,這樣更接近作業系統IO操作的形式,提高了JavaIO的效率。NIO的核心類有兩個Channel和Buffer。但是其實除了提升了基本IO操作的效能外,
java網路程式設計(四)----非同步非阻塞aio及proactor模型
(aio)NIO 2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步的套接字通道時真正的非同步非阻塞I/O,對應於UNIX網路程式設計中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現非
thrift java多執行緒非阻塞同步/非同步呼叫例項
作者:呂桂強
郵箱:[email protected]
首先建立thrift檔案
namespace java thriftservice Hello{ string helloString(1:string para)}
執行thrift -ge
【轉】聊聊java高併發系統之非同步非阻塞
在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統/服務、快取、資料庫等;而系統之間
Web驗證碼圖片的生成-基於Java的實現
submit esc page resp ioe 代碼 oge cnblogs pro
驗證碼圖片是由程序動態產生的,每次訪問的內容都是隨機的。那麽如何采用程序動態產生圖片,並能夠顯示在客戶端頁面中呢?原理很簡單,對於java而言,我們首先開發一個Servlet,這個Se
java簡單實現非同步佇列:使用生產者與消費者模型
package com.yunshouhu;
import java.util.concurrent.*;
//java簡單實現非同步佇列:使用生產者與消費者模型
public class MyAsynQueue {
// http://www.importnew.com/22519.h
真正的 Tornado 非同步非阻塞
原文出處https://hexiangyu.me/posts/15
其中 Tornado 的定義是 Web 框架和非同步網路庫,其中他具備有非同步非阻塞能力,能解決他兩個框架請求阻塞的問題,在需要併發能力時候就應該使用 Tornado。
但是在實際使用過程中很容易把 Tornado