1. 程式人生 > >基於Java NIO2實現的非同步非阻塞訊息通訊框架

基於Java NIO2實現的非同步非阻塞訊息通訊框架

原文傳送門

基於Java NIO2實現的非同步非阻塞訊息通訊框架

前奏

因為NIO並不容易掌握,所以這注定會是一篇長文,而且即便篇幅很大,亦難以把很多細節解釋清楚,只能側重於從整體上進行把握,並實現一個簡單的客戶端服務端訊息通訊框架作為例子,以便有需要的開發人員參考之。借用淘寶伯巖給出的忠告就是

  • 儘量不要嘗試實現自己的NIO框架,除非有經驗豐富的工程師
  • 儘量使用經過廣泛實踐的開源NIO框架Mina/Netty/xSocket
  • 儘量使用最新版穩定版JDK
  • 遇到問題的時候,可以先看下Java的Bug Database

Asynchronous I/O是在JDK7中提出的非同步非阻塞I/O,習慣上稱之為NIO2,也叫AIOAIO是對JDK1.4中提出的同步非阻塞I/O的進一步增強,主要包括

  • 更新的Path類,該類在NIO裡對檔案系統進行了進一步的抽象,用來替換原來的java.io.File,可以通過File.toPath()Path.toFile()
    FilePath進行相互轉換
  • File Attributesjava.nio.file.attribute針對檔案屬性提供了各種使用者所需的元資料,不同作業系統使用的類不太一樣,支援的屬性分類有:
    BasicFileAttributeView
    DosFileAttributeView
    PosixFileAttributeView
    FileOwnerAttributeView
    AclFileAttributeView
    UserDefinedFileAttributeView
    
  • Symbolic and Hard Links,相當於用Java程式實現Linux中的ln命令
  • Watch Service API
    ,作為一個執行緒安全的服務用於監控物件的變化和事件,以前直接用Java監控檔案系統的變化是不可能的,只能通過JNI的方式呼叫作業系統的API,而在JDK7中這部分被加入到了標準庫裡
  • Random Access Files主要提供了一個SeekableByteChannel介面,配合ByteBuffer使得隨機訪問檔案更加方便
  • Sockets API主要是NIO1中的Selector模式實現同步非阻塞
  • Asynchronous Channel API由NIO1中的Selector模式變成方法回撥模式,使用更加方便,主要是可以非同步實現檔案的讀寫了

AIO應用開發

Future方式

Future是在JDK1.5中加入Java併發包的,該介面提供get()方法用於獲取任務完成之後的處理結果。在AIO中,可以接受一個I/O連線請求,返回一個Future物件,然後可以基於該返回物件進行後續的操作,包括使其阻塞、檢視是否完成、超時異常,使用方式如下。

服務端程式碼

import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * <p>
 * Created with IntelliJ IDEA. 16/2/24 16:57
 * </p>
 * <p>
 * ClassName:ServerOnFuture
 * </p>
 * <p>
 * Description:基於Future的NIO2服務端實現,此時的服務端還無法實現多客戶端併發,如果有多個客戶端併發連線該服務端的話,
 * 客戶端會出現阻塞,待前一個客戶端處理完畢,服務端才會接受下一個客戶端的連線並處理
 * </P>
 *
 * @author Wang Xu
 * @version V1.0.0
 * @since V1.0.0
 * WebSite: http://codepub.cn
 * Licence: Apache v2 License
 */
@Log4j2
public class ServerOnFuture {
    static final int DEFAULT_PORT = 7777;
    static final String IP = "127.0.0.1";
    static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    public static void main(String[] args) {
        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                serverSocketChannel.bind(new InetSocketAddress(IP, DEFAULT_PORT));
                log.info("Waiting for connections...");
                while (true) {
                    Future<AsynchronousSocketChannel> channelFuture = serverSocketChannel.accept();
                    try (AsynchronousSocketChannel socketChannel = channelFuture.get()) {
                        log.info("Incoming connection from : " + socketChannel.getRemoteAddress());
                        while (socketChannel.read(buffer).get() != -1) {
                            buffer.flip();
                            // Java NIO2或者Java AIO報: java.util.concurrent.ExecutionException: java.io.IOException: 指定的網路名不再可用。
                            // 此處要注意,千萬不能直接操作buffer,否則客戶端會阻塞並報錯,“java.util.concurrent.ExecutionException: java.io.IOException: 指定的網路名不再可用。”
                            ByteBuffer duplicate = buffer.duplicate();
                            showMessage(duplicate);
                            socketChannel.write(buffer).get();
                            if (buffer.hasRemaining()) {
                                buffer.compact();
                            } else {
                                buffer.clear();
                            }
                        }
                        log.info(socketChannel.getRemoteAddress() + " was successfully served!");
                    } catch (InterruptedException | ExecutionException e) {
                        log.error(e);
                    }
                }
            } else {
                log.warn("The asynchronous server-socket channel cannot be opened!");
            }
        } catch (IOException e) {
            log.error(e);
        }
    }
    protected static void showMessage(ByteBuffer buffer) {
        CharBuffer decode = Charset.defaultCharset().decode(buffer);
        log.info(decode.toString());
    }
}

