1. 程式人生 > >編寫一個簡易的Java NIO Reactor庫

編寫一個簡易的Java NIO Reactor庫

開源地址

原始碼設計

接收器Acceptor

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This Acceptor provides a NIO mode to accept client sockets.</p>
 */
public final class Acceptor extends Thread { private static final Logger LOGGER = LoggerFactory .getLogger(Acceptor.class); private final int port; private final Selector selector; private final ServerSocketChannel serverChannel; private long acceptCount; private
static final AcceptIdGenerator IdGenerator = new AcceptIdGenerator(); private ReactorPool reactorPool; public Acceptor(ReactorPool reactorPool, String name, String bindIp, int port) throws IOException { super.setName(name); this.port = port; this.selector = Selector.open(); this
.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024); this.serverChannel.bind(new InetSocketAddress(bindIp, port), 100); this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); this.reactorPool = reactorPool; } public int getPort() { return port; } public long getAcceptCount() { return acceptCount; } @Override public void run() { final Selector selector = this.selector; for (;;) { ++acceptCount; try { selector.select(1000L); Set<SelectionKey> keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { accept(); } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(getName(), e); } } } /** * Accept client sockets. */ private void accept() { SocketChannel channel = null; try { channel = serverChannel.accept(); channel.configureBlocking(false); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_RCVBUF, 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 1024); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); reactorPool.getNextReactor().postRegister( new FrontendConnection(channel, IdGenerator.getId())); } catch (Throwable e) { closeChannel(channel); LOGGER.warn(getName(), e); } } /** * Close a channel. * * @param channel */ private static void closeChannel(SocketChannel channel) { if (channel == null) { return; } Socket socket = channel.socket(); if (socket != null) { try { socket.close(); LOGGER.info("channel close."); } catch (IOException e) { LOGGER.warn("IOException happens when closing socket : ", e); } } try { channel.close(); } catch (IOException e) { LOGGER.warn("IOException happens when closing channel : ", e); } } /** * ID Generator. */ private static class AcceptIdGenerator { private static final long MAX_VALUE = 0xffffffffL; private long acceptId = 0L; private final Object lock = new Object(); private long getId() { synchronized (lock) { if (acceptId >= MAX_VALUE) { acceptId = 0L; } return ++acceptId; } } } }

Reactor類

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Reactor reacts all sockets.</p>
 */
public final class Reactor extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
    private final String name;
    private final Selector selector;
    private final ConcurrentLinkedQueue<FrontendConnection> queue;
    private long doCount;
    private Handler handler;

    public Reactor(String name, Handler handler) throws IOException {
        this.name = name;
        this.selector = Selector.open();
        this.queue = new ConcurrentLinkedQueue<FrontendConnection>();
        this.handler = handler;
    }

    final void postRegister(FrontendConnection frontendConnection) {
        queue.offer(frontendConnection);
        this.selector.wakeup();
    }

    @Override
    public void run() {
        final Selector selector = this.selector;
        Set<SelectionKey> keys = null;
        for (;;) {
            ++doCount;
            try {
                selector.select(500L);
                register(selector);
                keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    FrontendConnection connection = null;
                    Object attach = key.attachment();
                    if (attach != null && key.isValid()) {
                        connection = (FrontendConnection) attach;
                        if (key.isReadable()) {
                            try {
                                connection.read();
                                handler.handle(connection);
                            } catch (IOException e) {
                                connection.close();
                                LOGGER.warn("IOException happens : ", e);
                                continue;
                            } catch (Throwable e) {
                                LOGGER.warn("Throwable happens : ", e);
                                continue;
                            }
                        }
                        if (key.isValid() && key.isWritable()) {
                            connection.write();
                        }
                    } else {
                        key.cancel();
                    }
                }
            } catch (Throwable e) {
                LOGGER.warn("exception happens selecting : ", e);
            } finally {
                if (keys != null) {
                    keys.clear();
                }
            }
        }
    }

    private void register(Selector selector) {
        FrontendConnection c = null;
        if (queue.isEmpty()) {
            return;
        }
        while ((c = queue.poll()) != null) {
            try {
                c.register(selector);
            } catch (Throwable e) {
                LOGGER.warn("ClosedChannelException happens : ", e);
            }
        }
    }

    final Queue<FrontendConnection> getRegisterQueue() {
        return queue;
    }

    final long getReactCount() {
        return doCount;
    }

}

Reactor池

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Reactor pool. Socket connections are polling to the reactor of this pool. </p>
 */
public class ReactorPool {
    private final Reactor[] reactors;
    private volatile int nextReactor;
    private String name = "reactor";

    public ReactorPool(int poolSize, Handler handler) throws IOException {
        reactors = new Reactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            Reactor reactor = new Reactor(name + "-" + i,handler);
            reactors[i] = reactor;
            reactor.start();
        }
    }

    public Reactor getNextReactor() {
        if (++nextReactor == reactors.length) {
            nextReactor = 0;
        }
        return reactors[nextReactor];
    }
}

前端連線抽象

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This is a abstraction of frontend.</p>
 */
