1. 程式人生 > >java NIO模擬 Netty執行緒模型

java NIO模擬 Netty執行緒模型

兩個類,NioAcceptor,處理連線 單執行緒。NioReactor處理讀寫,多執行緒。

解釋放在程式碼中

NioAcceptor

package com.zwj.myNio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Set;

public class NioAcceptor {
    private volatile Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    private String iP;
    private int port;
    // 對於以字元方式讀取和處理的資料必須要進行字符集編碼和解碼
    String encoding = System.getProperty("file.encoding");
    // 載入位元組編碼集
    Charset charse = Charset.forName(encoding);
    private final NioReactor[] nioReactors;
    private volatile int nextReactor;

    public NioAcceptor(String ip, int port, int threadN) {
        if (ip == null || ip == "") {
            throw new IllegalArgumentException();
        }
        this.iP = ip;
        if (port < 1) {
            throw new IllegalArgumentException();
        }
        this.port = port;

        if (threadN < 1) {
            throw new IllegalArgumentException();
        }
        //新建處理讀寫的執行緒池
        nioReactors = new NioReactor[threadN];
        for (int i = 0; i < threadN; i++) {
            nioReactors[i] = new NioReactor();
            nioReactors[i].start();
        }
        try {
            this.selector = Selector.open();
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            /** 設定TCP屬性 */
            serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);
            // backlog=100
            serverSocketChannel.bind(new InetSocketAddress(ip, port), 100);
            this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server start!");
        } catch (IOException e) {
            e.printStackTrace();
            throw new IllegalArgumentException();
        }
    }

    private NioReactor nextNioReactor() {

        int i = nextReactor++;
        if (i >= nioReactors.length) {
            i = nextReactor = 0;
        }
        return nioReactors[i];
    }

    public void run() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                if (!keys.isEmpty()) {
                    for (SelectionKey key : keys) {
                        keys.remove(key);
                        if (key.isValid() && key.isAcceptable()) {
                            SocketChannel ch = serverSocketChannel.accept();
                            ch.configureBlocking(false);
                            String tomessage = "welcome,this is server!";
                            try {
                                ch.write(charse.encode(tomessage));
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            //把這個通道交給Reactor處理,獲取NioReactor的方式非常原始就是輪詢這個陣列,從0開始那,到了末尾就又從0開始
                            NioReactor nioReactor = nextNioReactor();
                            nioReactor.postRegister(ch);
                        } else {
                            //acceptor 只接受 accept事件,如果有其他事件 把這個通道從selector 移除
                            key.cancel();
                        }
                    }
                }
            } catch (IOException e) {

                e.printStackTrace();
            }
        }

    }
}

NioReactor 內部的worker 是一個 runnable 不斷的select 檢查就緒的讀事件。同時有一儲存了該worker需要處理通道的佇列。詳細解釋看程式碼註釋

