1. 程式人生 > >Netty - 模擬原始碼實現簡單Netty以及拓展應用

Netty - 模擬原始碼實現簡單Netty以及拓展應用

1.模擬Netty執行緒模型實現簡單網路通訊服務端

我們在學習一個開源的技術框架的時候儘可能地嘗試去看懂他的原始碼對自己理解這個框架以及應用都能帶來十分巨大的幫助,通過斷點、檢視呼叫棧等等都可以有效地幫助我們理解框架原始碼,這裡我們根據Netty框架的模型思想來模擬手寫一個簡單的網路通訊的服務端。

目錄結構:

在這裡插入圖片描述

AbstractNioSelector.java
package com.proto.nio;

import com.proto.nio.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** * 抽象selector執行緒類 * @author hzk * @date 2018/8/20 */ public abstract class AbstractNioSelector implements
Runnable{ /** * 執行緒池 */ private final Executor executor; /** * 選擇器 */ protected Selector selector; /** * 選擇器wakenup的狀態標記 */ protected final AtomicBoolean wakenUp = new AtomicBoolean(); /** * 任務佇列 */ private final Queue<Runnable>
taskQueue = new ConcurrentLinkedDeque<Runnable>(); /** * 執行緒名稱 */ private String threadName; /** * 執行緒管理物件 */ protected NioSelectorRunnablePool nioSelectorRunnablePool; public AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool nioSelectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.nioSelectorRunnablePool = nioSelectorRunnablePool; openSelector(); } /** * 獲取select並啟動執行緒 */ private void openSelector(){ try { this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("Failed to create a selector."); } executor.execute(this); } @Override public void run() { Thread.currentThread().setName(this.threadName); while (true){ try { wakenUp.set(false); select(this.selector); processTaskQueue(); process(this.selector); }catch (Exception e){ e.printStackTrace(); } } } /** * 註冊一個任務並激活selector * @param task */ protected final void registerTask(Runnable task){ taskQueue.add(task); Selector selector = this.selector; if(null != selector){ if(wakenUp.compareAndSet(false,true)){ selector.wakeup(); } }else{ taskQueue.remove(task); } } /** * 執行佇列裡的任務 */ private void processTaskQueue(){ for(;;){ final Runnable task = taskQueue.poll(); if(null == task){ break; } task.run(); } } /** * 獲取執行緒管理物件 * @return */ public NioSelectorRunnablePool getNioSelectorRunnablePool() { return nioSelectorRunnablePool; } /** * select抽象方法 * @param selector * @return * @throws IOException */ protected abstract int select(Selector selector) throws IOException; /** * select業務處理 * @param selector * @return * @throws IOException */ protected abstract void process(Selector selector) throws IOException; }
Boss.java
package com.proto.nio.pool;

import java.nio.channels.ServerSocketChannel;

/**
 * Boss介面
 * @author hzk
 * @date 2018/8/20
 */
public interface Boss {

    /**
     * 加入一個新的serverSocket
     * @param serverSocketChannel
     */
    public void regiserAcceptChannelTask(ServerSocketChannel serverSocketChannel);
}

NioServerBoss.java
package com.proto.nio;

import com.proto.nio.pool.Boss;
import com.proto.nio.pool.NioSelectorRunnablePool;
import com.proto.nio.pool.Worker;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Boss實現
 * @author hzk
 * @date 2018/8/20
 */
public class NioServerBoss extends AbstractNioSelector implements Boss{

    public NioServerBoss(Executor executor,String threadName, NioSelectorRunnablePool nioSelectorRunnablePool) {
        super(executor,threadName, nioSelectorRunnablePool);
    }