public class FrontendConnection {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(FrontendConnection.class);
    private long id;
    private SocketChannel channel;
    private SelectionKey selectionKey;
    private ByteBuffer readBuffer;
    private static int BYFFERSIZE = 1024;
    protected ConcurrentLinkedQueue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<ByteBuffer>();

    public FrontendConnection(SocketChannel channel, long id) {
        this.id = id;
        this.channel = channel;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public long getId() {
        return id;
    }

    public void read() throws IOException {
        readBuffer = ByteBuffer.allocate(BYFFERSIZE);
        channel.read(readBuffer);
    }

    public void close() throws IOException {
        channel.close();
    }

    public void write() throws IOException {
        ByteBuffer buffer;
        while ((buffer = writeQueue.poll()) != null) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                int len = channel.write(buffer);
                if (len < 0) {
                    throw new EOFException();
                }
                if (len == 0) {
                    selectionKey.interestOps(selectionKey.interestOps()
                            | SelectionKey.OP_WRITE);
                    selectionKey.selector().wakeup();
                    break;
                }
            }
        }
        selectionKey.interestOps(selectionKey.interestOps()
                & ~SelectionKey.OP_WRITE);
    }

    public ByteBuffer getReadBuffer() {
        return readBuffer;
    }

    public ConcurrentLinkedQueue<ByteBuffer> getWriteQueue() {
        return writeQueue;
    }

    public void register(Selector selector) throws Throwable {
        selectionKey = channel.register(selector, SelectionKey.OP_READ, this);
    }

}

處理

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This Handler will be call when there is a data having be ready.</p>
 */
public interface Handler {

    public void handle(FrontendConnection connection);

}

定義自己的處理

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Demo.</p>
 */
public class MyHandler implements Handler {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(MyHandler.class);
    private long readSize;

    /**
     * The logic to deal with the received data.
     *  
     * It means that reactor will trigger this function once the data is received.
     */
    public void handle(FrontendConnection connection) {
        Buffer buff = connection.getReadBuffer();
        readSize = +readSize + buff.position();
        LOGGER.info(connection.getId() + " connection has receive " + readSize);
        if (readSize % 5 == 0) {
            ByteBuffer sendBuffer = ByteBuffer.allocate(10);;
            sendBuffer.wrap("hello".getBytes());
            connection.getWriteQueue().add(sendBuffer);
            try {
                connection.write();
            } catch (IOException e) {
                LOGGER.warn("IOException", e);
            }
        }
    }

}

啟動

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>[email protected]</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>The reactor bootstrap.</p>
 */
public class Bootstrap {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(Bootstrap.class);
    private static String acceptorName = "acceptor-thread";
    private static String host = "localhost";
    private static int port = 6789;

    public static void main(String[] args) {
        try {
            LOGGER.info("starting up ......");
            Handler handler = new MyHandler();
            ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
            new Acceptor(reactorPool, acceptorName, host, port).start();
            LOGGER.info("started up successfully.");
            while (true) {
                Thread.sleep(300 * 1000);
            }
        } catch (Throwable e) {
            LOGGER.error(" launch error", e);
            System.exit(-1);
        }
    }
}

net-reactor

it’s a simple and easy net framework with nio mode written by java

how-to

just simply like:

Handler handler = new MyHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
new Acceptor(reactorPool, acceptorName, host, port).start();

========廣告時間========

=========================

相關推薦

編寫一個簡易Java NIO Reactor

