1. 程式人生 > >Day15.高效能RPC設計 學習筆記3 - Netty

Day15.高效能RPC設計 學習筆記3 - Netty

一、Netty引言

基於NIO一款非同步通訊框架,因為在使用上相比較Mina較為簡單,開發門檻低導致了Netty在網際網路開發中受到絕大多數商用專案成功驗證,導致了Netty成為NIO開發的首選框架。

“快速”和“簡單”並不用產生維護性或效能上的問題。Netty 是一個吸收了多種協議的實現經驗,這些協議包括 FTP,SMTP,HTTP,各種二進位制,文字協議,並經過相當精心設計的專案,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的效能,穩定性和伸縮性。

在設計上: 針對多種傳輸型別的統一介面 - 阻塞和非阻塞; 簡單但更強大的執行緒模型;真正的無連線的資料報套接字支援;連結邏輯支援複用;
在效能上:比核心 Java API 更好的吞吐量,較低的延時;資源消耗更少,這個得益於共享池和重用;減少記憶體拷貝
在健壯性上:消除由於慢,快,或過載連線產生的 OutOfMemoryError;消除經常發現在 NIO 在高速網路中的應用中的不公平的讀/寫比
在安全上:完整的 SSL / TLS 和 StartTLS 的支援且已得到大量商業應用的真實驗證, 如: Hadoop 專案的 Avro (RPC 框架)、 Dubbo、 Dubbox等RPC框架

Netty架構圖【瞭解】

二、例項

  1. 匯入依賴
<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>5.0.0.Alpha2</version>
</dependency>
  1. Netty的服務端開發
public class NettyServer {
    public static void main(String[] args) throws
InterruptedException { //①、啟動服務引導 框架啟動引導類,遮蔽網路通訊配置資訊 ServerBootstrap sbt= new ServerBootstrap(); //②、建立請求轉發,響應執行緒池 NioEventLoopGroup master = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); //③、關聯執行緒池組 sbt.group(master,worker)
; //④、設定服務端實現類 sbt.channel(NioServerSocketChannel.class); //⑤、配置服務端RPC通訊管道 【關注點】 sbt.childHandler(new ServerChannelInitializer()); //⑥、設定伺服器的埠並且啟動服務 System.out.println("我在9999監聽..."); ChannelFuture f = sbt.bind(9999).sync(); //⑦、關閉通訊管道 f.channel().closeFuture().sync(); //⑧、釋放資源 master.shutdownGracefully(); worker.shutdownGracefully(); } }
  1. Netty的客戶端開發(Server端改動下)
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //①、啟動服務引導 框架啟動引導類,遮蔽網路通訊配置資訊
        Bootstrap bt= new Bootstrap();
        //②、建立請求轉發,響應執行緒池
        NioEventLoopGroup worker = new NioEventLoopGroup();
        //③、關聯執行緒池組
        bt.group(worker);
        //④、設定服務端實現類
        bt.channel(NioSocketChannel.class);
        //⑤、配置客戶端RPC通訊管道 【關注點】
        bt.handler(new ClientChannelInitializer());
        //⑥、設定伺服器的埠並且啟動服務
        ChannelFuture f = bt.connect("127.0.0.1",9999).sync();
        //⑦、關閉通訊管道
        f.channel().closeFuture().sync();
        //⑧、釋放資源
        worker.shutdownGracefully();
    }
}
  • 總結下API記憶技巧:
    第一步、建立服務引導(服務端ServerBootstrap與客戶端Bootstrap);
    第二步、確定網路程式設計執行緒(服務端兩個,客戶端一個);
    第三步、關聯起來;
    第四步、實現類(服務端NioServerSocketChannel和客戶端NioSocketChannel);
    第五步、配置自定義的RPC通訊管道;
    第六步、服務端啟動服務,客戶端多設定ip啟動服務;
    第七步、關閉通訊管道
    第八步、釋放資源

三、Netty的管道淺析 —— “學Netty,就是學管道”

netty管道圖