    @Override
    public void regiserAcceptChannelTask(final ServerSocketChannel serverSocketChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    //註冊serverChannel到selector
                    serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        if(selectionKeys.isEmpty()){
            return;
        }
        for (Iterator<SelectionKey> iterator = selectionKeys.iterator(); iterator.hasNext();){
            SelectionKey key = iterator.next();
            iterator.remove();
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            //新客戶端
            SocketChannel channel = server.accept();
            //設定為非阻塞
            channel.configureBlocking(false);
            //獲取一個worker
            Worker worker = getNioSelectorRunnablePool().nextWorkers();
            //註冊新客戶端接入任務
            worker.registerNewChannelTask(channel);
            System.out.println("New Client into...");
        }

    }
}

Worker.java
package com.proto.nio.pool;

import java.nio.channels.SocketChannel;

/**
 * Worker介面
 * @author hzk
 * @date 2018/8/20
 */
public interface Worker {

    /**
     * 加入一個新的客戶端會話
     * @param socketChannel
     */
    public void registerNewChannelTask(SocketChannel socketChannel);
}

NioServerWorker.java
package com.proto.nio;

import com.proto.nio.pool.NioSelectorRunnablePool;
import com.proto.nio.pool.Worker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * worker實現
 * @author hzk
 * @date 2018/8/20
 */
public class NioServerWorker extends AbstractNioSelector implements Worker{

    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool nioSelectorRunnablePool) {
        super(executor, threadName, nioSelectorRunnablePool);
    }

    /**
     * 加入一個新的socket客戶端
     * @param socketChannel
     */
    @Override
    public void registerNewChannelTask(final SocketChannel socketChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    //將客戶端socketChannel註冊到selector中
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select(500);
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        if(selectionKeys.isEmpty()){
            return;
        }
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            //移除 防止重複處理
            iterator.remove();

            //得到事件發生的socket通道
            SocketChannel channel = (SocketChannel)key.channel();

            //資料總長度
            int ret = 0;
            boolean failure = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //讀取資料
            try {
                ret = channel.read(buffer);
                failure = false;
            } catch (IOException e) {
                e.printStackTrace();
            }
            //判斷連線是否斷開
            if(ret <= 0 || failure){
                key.cancel();
                System.out.println("Client disconnect...");
            }else{
                System.out.println("Receive msg:"+ new String(buffer.array()));
                //回寫資料給客戶端
                ByteBuffer wrap = ByteBuffer.wrap("Receive success!".getBytes());
                channel.write(wrap);
            }

        }
    }
}

NioSelectorRunnablePool.java
package com.proto.nio.pool;

import com.proto.nio.NioServerBoss;
import com.proto.nio.NioServerWorker;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * selector執行緒管理者
 * @author hzk
 * @date 2018/8/20
 */
public class NioSelectorRunnablePool {

    /**
     * boos執行緒陣列
     */
    private final AtomicInteger bossIndex = new AtomicInteger();
    private Boss[] bosses;

    /**
     * worker執行緒陣列
     */
    private final AtomicInteger workerIndex = new AtomicInteger();
    private Worker[] workers;

    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        initBoss(boss, 1);
        initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
    }

    /**
     * 初始化boss執行緒
     * @param boss
     * @param count
     */
    private void initBoss(Executor boss, int count){
        this.bosses = new NioServerBoss[count];
        for (int i = 0 ;i < bosses.length;i++){
            bosses[i] = new NioServerBoss(boss,"Boss_Thread_"+(i+1),this);
        }
    }

    /**
     * 初始化worker執行緒
     * @param worker
     * @param count
     */
    private void initWorker(Executor worker, int count){
        this.workers = new NioServerWorker[count];
        for (int i = 0 ;i < workers.length;i++){
            workers[i] = new NioServerWorker(worker,"Worker_Thread_"+(i+1),this);
        }
    }

    /**
     * 獲取一個Boss
     * @return
     */
    public Boss nextBosses() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }

    /**
     * 獲取一個Worker
     * @return
     */
    public Worker nextWorkers() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }

}

ServerBootStrap.java
package com.proto.nio;

import com.proto.nio.pool.Boss;
import com.proto.nio.pool.NioSelectorRunnablePool;

