1. 程式人生 > >《Java 編寫基於 Netty 的 RPC 框架》

《Java 編寫基於 Netty 的 RPC 框架》

一 簡單概念

RPC: ( Remote Procedure Call),遠端呼叫過程,是通過網路呼叫遠端計算機的程序中某個方法,從而獲取到想要的資料,過程如同呼叫本地的方法一樣.

阻塞IO :當阻塞I/O在呼叫InputStream.read()方法是阻塞的,一直等到資料到來時才返回,同樣ServerSocket.accept()方法時,也是阻塞,直到有客戶端連線才返回,I/O通訊模式如下:

 

缺點:當客戶端多時,會建立大量的處理執行緒,並且為每一個執行緒分配一定的資源;阻塞可能帶來頻繁切換上下文,這時引入NIO

NIO : jdk1.4引入的(NEW Input/Output),是基於通過和快取區的I/O方式,(插入一段題外話,學的多忘得也多,之前有認真研究過NIO,後來用到的時候,忘得一乾二淨,所以學習一些東西,經常返回看看),NIO是一種非阻塞的IO模型,通過不斷輪詢IO事件是否就緒,非阻塞是指執行緒在等待IO的時候,可以做其他的任務,同步的核心是Selector,Selector代替執行緒本省的輪詢IO事件,避免了阻塞同時減少了不必要的執行緒消耗;非阻塞的核心是通道和快取區,當IO事件的就緒時,可以將快取區的資料寫入通道

 

其工作原理:

1 由專門的執行緒來處理所有的IO事件,並且負責轉發

2 事件驅動機制:事件到的時候才觸發,而不是同步監視

3 執行緒通訊:執行緒之間通訊通過wait,notify等方式通訊,保證每次上下文切換都是有意義的,減少沒必要的執行緒切換

通道 : 是對原I/O包中流的模擬,所有資料必須通過Channel物件,常見的通道FileChannel,SocketChannel,ServerSocketChannel,DatagramChannel(UDP協議向網路連線的兩端讀寫資料)

 

Buffer快取區 :實際上是一個容器,一個連續的陣列,任何讀寫的資料都經過Buffer

 

Netty :是由JBOSS提供的一個java開源框架,是一個高效能,非同步事件驅動的NIO框架,基於JAVA NIO提供的API實現,他提供了TCP UDP和檔案傳輸的支援,,所有操作都是非同步非阻塞的.通過Futrue-Listener機制,本質就是Reactor模式的現實,Selector作為多路複用器,EventLoop作為轉發器,而且,netty對NIO中buffer做優化,大大提高了效能

二 Netty 客戶端和服務端的

Netty中Bootstrap和Channel的生命週期

Bootstrap簡介

Bootstarp:載入程式,將ChannelPipeline,ChannelHandler,EventLoop進行整體關聯

 

Bootstrap具體分為兩個實現

ServerBootstrap:用於服務端,使用一個ServerChannel接收客戶端的連線,並建立對應的子Channel

Bootstrap:用於客戶端,只需要一個單獨的Channel,配置整個Netty程式,串聯起各個元件

二者的主要區別:

1 ServerBootstrap用於Server端,通過呼叫bind()繫結一個埠監聽連線,Bootstrap用於Client端,需要呼叫connect()方法來連線伺服器端,我們也可以呼叫bind()方法接收返回ChannelFuture中Channel