客戶端程式碼

import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
 * <p>
 * Created with IntelliJ IDEA. 16/2/24 17:48
 * </p>
 * <p>
 * ClassName:ClientOnFuture
 * </p>
 * <p>
 * Description:基於Future的NIO2客戶端實現
 * </P>
 *
 * @author Wang Xu
 * @version V1.0.0
 * @since V1.0.0
 * WebSite: http://codepub.cn
 * Licence: Apache v2 License
 */
@Log4j2
public class ClientOnFuture {
    static final int DEFAULT_PORT = 7777;
    static final String IP = "127.0.0.1";
    static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    public static void main(String[] args) {
        try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
            if (socketChannel.isOpen()) {
                //設定一些選項,非必選項,可使用預設設定
                socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);
                socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);
                socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                Void aVoid = socketChannel.connect(new InetSocketAddress(IP, DEFAULT_PORT)).get();
                //返回null表示連線成功
                if (aVoid == null) {
                    Integer messageLength = socketChannel.write(ByteBuffer.wrap("Hello Server!".getBytes())).get();
                    log.info(messageLength);
                    while (socketChannel.read(buffer).get() != -1) {
                        buffer.flip();//寫入buffer之後,翻轉,之後可以從buffer讀取,或者將buffer內容寫入通道
                        CharBuffer decode = Charset.defaultCharset().decode(buffer);
                        log.info(decode.toString());
                        if (buffer.hasRemaining()) {
                            buffer.compact();
                        } else {
                            buffer.clear();
                        }
                        int r = new Random().nextInt(1000);
                        if (r == 50) {
                            log.info("Client closed!");
                            break;
                        } else {
                            // Java NIO2或者Java AIO報: Exception in thread "main" java.nio.channels.WritePendingException
                            // 此處注意,如果在頻繁呼叫write()的時候,在上一個操作沒有寫完的情況下,呼叫write會觸發WritePendingException異常,
                            // 所以此處最好在呼叫write()之後呼叫get()以便明確等到有返回結果
                            socketChannel.write(ByteBuffer.wrap("Random number : ".concat(String.valueOf(r)).getBytes())).get();
                        }
                    }
                } else {
                    log.warn("The connection cannot be established!");
                }
            } else {
                log.warn("The asynchronous socket-channel cannot be opened!");
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            log.error(e);
        }
    }
}

Future方式實現為多客戶端併發服務

如何讓服務端同時可以接受多個客戶端的連線呢?一個簡單的處理方法就是使用ExecutorService。每次新建一個連線,並且獲得返回值之後,這個返回值就是一個AsynchronousSocketChannel的通道,將其提交給執行緒池,由一個工作執行緒進行後續處理。然後一個新的執行緒準備好在等待接受下一個連線。程式碼示例如下。

import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.*;
/**
 * <p>
 * Created with IntelliJ IDEA. 16/2/24 17:38
 * </p>
 * <p>
 * ClassName:ServerOnFutureForMultiClients
 * </p>
 * <p>
 * Description:基於Future實現的可以接受多客戶端併發的Java NIO2服務端實現
 * </P>
 *
 * @author Wang Xu
 * @version V1.0.0
 * @since V1.0.0
 * WebSite: http://codepub.cn
 * Licence: Apache v2 License
 */
@Log4j2
public class ServerOnFutureForMultiClients {
    static final int DEFAULT_PORT = 7777;
    static final String IP = "127.0.0.1";
    static ExecutorService taskExecutorService = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
    static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    public static void main(String[] args) {
        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                serverSocketChannel.bind(new InetSocketAddress(IP, DEFAULT_PORT));
                log.info("Waiting for connections...");
                while (true) {
                    Future<AsynchronousSocketChannel> socketChannelFuture = serverSocketChannel.accept();
                    try {
                        final AsynchronousSocketChannel socketChannel = socketChannelFuture.get();
                        Callable<String> worker = new Callable<String>() {
                            @Override
                            public String call() throws Exception {
                                String s = socketChannel.getRemoteAddress().toString();
                                log.info("Incoming connection from : " + s);
                                while (socketChannel.read(buffer).get() != -1) {
                                    buffer.flip();
                                    ByteBuffer duplicate = buffer.duplicate();
                                    showMessage(duplicate);
                                    socketChannel.write(buffer).get();
                                    if (buffer.hasRemaining()) {
                                        buffer.compact();
                                    } else {
                                        buffer.clear();
                                    }
                                }
                                socketChannel.close();
                                log.info(s + " was successfully served!");
                                return s;
                            }
                        };
                        taskExecutorService.submit(worker);
                    } catch (InterruptedException | ExecutionException e) {
                        log.error(e);
                        taskExecutorService.shutdown();
                        while (!taskExecutorService.isTerminated()) {
                        }
                        break;
                    }
                }
            } else {
                log.warn("The asynchronous server-socket channel cannot be opened!");
            }
        } catch (IOException e) {
            log.error(e);
        }
    }
    protected static void showMessage(ByteBuffer buffer) {
        CharBuffer decode = Charset.defaultCharset().decode(buffer);
        log.info(decode.toString());
    }
}