package com.zwj.myNio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NioReactor {

    final String encoding = System.getProperty("file.encoding");
    final Charset charse = Charset.forName(encoding);
    private final Worker worker;

    public NioReactor() {
        //新建一個worker,worker 是一個 任務,實現 了Runnable介面他的功能就是不斷的輪訓註冊在他的selector 上的通過(channel)就緒讀事件
        this.worker = new Worker();
    }

    public void postRegister(SocketChannel socketChannel) {
        //把 通道放到worker 的register佇列上面,每次worker輪訓的時候會去檢查這個佇列,如果有新的通道,就把通道註冊到自己的selector
        this.worker.registerQueue.add(socketChannel);
        this.worker.selector.wakeup();
    }

    public void start() {
        //啟動一個執行緒來執行worker
        new Thread(worker).start();
    }

    private class Worker implements Runnable {

        private volatile Selector selector;
        private ConcurrentLinkedQueue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>();
        public Worker() {
            try {
                this.selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        private void register(Selector selector) {
            if (registerQueue.isEmpty()) {
                return;
            }
            SocketChannel socketChannel = null;
            while ((socketChannel = registerQueue.poll()) != null) {
                try {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        }
        @Override
        public void run() {
            while (true) {
                try {
                    register(selector);
                    selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    for (SelectionKey key : keys) {
                        keys.remove(key);
                        if (key.isValid() && key.isReadable()) {
                            SocketChannel rchannel = null;
                            try {
                                rchannel = (SocketChannel) key.channel();
                                ByteBuffer readByteBuffer = ByteBuffer.allocate(2048);
                                String content = "";
                                rchannel.read(readByteBuffer);
                                readByteBuffer.flip();
                                content += charse.decode(readByteBuffer);
                                //to do business
                                System.out.println("server rec:" + content);
                                String tomessage = "this is server!i have rec you mess";
                                rchannel.write(charse.encode(tomessage));
                            } catch (IOException e) {
                                if (rchannel != null) {
                                    key.cancel();
                                    rchannel.close();
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }


}

最後一個 客戶端程式 來連線 服務

package com.zwj.myNio;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

public class NIOClient {
    private static final int SIZE = 1024;
    private static NIOClient instance = new NIOClient();
    public String IP = "127.0.0.1";// 10.50.200.120
    public int CLIENT_PORT = 8090;// 4444 9666
    private SocketChannel channel;
    private Selector selector = null;

    String encoding = System.getProperty("file.encoding");
    Charset charset = Charset.forName(encoding);

    private NIOClient() {
    }

    public static NIOClient getInstance() {
        return instance;
    }

    public void send(String content) throws IOException {
        selector = Selector.open();
        channel = SocketChannel.open();
        // channel = SocketChannel.open(new InetSocketAddress(IP,CLIENT_PORT));
        InetSocketAddress remote = new InetSocketAddress(IP, CLIENT_PORT);
        channel.connect(remote);
        // 設定該sc以非阻塞的方式工作
        channel.configureBlocking(false);
        // 將SocketChannel物件註冊到指定的Selector
        // SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
        channel.register(selector, SelectionKey.OP_READ);//這裡註冊的是read讀,即從服務端讀資料過來
        // 啟動讀取伺服器資料端的執行緒
        new ClientThread().start();
        channel.write(charset.encode(content));
        // 建立鍵盤輸入流
        Scanner scan = new Scanner(System.in);//這裡向服務端傳送資料,同時啟動了一個鍵盤監聽器
        while (scan.hasNextLine()) {
            System.out.println("輸入資料:\n");
            // 讀取鍵盤的輸入
            String line = scan.nextLine();
            // 將鍵盤的內容輸出到SocketChanenel中
            channel.write(charset.encode(line));
        }
        scan.close();
    }

    /**
     * 從服務端讀入資料的執行緒

     *
     * @author 王俊偉 
[email protected]
* @date 2016年10月20日 下午9:59:11 */ private class ClientThread extends Thread { @Override public void run() { try { while (selector.select() > 0) { // 遍歷每個有可能的IO操作的Channel對銀行的SelectionKey for (SelectionKey sk : selector.selectedKeys()) { // 刪除正在處理的SelectionKey selector.selectedKeys().remove(sk); // 如果該SelectionKey對應的Channel中有可讀的資料 if (sk.isReadable()) { // 使用NIO讀取Channel中的資料 SocketChannel sc = (SocketChannel) sk.channel(); String content = ""; ByteBuffer bff = ByteBuffer.allocate(SIZE); while (sc.read(bff) > 0) { sc.read(bff); bff.flip(); content += charset.decode(bff); } // 列印讀取的內容 System.out.println("服務端返回資料:" + content); // 處理下一次讀 sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException io) { io.printStackTrace(); } } } /** * 接受服務端的資料 * * @param channel * @return * @throws Exception */ protected void receiveData(SocketChannel channel) throws Exception { ByteBuffer buffer = ByteBuffer.allocateDirect(1024); int count = 0; while ((count = channel.read(buffer)) != -1) { if (count == 0) { Thread.sleep(100); // 等等一下 continue; } // 轉到最開始 buffer.flip(); while (buffer.remaining() > 0) { System.out.print((char) buffer.get()); } buffer.clear(); } } public static void main(String[] args) { try { NIOClient nio = new NIOClient(); nio.send("test");//向服務端傳送資料 //nio.send("metrics:memory: swap: cpu: network i/o: disks i/o: tcp:\n"); } catch (IOException e) { e.printStackTrace(); } } }

服務啟動的main 程式

package com.zwj.myNio;

public class MyNio {


    public static void main(String[] args) {
        // write your code here
        NioAcceptor nioAcceptor = new NioAcceptor("127.0.0.1", 8090, 1);
        nioAcceptor.run();
    }
}