2 客戶端的Bootstrap一般用一個EventLoopGroup,而伺服器的ServerBootstrap會用兩個第一個EventLoopGroup專門負責繫結到埠監聽連線事件,而第二個EventLoopGroup專門用來處處理每個接收的連線,這樣大大提高了併發量

 
public class Server { public static void main(String[] args) throws Exception { // 1 建立線兩個事件迴圈組 // 一個是用於處理伺服器端接收客戶端連線的 // 一個是進行網路通訊的(網路讀寫的) EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); // 2 建立輔助工具類ServerBootstrap,用於伺服器通道的一系列配置 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) // 繫結倆個執行緒組 .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel對應TCP, NioDatagramChannel對應UDP .option(ChannelOption.SO_BACKLOG, 1024) // 設定TCP緩衝區 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 設定傳送緩衝大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 這是接收緩衝大小 .option(ChannelOption.SO_KEEPALIVE, true) // 保持連線 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel建立連線後的管道 // 3 在這裡配置 通訊資料的處理邏輯, 可以addLast多個... sc.pipeline().addLast(new ServerHandler()); } }); // 4 繫結埠, bind返回future(非同步), 加上sync阻塞在獲取連線處 ChannelFuture cf1 = b.bind(8765).sync(); //ChannelFuture cf2 = b.bind(8764).sync(); //可以繫結多個埠 // 5 等待關閉, 加上sync阻塞在關閉請求處 cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Server :" + body ); String response = "返回給客戶端的響應:" + body ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); // future完成後觸發監聽器, 此處是寫完即關閉(短連線). 因此需要關閉連線時, 要通過server端關閉. 直接關閉用方法ctx[.channel()].close() //.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("讀完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } } public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //可以使用多個埠 //傳送訊息, Buffer型別. write需要flush才傳送, 可用writeFlush代替 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes())); //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes())); cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } } public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Client :" + body ); } finally { // 記得釋放xxxHandler裡面的方法的msg引數: 寫(write)資料, msg引用將被自動釋放不用手動處理; 但只讀資料時,!必須手動釋放引用數 ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } 

其他元件:

Handle: 為了支援各種協議和處理資料的方式,可以是連線,資料接收,異常,資料格式轉換等

ChannelHandler

ChannelInboundHandler :最常用的Handler,作用是處理接收資料的事件,來處理我們的核心業務邏輯。

ChannelInitializer :,當一個連結建立時,我們需要知道怎麼來接收或者傳送資料,當然,我們有各種各樣的Handler實現來處理它,那麼ChannelInitializer便是用來配置這些Handler,它會提供一個ChannelPipeline,並把Handler加入到ChannelPipeline。

ChannelPipeline :一個Netty應用基於ChannelPipeline機制,這種機制依賴EventLoop和EventLoopGroup,這三個都和事件或者事件處理相關

EventLoop : 為Channel處理IO操作,一個EventLoop可以為多個Channel服務

EventLoopGroup :包含多個EventLoop

Channel :代表一個Socket連線

Future :在Netty中所有的IO操作都是非同步的,,因此我們不知道,過來的請求是否被處理了,所以我們註冊一個監聽,當操作執行成功或者失敗時監聽自動觸發,所有操作都會返回一個ChannelFutrue

ChannelFuture

Netty 是一個非阻塞的,事件驅動的,網路程式設計框架,我們通過一張圖理解一下,Channel,EventLoop以及EventLoopGroup之間的關係

 

解釋一下,當一個連線過來,Netty首先會註冊一個channel,然後EventLoopGroup會分配一個EventLoop繫結到這個channel,在這個channel的整個生命週期過程中,這個EventLoop一直為他服務,這個玩意就是一個執行緒

 

這下聊一下Netty如何處理資料?

前面有講到,handler資料處理核心,,而ChannelPipeline負責安排Handler的順序和執行,我們可以這樣理解,資料在ChannelPipeline中流動,其中ChannelHandler就是一個個閥門,這些資料都會經過每一個ChannelHandler並且被他處理,其中ChannelHandler的兩個子類ChannelOutboundHandler和ChannelInboundHandler,根據不同的流向,選擇不同的Handler

 

由圖可以看出,一個數據流進入ChannelPipeline時,一個一個handler挨著執行,各個handler的資料傳遞,這需要呼叫方法中ChannelHandlerContext來操作,而這個ChannelHandlerContext可以用來讀寫Netty中的資料流

三 Netty中的業務處理

netty中會有很多Handler.具體哪一種Handler還要看繼承是InboundAdapter還是OutboundAdapter,Netty中提供一系列的Adapter來幫助我們簡化開發,在ChannelPipeline中的每一個handler都負責把Event傳遞個洗下一個handler,有這些adapter,這些工作可以自動完成,,我們只需覆蓋我們真正實現的部分即可,接下來比較常用的三種ChannelHandler

Encoders和Decodeers

我們在網路傳輸只能傳輸位元組流,在傳送資料時,把我們的message轉換成bytes這個過程叫Encode(編碼),相反,接收資料,需要把byte轉換成message,這個過程叫Decode(解碼)

Domain Logic

我們真正關心的如何處理解碼以後的資料,我們真正的業務邏輯便是接收處理的資料,Netty提供一個常用的基類就是SimpleChannelInboundHandler<T>,其中T就是Handler處理的資料型別,訊息到達這個Handler,會自動呼叫這個Handler中的channelRead0(ChannelHandlerContext,T)方法,T就是傳過來的資料物件

四 基於netty實現的Rpc的例子

這是我的github上專案的位置

https://github.com/developerxiaofeng/rpcByNetty

專案目錄結構如下

 

詳細的專案細節看類中的註釋,很詳細哦。

獲取資料:

最後給大家分享一些學習資料,裡面包括:(BATJ面試資料、高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)和Java進階學習路線圖

領取方式:加微訊號 weixin99ting  備註 :(資料) 即可獲取。

最後,祝大家