Callback方式

方法回撥模式,即提交一個I/O操作請求,並且指定一個CompletionHandler。當非同步操作完成時,便會發一個通知,此時該CompletionHandler物件覆寫的方法將被呼叫,如果成功呼叫completed方法,如果失敗呼叫failed方法,首先看下Java API

public interface CompletionHandler<V,A> {
    /**
     * Invoked when an operation has completed.
     *
     * @param   result 操作結果
     *          The result of the I/O operation.
     * @param   attachment 提交請求時的引數,通常會封裝一個連線環境
     *          The object attached to the I/O operation when it was initiated.
     */
    void completed(V result, A attachment);
    /**
     * Invoked when an operation fails.
     *
     * @param   exc
     *          The exception to indicate why the I/O operation failed
     * @param   attachment
     *          The object attached to the I/O operation when it was initiated.
     */
    void failed(Throwable exc, A attachment);
}

AIO提供了四種類型的非同步通道以及不同的I/O操作可以接收一個CompletionHandler物件,分別是:

  • AsynchronousSocketChannel:connect,read,write
  • AsynchronousFileChannel:lock,read,write
  • AsynchronousServerSocketChannel:accept
  • AsynchronousDatagramChannel:read,write,send,receive

服務端示例程式碼如下

import lombok.extern.log4j.Log4j2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
/**
 * <p>
 * Created with IntelliJ IDEA. 16/2/24 20:14
 * </p>
 * <p>
 * ClassName:ServerOnCompletionHandler
 * </p>
 * <p>
 * Description:基於CompletionHandler實現NIO2的服務端
 * </P>
 *
 * @author Wang Xu
 * @version V1.0.0
 * @since V1.0.0
 * WebSite: http://codepub.cn
 * Licence: Apache v2 License
 */