在這裡插入圖片描述

  • 解釋:
    管道:訊息交換的介質
    環:加功能、不同環只處理固定方向的stream
    紅色:輸入upstream、編碼器、將物件轉成位元組流、先編碼,再編幀
    橙色:輸出downstream、解碼器、將位元組流轉成物件、先解幀,再解碼

編碼 - 將物件變成位元組、編幀 - 切分物件,方便解幀、解幀 - 固定規則拆分、解碼 - 位元組變成物件

管道末端:掛載最終處理者,作用收發物件
管道前端:掛載編解碼器
所以最終關注的,需要書寫的是,最終處理者和編解碼器

四、開發管道

  1. 初始化管道末端 xxxChannelInitializer
  • ServerChannelInitializer
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //通訊管道
        ChannelPipeline pipeline = ch.pipeline();
        //掛載最終處理者
        pipeline.addLast(new ServerChannelHandlerAdapter());
    }
}
  • ClientChannelInitializer
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //通訊管道
        ChannelPipeline pipeline = ch.pipeline();
        //掛載最終處理者
        pipeline.addLast(new ClientChannelHandlerAdapter());
    }
}
  1. 掛載最終處理者 xxxChannelHandlerAdapter
    Server:
    ​ channelRead:被動接收發訊息
    ​ exceptionCaught:異常捕獲
    Client:
    ​ channelActive:傳送訊息
    ​ channelRead:接收訊息
    ​ exceptionCaught:異常捕獲
  • ServerChannelHandlerAdapter 建立服務端最終處理者
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * 收發訊息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ChannelFuture channelFuture = ctx.writeAndFlush(msg);
        //關閉通道
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 異常捕獲
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println(cause.getMessage());
    }
}
  • ClientChannelHandlerAdapter 建立客戶端最終處理者
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * 主動傳送訊息
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msg="你好,我伺服器!!!!";
        ByteBuf buf=ctx.alloc().buffer();
        buf.writeBytes(msg.getBytes());
        ctx.writeAndFlush(buf);
    }

    /**
     * 被動收發訊息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf= (ByteBuf) msg;
        System.out.println("收到:"+buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 異常捕獲
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println(cause.getMessage());
    }
}

五、位元組流ByteBuf基本使用

  1. ByteBuf
    ByteBuf區別於ByteBuffer不同。因為ByteBuf是一個可邊長的位元組陣列是Netty提供專門用於操作位元組工具類。
    該物件通過readIndex/writeIndex控制ByteBuf。在初始情況下readIndexwriteIndex都等於0;當用戶寫資料的時候writeIndex自動增長位元組數,可讀的區域就等於writeIndex-readIndex
/**
* 或者其他建立方式 new PooledByteBufAllocator().buffer();
* new UnpooledByteBufAllocator(true).buffer();
*/
ByteBuf buf= Unpooled.buffer(5);// r=0,w=0,c=5(可以自動擴容)
buf.writeByte((byte)'a');
buf.writeByte((byte)'b');
buf.writeByte((byte)'c');    // r=0,w=3,c=5
byte v1 = buf.readByte();
byte v2 = buf.readByte();   // r=2,w=3,c=5
//readableBytes() 剩餘可讀位元組
int i = buf.readableBytes();// w-r=1
//discardReadBytes() 清除已讀過資料,即修正readIndex和writeIndex
buf.discardReadBytes();     // w-=r,r=0 ,c=5

相比於 ByteBuffer操作簡單些。
注意ByteBuf是Netty提供緩衝區,不能夠給NIO使用,僅僅簡化對緩衝區的操作。省略flip/clear方法。Netty管道預設只支援傳送ByteBufFileRegion都代表位元組資料,一般在RPC系統使用ByteBuf傳送基礎資料(java物件);如果使用者使用Netty開發檔案傳輸系統。使用者可以直接將一個檔案拆分成多個FileRegion物件.

  1. FileRegion

將一個檔案切分成多個區間,實現對檔案隨機讀取。(可以實現斷點續傳,並行拷貝)

FileChannel fi=new FileInputStream("xxx檔案A").getChannel();
FileChannel fo=new FileOutputStream("xxx檔案B",true).getChannel();//SocketChannel
//讀取xxx檔案A從0位置開始讀,讀取100個位元組
FileRegion region= new DefaultFileRegion(fi,0,100);
//從當前Region的0位置,拷貝100個位元組資料到Xxx檔案B中
region.transferTo(fo,0);