import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

/**
 * 服務啟動類
 * @author hzk
 * @date 2018/8/20
 */
public class ServerBootStrap {

    private NioSelectorRunnablePool nioSelectorRunnablePool;

    public ServerBootStrap(NioSelectorRunnablePool nioSelectorRunnablePool) {
        this.nioSelectorRunnablePool = nioSelectorRunnablePool;
    }

    public void bind(final SocketAddress socketAddress){
        try {
            //獲得一個serverSocket通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //設定非阻塞
            serverSocketChannel.configureBlocking(false);
            //繫結埠
            serverSocketChannel.bind(socketAddress);
            //獲取Boss程序
            Boss boss = nioSelectorRunnablePool.nextBosses();
            //向boss程序註冊通道
            boss.regiserAcceptChannelTask(serverSocketChannel);
        }catch (Exception e){
            e.printStackTrace();
        }
    }


}

Start.java
package com.proto.nio;

import com.proto.nio.pool.NioSelectorRunnablePool;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * @author hzk
 * @date 2018/8/20
 */
public class Start {
    
    public static void main(String[] args){
        //初始化執行緒
        NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
        //獲取服務了
        ServerBootStrap serverBootStrap = new ServerBootStrap(nioSelectorRunnablePool);
        //繫結埠
        serverBootStrap.bind(new InetSocketAddress(8888));
        System.out.println("Start success...");
    }
}

2.Netty5基本使用

Netty5.x使用NioEventLoopGroup迴圈事件組替代了手動建立執行緒,在構建服務端和客戶端時與之前版本有一些差異,但是整體的構建流程大體一致。

pom.xml
<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>
ServerHandler.java

            
           

相關推薦

Netty - 模擬原始碼實現簡單Netty以及拓展應用

1.模擬Netty執行緒模型實現簡單網路通訊服務端 我們在學習一個開源的技術框架的時候儘可能地嘗試去看懂他的原始碼對自己理解這個框架以及應用都能帶來十分巨大的幫助,通過斷點、檢視呼叫棧等等都可以有效地幫助我們理解框架原始碼,這裡我們根據Netty框架的模型思想來模擬手寫一個簡單的

模擬webQQ實現簡單的聊天,基於WebSocket

什麼是WebSocket WebSocket 是 HTML5 開始提供的一種在單個 TCP 連線上進行全雙工通訊的協議。 ajax輪詢 輪詢(Polling)是一種CPU決策如何提供周邊裝置服務的方式,又稱“程控輸出入”(Programmed I/O)。輪詢法的概念是,由CPU定

jdk提供的阻塞佇列BlockingQueue下的五個實現簡單操作以及介紹

package cn.yarne.com.base.test; import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.Vector;

利用JAVA陣列模擬集合實現簡單的物件陣列的新增