@Log4j2
public class ServerOnCompletionHandler {
    static final int DEFAULT_PORT = 7777;
    static final String IP = "127.0.0.1";
    public static void main(String[] args) {
        try (final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
                serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                serverSocketChannel.bind(new InetSocketAddress(IP, DEFAULT_PORT));
                log.info("Waiting for connections...");
                serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                    @Override
                    public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
                        //注意接收一個連線之後,緊接著可以接收下一個連線,所以必須再次呼叫accept方法
                        serverSocketChannel.accept(null, this);
                        try {
                            log.info("Incoming connection from : " + socketChannel.getRemoteAddress());
                            while (socketChannel.read(buffer).get() != -1) {
                                buffer.flip();
                                final ByteBuffer duplicate = buffer.duplicate();
                                final CharBuffer decode = Charset.
            
           

相關推薦

基於Java NIO2實現非同步阻塞訊息通訊框架

原文傳送門 基於Java NIO2實現的非同步非阻塞訊息通訊框架 前奏 AIO應用開發 Future方式 Callback方式 Reader/Writer方式實現 執行緒池和Group PendingExceptio

【Flask】Flask實現非同步阻塞請求功能

前言 最近做物聯網專案的時候需要搭建一個非同步非阻塞的HTTP伺服器,經過查詢資料,發現可以使用gevent包。 關於gevent Gevent 是一個 Python 併發網路庫,它使用了基於 libevent 事件迴圈的 greenlet 來提供一個高階

利用tornado使請求實現非同步阻塞

基本IO模型 網上搜了很多關於同步非同步,阻塞非阻塞的說法,理解還是不能很透徹,有必要買書看下。 參考:使用非同步 I/O 大大提高應用程式的效能 怎樣理解阻塞非阻塞與同步非同步的區別? 同步和非同步:主要關注訊息通訊機制(重點在B?)。 同步:A呼叫B,B處理直到獲得結

Java簡單實現Socket阻塞通訊

        用java實現socket C/S通訊很簡單,很多教科書上都有。但是這些通訊模型大都是阻塞式的,其弊端也很明顯:一方必須要接收的到對方的訊息後,才能編輯自己的訊息發出。同樣對方也要一直等待這條訊息收到後才能傳送新的訊息。用網路通訊的知識講,大概就是半雙工通訊

Guava ListenableFuture實現非同步阻塞呼叫

為了保證系統響應迅速,需要尋找一種方法能夠使調取介面能夠非同步執行,而Java正好提供了類似的方法,在java.util.concurrent中包含了Future相關的類,運用其中的一些類可以進行非同步計算,以減少主執行緒的等待時間。比如啟動一個main方

java NIO實現同步阻塞伺服器

server package net.smgui.util; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.Byt

Java入門系列-25-NIO(實現阻塞網路通訊)

還記得之前介紹NIO時對比傳統IO的一大特點嗎?就是NIO是非阻塞式的,這篇文章帶大家來看一下非阻塞的網路操作。 補充:以陣列的形式使用緩衝區 package testnio; import java.io.IOException; import java.io.RandomAccessFile; impo

java高併發系統之非同步阻塞

在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統

聊聊java高併發系統之非同步阻塞

幾種呼叫方式 同步阻塞呼叫 即序列呼叫,響應時間為所有服務的響應時間總和; 半非同步(非同步Future) 執行緒池,非同步Future,使用場景:併發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求執行緒,高併發時依然會造成執行緒數過多,CPU上下文切換; 全非同步(Cal

java同步,非同步阻塞阻塞的聯絡和區別

所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。而非同步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工作,依賴

Java網路程式設計——使用NIO實現阻塞Socket通訊

       除了普通的Socket與ServerSocket實現的阻塞式通訊外,java提供了非阻塞式通訊的NIO API。先看一下NIO的實現原理。        從圖中可以看出,伺服器上所有Channel(包括ServerSocketChannel和Socket

java NIO 實現阻塞socket通訊

java的nio為非阻塞式socket通訊提供瞭如下幾個類:           Selector : 它是SelectableChannel物件的多路複用器,所有希望採用非阻塞方式進行通訊的channel都應該註冊到Selector物件。可以通過呼叫此類的open()

基於Socket的多執行緒和非同步阻塞模式程式設計

      剛開始接觸socket的程式設計的時候,遇到了很多的問題,費了很大勁搞懂。其實往往都是一些比較基本的知識,但是都是很重要的,只要對其熟練的掌握後,相信對基於網路的程式設計會有很大的提高,呵呵。       就拿基於C/S結構的例子來說,我們先看看伺服器和客戶端的流

Thinking in Java--使用NIO實現阻塞Socket通訊

Java1.4提供了一種新的IO讀取方式,稱為NIO。NIO中使用了通道和緩衝器的概念,並且以塊的形式操作資料,這樣更接近作業系統IO操作的形式,提高了JavaIO的效率。NIO的核心類有兩個Channel和Buffer。但是其實除了提升了基本IO操作的效能外,

java網路程式設計(四)----非同步阻塞aio及proactor模型

(aio)NIO 2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步的套接字通道時真正的非同步非阻塞I/O,對應於UNIX網路程式設計中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現非

thrift java多執行緒阻塞同步/非同步呼叫例項

作者:呂桂強 郵箱:[email protected] 首先建立thrift檔案 namespace java thriftservice Hello{  string helloString(1:string para)} 執行thrift -ge

【轉】聊聊java高併發系統之非同步阻塞

在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合資料拼裝模板、熱點統計、快取、下游功能降級開關、託底資料等等。其中聚合資料需要呼叫其它多個系統服務獲取資料、拼裝資料/模板然後返回給前端,聚合資料來源主要有依賴系統/服務、快取、資料庫等;而系統之間

Web驗證碼圖片的生成-基於Java實現

submit esc page resp ioe 代碼 oge cnblogs pro 驗證碼圖片是由程序動態產生的,每次訪問的內容都是隨機的。那麽如何采用程序動態產生圖片,並能夠顯示在客戶端頁面中呢?原理很簡單,對於java而言,我們首先開發一個Servlet,這個Se

java簡單實現非同步佇列:使用生產者與消費者模型

package com.yunshouhu; import java.util.concurrent.*; //java簡單實現非同步佇列:使用生產者與消費者模型 public class MyAsynQueue { // http://www.importnew.com/22519.h

真正的 Tornado 非同步阻塞

原文出處https://hexiangyu.me/posts/15 其中 Tornado 的定義是 Web 框架和非同步網路庫,其中他具備有非同步非阻塞能力,能解決他兩個框架請求阻塞的問題,在需要併發能力時候就應該使用 Tornado。 但是在實際使用過程中很容易把 Tornado