FileRegion在網路間傳遞檔案系統。綜上所述Netty的通道只支援傳送以上兩種物件型別,如果使用者希望傳送自定義型別就必須嘗試將傳送的資料轉換為ByteBuf或者FileRegion.

六、如何捕獲Netty在傳輸過程中出現的異常資訊?【面試題】

//傳送訊息
ChannelFuture channelFuture = ctx.writeAndFlush(cmd);
//新增序列化異常捕獲
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
//出錯切斷連線
channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

物件編碼器,作用於輸出,使得Netty支援傳輸任意Java物件。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.commons.lang3.SerializationUtils;

import java.io.Serializable;
import java.util.List;

public class ObjectEncoder extends MessageToMessageEncoder<Object> {
    /**
     * 
     * @param ctx
     * @param msg 需要編碼的物件
     * @param out 編碼資料幀
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        ByteBuf buf=ctx.alloc().buffer();
        byte[] bytes= SerializationUtils.serialize((Serializable) msg);
        buf.writeBytes(bytes);
        out.add(buf);//生成資料幀
    }
}

物件解碼,將任意的收到ByteBuf轉換為Object

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.apache.commons.lang3.SerializationUtils;

import java.util.List;

public class ObjectDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        byte[] bytes=new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        Object obj = SerializationUtils.deserialize(bytes);
        out.add(obj);
    }
}

七、完整版本Netty服務端【重點】

  1. HostAndPort 封裝ip和埠
public class HostAndPort {
    private String host;
    private int port;

    public HostAndPort(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public String getHost() {
        return host;
    }
        this.host = host;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
  1. MethodInvokeMeta 測試用方法物件
public class MethodInvokeMeta implements Serializable {
    private String method;//方法名
    private Class<?>[] parameterTypes;//引數型別
    private Object[] agrs;//引數
    private String targetClass;//目標類資訊

    public MethodInvokeMeta(String method, Class<?>[] parameterTypes, Object[] agrs, String targetClass) {
        this.method = method;
        this.parameterTypes = parameterTypes;
        this.agrs = agrs;
        this.targetClass = targetClass;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getAgrs() {
        return agrs;
    }

    public void setAgrs(Object[] agrs) {
        this.agrs = agrs;
    }

    public String getTargetClass() {
        return targetClass;
    }

    public void setTargetClass(String targetClass) {
        this.targetClass = targetClass;
    }

    @Override
    public String toString() {
        return "MethodInvokeMeta{" +
                "method='" + method + '\'' +
                ", parameterTypes=" + Arrays.toString(parameterTypes) +
                ", agrs=" + Arrays.toString(agrs) +
                ", targetClass='" + targetClass + '\'' +
                '}';
    }
}
  1. NettyServer 簡潔方式NettyServer
    幀頭圖圖
public class NettyServer {
    //1.啟動服務引導  框架啟動引導類,遮蔽網路通訊配置資訊
    private ServerBootstrap sbt = new ServerBootstrap();
    //2.建立請求轉發、響應執行緒池
    private EventLoopGroup boss = new NioEventLoopGroup();
    private EventLoopGroup worker = new NioEventLoopGroup();

    private int port;

    public NettyServer(int port) {
        this.port = port;
        //3.關聯執行緒池組
        sbt.group(boss, worker);
        //4.設定服務端實現類
        sbt.channel(NioServerSocketChannel.class);
    }

    public static void main(String[] args) throws InterruptedException {
        NettyServer nettyServer = new NettyServer(9999);
        try {
            nettyServer.start();
        } finally {
            nettyServer.close();
        }
    }

    public void start() throws InterruptedException {
        //5.配置服務端RPC通訊管道 - 需要關注的東西
        sbt.childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
//新增資料幀解碼
/**
* @param maxFrameLength  幀的最大長度 '65535'
* @param lengthFieldOffset length欄位偏移的地址 '0'
* @param lengthFieldLength length欄位所佔的位元組長 '2'
* @param lengthAdjustment 修改幀資料長度欄位中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,若為負數,則說明要推後多少個欄位 '0'
* @param initialBytesToStrip 解析時候跳過多少個長度 '2'
* @param failFast 為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false,讀取完整個幀再報異 '有不寫這個引數的構造'
*/
                pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                //新增物件解碼器
                pipeline.addLast(new ObjectDecoder());
//新增資料幀編碼器
/**
* 兩個位元組長度的幀頭協議
*/
                pipeline.addLast(new LengthFieldPrepender(2));
                //新增物件編碼器
                pipeline.addLast(new ObjectEncoder());
                //新增最終處理者
                pipeline.addLast(new ChannelHandlerAdapter() {
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        System.err.println(cause.getMessage());
                    }

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println("伺服器收到:" + msg);
                        //收訊息 ChannelFuture非同步操作的結果
                        ChannelFuture channelFuture = ctx.writeAndFlush(msg);

                        //關閉通道,ps測試多物件時關閉
                        //channelFuture.addListener(ChannelFutureListener.CLOSE);
                        channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
            
           

相關推薦

Day15.高效能RPC設計 學習筆記3 - Netty

一、Netty引言 基於NIO一款非同步通訊框架,因為在使用上相比較Mina較為簡單,開發門檻低導致了Netty在網際網路開發中受到絕大多數商用專案成功驗證,導致了Netty成為NIO開發的首選框架。 “快速”和“簡單”並不用產生維護性或效能上的問題。Netty 是一個吸收了多種協議

Day16.高效能RPC設計 學習筆記4 - Zookeeper(轉載)

Zookeeper ZooKeeper 是一個為分散式應用所設計的分佈的、開源的協調服務。可以解決分散式應用中出現常規問題: 同步配置、選舉、分散式鎖、服務命名分組,記住這些問題雖然zookeeper可以幫助使用者解決,並不意味著使用者不需要寫程式碼。使用者如果想使用zookeepe

Day14.高效能RPC設計 學習筆記2

一、通道選擇器 通道註冊:需要使用Selector管理通道,然後將就緒的通道封裝成SelectionKey物件。 設定通道為非阻塞 ServerSocketChannel/SocketChannel#configureBlocking(false) 註冊通道Ser

Day13.高效能RPC設計 學習筆記1

一、引言 系統架構演變 隨著網際網路的發展,網站應用的規模不斷擴大,常規的垂直應用(MVC)架構已無法應對,分散式服務架構以及流動計算架構(伸縮性)勢在必行,亟需一個治理系統確保架構有條不紊的演進。 單一架構:例如早期servlet/jsp - ORM(物件關

系統分析與設計--學習筆記3(用例建模)

一.使用故事板(Storyboard)技術,建模網上訂機票、車票,或訂旅店的過程。 故事前提:你是註冊使用者,並已登陸。 故事起點:發起目標查詢 故事終點:確認訂單(不包含支付) 描述手法:參考 “建模練習” 文件 用“藝龍網”的網上訂酒店來模仿建模:(更多資訊在藝龍網

設計模式:學習筆記(3)——命令者式

抽象 ring code 耦合度 引入 聲明 筆記 操作日誌 string Java設計模式之命令者式 引入命令模式 案列   比如我們要設計一個DOS命令模擬器,它可以接受輸入的命令並做出響應。   0.首先我們的DOS模擬器支持三個大類的功能:文件操作類(FileKit

OPENCV學習筆記3-4_使用模型-視圖-控制器設計應用程序

main hold sse model getters core tpi sso data   此節介紹的架構模式(MVC),將聯合使用模型-視圖-控制器(Model-View-Controller)三個模式以及其他的類。其清晰地分離程序中的邏輯部分與用戶交互部分。此節將使

設計模式學習筆記(3) --工廠方法

1.定義: 工廠方法模式(Factory Method Pattern)又稱為工廠模式,也叫虛擬構造器(Virtual Constructor)模式或者多型工廠(Polymorphic Factory)模式,它屬於類建立型模式。在工廠方法模式中,工廠父類負責定義建立產品物件的公共介面,而工廠子類

DirectX 9 UI設計學習筆記之二:第2章Introducing DirectX+第3章Introducing Direct3D

       此文由哈利_蜘蛛俠原創,轉載請註明出處!有問題歡迎聯絡本人! 上一期的地址:        別看這一期似乎要講很多內容,其實大部分是一帶而過的。其實我的重點在於弄了一個框架程式;詳情見本期最後。 第2章 Introducing DirectX ===

Hadoop學習筆記3.Hadoop RPC機制的使用

一、RPC基礎概念 1.1 RPC的基礎概念   RPC,即Remote Procdure Call,中文名:遠端過程呼叫;   (1)它允許一臺計算機程式遠端呼叫另外一臺計算機的子程式,而不用去關心底層的網路通訊細節,對我們來說是透明的。因此,它經常用於分散式網路通訊中。 RPC協議假定某些傳輸

系統分析與設計學習筆記(一)

學習 掌握 應該 溝通 基本 最終 表示 對象 毫無 為什麽要學習這門課程?   “擁有一把錘子未必能成為建築師”。 這門課程學習的是面向對象分析和設計的核心技能的重要工具。對於使用面向對象技術和語言來,創建設計良好、健壯且可維護的軟件來說,這門課程所

Redis學習筆記3-Redis5個可運行程序命令的使用

運行程序 檢查 mil 數據文件 img usr pre text mod 在redis安裝文章中,說到安裝好redis後,在/usr/local/bin下有5個關於redis的可運行程序。以下關於這5個可運行程序命令的具體說明。 redis-server Redi

Git學習筆記3——工作區與暫存區,以及版本變更

暫存區 所有 cto tag clas 內容 blank 文件 set 工作區(Working Directory) 就是你在電腦裏能看到的目錄,比如我的Git_Learning文件夾就是一個工作區。 版本庫(Repository) 工作區有一個隱藏目錄.git,這個不

Python學習筆記3:簡單文件操作

name n) popu 元素 close nes pla () eof # -*- coding: cp936 -*- # 1 打開文件 # open(fileName, mode) # 參數:fileName文件名稱 # mode打開方式 # w

Linux程序設計學習筆記——異步信號處理機制

基本概念 erro 驗證 添加 uid 函數 count ubun generate 轉載請註明出處: http://blog.csdn.net/suool/article/details/38453333 Linux常見信號與處理 基本概念 Linux的信號是一

Linux 程序設計學習筆記----Linux下文件類型和屬性管理

腳本 types.h 沒有 oot 創建 jsb 文件 屬性 文件大小 轉載請註明出處:http://blog.csdn.net/suool/article/details/38318225 部分內容整理自網絡,在此感謝各位大神。 Linux文件類型和權限 數據表示

(MYSQL學習筆記3)mysql兩行數據合並成一行

mysql使用SUM函數,加上GROUP BY人員ID就可以實現了:SELECT SUM(PZ+CPJS+BZ+GC+SB+TG+MJ+CL+CCLW+GJ+ZL+CBZZ) as count, SUM(PZ) as PZ,SUM(CPJS) as CPJS,SUM(BZ) as BZ,SUM(GC)

javascript 高級程序設計學習筆記(1)

元素 新的 logs html light begin 知識 gin nbsp 知識補充: var box = document.querySelector(‘#box‘); //"beforebegin" ,在當前元素之前插入一個緊鄰的同輩元素; box.ins

AngularJs學習筆記3-服務及過濾器

聲明 運行時 維護 style 函數調用 factor blog 使用場景 需要 距離上次別博客有有一段時間了,因為最近公司和個人事情比較多,也因為學習新的知識所以耽擱了,也有人說Angularjs1.5沒有人用了,沒必要分享,我個人感覺既然開頭了我就堅持把他寫完,

java學習筆記(3)

決定 ati 開始 詳細講解 調用 數據 寄存器 art 筆記 java基礎知識 1:方法 (1)方法:就是完成特定功能的代碼塊。 註意:在很多語言裏面有函數的定義,而在Java中,函數被稱為方法。 (2)格式: 修飾符 返回值類型 方法名(參數類型 參數名1