public class Student {    Student[] stu;    int count;    String name;    int age;    public Student(){     stu=new Student[5];    }    public Student(Stri

tomcat-jdbc Pool 原始碼實現簡單分析

================================= 什麼是連線池? 池,不由自主的會想到水池。 小時候,我們都要去遠處的水井挑水,倒進家中的水池裡面。這樣,每次要用水時,直接從水池中「取」就行了。不用大老遠跑去水井打水。 資料庫連線池

RecyclerView 實現簡單瀑布流的應用

效果 實現程式碼 需要的jar包 com.android.support:recyclerview-v7:28.0.0 com.android.support:cardview-v7:28.0.0 activitymain的xml程式碼 <?xml version=

Netty實現簡單UDP服務器

rec nal req 參考 syn group out equal 文件 本文參考《Netty權威指南》 文件列表: ├── ChineseProverbClientHandler.java ├── ChineseProverbClient.java ├── Chine

Netty實現簡單HTTP代理伺服器

自上次使用Openresty+Lua+Nginx的來加速自己的網站,用上了比較時髦的技術,感覺算是讓自己的網站響應速度達到極限了,直到看到了Netty,公司就是打算用Netty來替代Openresty這一套,所以,自己也學了好久,琢磨了好一趟才知道怎麼用,現在用來寫一套HTTP代理伺服器吧,之後再測試一下效能

Netty實現簡單的Http伺服器

之前在upload伺服器的時候,由於tomcat效能瓶頸的的問題,qps無法達到要求,瞭解到Netty.io的高效能,覺得自己寫一個Http接受資料的伺服器來處理客戶段上報的資料,比較簡單,直接貼程式碼了: package com.bonree.browser

使用netty自行實現簡單的http服務端開發

瞭解http伺服器工作原理:http客戶端和伺服器端的互動步驟:1 client想server傳送http請求2 server端對http請求進行解析3 server端向client傳送http響應4 client對http響應進行解析使用netty自行實現http服務端開發

Mina、Netty、Twisted一起學(一):實現簡單的TCP伺服器

MINA、Netty、Twisted為什麼放在一起學習?首先,不妨先分別看一下它們官方網站對其的介紹:MINA:Apache MINA is a network application framework which helps users develop high perf

Netty 實現簡單RPC呼叫

RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。 RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺

Netty的restful API 簡單實現和部署

1 BEGIN Netty 是一個基於NIO的客戶,伺服器端程式設計框架,使用Netty 可以確保你快速和簡單的開發出一個網路應用,例如實現了某種協議的客戶,服務端應用。Netty相當簡化和流線化了網路應用的程式設計開發過程,例如,TCP和UDP的socket

Netty 實現簡單的HTTP服務

超文字傳輸協議(HTTP,HyperText Transfer Protocol)是網際網路上應用最為廣泛的一種網路協議。 在後端開發中接觸HTTP協議的比較多,目前大部分都是基於Servlet容器實現的Http服務,往往有一些核心子系統對效能的要求非常高

Netty-主從Reactor多執行緒模式的原始碼實現

Netty--主從Reactor多執行緒模式的原始碼實現 總覽 EventLoopGroup到底是什麼? EventLoopGroup是一個儲存EventLoop的容器,同時他應該具備執行緒池的功能。 graph BT; EventLoopGroup --> EventExecutorGroup

netty 實現簡單的rpc呼叫

> **yls** *2020/5/23* ## netty 實現簡單rpc準備 1. 使用netty傳輸java bean物件,可以使用protobuf,也可以通過json轉化 2. 客戶端要將呼叫的介面名稱,方法名稱,引數列表的型別和值傳輸到服務端, 可以用動態代理 3. 服務端要對介面和實現類

COM編程_第一講_深入COM框架以及實現簡單的COM

dir 微軟 高級 原理 padding 不知道 簡單的 out interface 一丶我們要理解COM是什麽(為什麽理解) 現在很多人會用com(也就是ALT)但是不知道原理,如果改一點東西,那麽整體的框架重來,因為你不懂改哪裏,如果懂了,那麽遇到問題,那麽就會知道我要

netty服務端實現心跳超時的主動拆鏈

use ctx out apt ket cond else pipeline ali 一、服務器啟動示例: public class MySocketServer { protected static Logger logger = LoggerFactory.g

在VR中模擬用鼠標操作電腦並實現簡單畫圖的小程序

npr 事件 line 屏幕 reset relative max using false (圖沒有錄好,明天換一下) 一、概述 1.實現的基本操作是:   1)用手柄抓住黃色的方塊代表手抓住鼠標。   2)通過移動手柄模擬鼠標移動,電腦屏幕上的光標跟著移動。   3)當

實現簡單的PHP接口,以及使用js/jquery ajax技術調用此接口

gpo col php接口 class cal ajax技術 enc font blog 主要介紹下如何編寫簡單的php接口,以及使用js/jquery的ajax技術調用此接口。 Php接口文件(check.php): <?php $jsonp_supporter