開源地址 原始碼設計 接收器Acceptor /** * * @author seaboat * @date 2016-08-25 * @version 1.0 * <pre><b>email: <

Netty學習:搭建一個簡單的Netty服務(JAVA NIO的非同步通訊框架)

http://wosyingjun.iteye.com/blog/2303296 Netty學習:搭建一個簡單的Netty服務 Netty 是一個基於 JAVA NIO 類庫的非同步通訊框架,它的架構特點是:非同步非阻塞、基於事件驅動、高效能、高可靠性和高可定製性。換句

基於OpenGL編寫一個簡易的2D渲染框架-04 繪制圖片

著色器 drawtext 結構 渲染 images ron renderer make 制圖 閱讀文章前需要了解的知識,紋理:https://learnopengl-cn.github.io/01%20Getting%20started/06%20Textures/  

基於OpenGL編寫一個簡易的2D渲染框架-05 渲染文本

new 坐標 false 證明 ont 獲取 simple 了解 param 閱讀文章前需要了解的知識:文本渲染 https://learnopengl-cn.github.io/06%20In%20Practice/02%20Text%20Rendering/ 簡要步

Angular2-編寫一個簡易的組件

span 裝飾器 cto 理解 log 完整 col 編寫 {}   Angular2組件可以這麽理解:編寫一個類,然後在類的上面用組件裝飾器裝飾一下,這個類就成組件了。   所以編寫組件分兩步:1)編寫類;2)編寫裝飾器   1)編寫類: export class Si

基於OpenGL編寫一個簡易的2D渲染框架-13 使用例子

tom 進行 prim demo custom 第一個 manager sets mar 這是重構渲染器的最後一部分了,將會給出一個 demo,測試模板測試、裁剪測試、半透明排序等等: 上圖是本次 demo 的效果圖,中間的綠色圖形展現的是模板測試。 模板測試

【QT】編寫一個簡易的串列埠軟體

簡述 QT學了一點發現學不進去,索性看看能不能直接擼個程式,於是就有了這個簡易的串列埠軟體… 分析需求 這是XCOM串列埠收發軟體,以此為例 目的很明確: 串列埠列表要能顯示所有已經接上PC上的COM

通過編寫一個簡單的日誌類來加深瞭解C#的檔案訪問控制

在程式的開發除錯過程及釋出執行後的狀態監控中,日誌都有著極其重要的分量,通過在關鍵邏輯節點將關鍵資料記錄到日誌檔案當中能幫助我們儘快找到程式問題所在。網上有不少專業成熟的日誌元件可用,比如log4net和nlog等,由其專業及受歡迎程度可見日誌在一個程式中的重要性。 我只用過log4net,而在用log4n

編寫一個簡易的 HTTP 伺服器程式

好久沒輸出了,知識還是要寫下總結才能讓思路更加清晰。最近在學習計算機網路相關的知識,來聊聊如何編寫一個建議的HTTP伺服器。 HTTP 伺服器 HTTP伺服器,就是一個執行在主機上的程式。程式啟動了之後,會一直在等待其他所有客戶端的請求,接收到請求之後,處理請求,然

爬蟲實戰:一個簡易 Java 爬蟲程式的實現

前面,我們分別介紹了爬蟲程式的 3 個部分:原始碼下載、內容解析、頁面連結提取。現在,讓我們把這些部分串起來,構建一個完整的簡易爬蟲程式。 爬蟲流程 構建程式前,我們首先需要了解爬蟲的具體流程。 一個簡易的爬蟲程式,具備以下流程: 若

Socket編寫一個簡易的聊天室(相關知識點的總結)

初次接觸到Socket的時候,還以為是網路程式設計的部分。學完後才發現,他也是Java中的一個類。只是它和TCP協議掛鉤了。在用Socket的時候要考慮到網路和協議的問題,以及每個應用程式相對應的埠,當伺服器或者客服端通過網路接受到資訊的時候,會先根據IP地址找到相應的位置

Linux系統下編寫一個hello.java

 在linux下面的編輯器我使用的vi編輯器。 1.vi hello.java 好這樣系統會自動生成一個hello.java 檔案 。 2.建立程式 public class hello {   public static void main(String[]args)  

Java NIOSelector機制解析(上)

在使用Java進行相關網路程式的的設計時,出身C/C++的人,首先想到的框架就是多路複用,想到多路複用,Unix/Linux下馬上就能讓從想到select, poll, epoll系統呼叫。於是,在看到Java的NIO中的Selector類時必然會倍感親切。稍加查閱一下SDK手冊以及相關例程,不一會兒,一個多

使用java nio 編寫簡易聊天室

伺服器端:相當於是一個接收客戶端訊息的分發器,為了簡單,直接在接收到客戶端的訊息後,                  直接傳送給所有的客戶端 package chatroom.chatser

泛型程式設計學習,編寫一個類似STL中的簡易list的迭代器(iterator)

泛型程式設計學習,編寫一個類似STL庫中的簡易list的迭代器(iterator) 前言 近期在研究stl原始碼及stl裡各種實現的細節,初學入門免不了模仿,以下便寫一次自己的簡單的list容器的迭代器。 首先,在開始編寫List的迭代器的時候我們首先應該瞭解我們要寫的List和其迭

Java核心類-IO-NIO概述

字符 java 傳統 ont syn 概述 pan spa 數據 NIO:New IO 從JDK1.4開始提出的,新的IO,可以把一塊磁盤文件映射到內存中,我們再去讀取內存中的數據。 存放在java.nio包中 Java NIO(New IO)是Java1.4版本開始引入的

reactor模式與java nio

time handlers write syn linu pipe accept 事件處理 schmidt ?? Reactor是由Schmidt, Douglas C提出的一種模式,在高並發server實現中廣泛採用。改模式採用事件驅動方式,當事件出現時,後調用對應的

如何用Java編寫一個簡單的服務器和客戶機

exce 解決 對賬 location exceptio acc 明顯 隊列 客戶 今天我要向大家介紹的是自己編寫的一個比較簡單的服務器和客戶機程序,註意一下哦,比較簡單。好了,閑話休提,砸門直入主題。 小編先從客戶機和服務器的模型開始講解。

Java:Object類的equals()方法 如何編寫一個完美的equals()方法

urn day lan 匹配 另有 net 現在 isn tar 一 代碼實例: package freewill.objectequals; /** * @author freewill * @see Core Java page161 * @desc getClas

java實現一個簡易編譯器1-詞法解析入門

new 概念 自加 我們 sta 數字 獲得 () 操作系統 本文對應代碼下載地址為: http://download.csdn.net/detail/tyler_download/9435103 視頻地址: http://v.youku.com/v_show/id_XMT