1. 程式人生 > >深入淺出Netty(1)

深入淺出Netty(1)

前言

Netty是一個高效能、非同步事件驅動的NIO框架,它提供了對TCP、UDP和檔案傳輸的支援,作為一個非同步NIO框架,Netty的所有IO操作都是非同步非阻塞的,通過Future-Listener機制,使用者可以方便的主動獲取或者通過通知機制獲得IO操作結果。

作為當前最流行的NIO框架,Netty在網際網路領域、大資料分散式計算領域、遊戲行業、通訊行業等獲得了廣泛的應用,一些業界著名的開源元件也基於Netty的NIO框架構建。

所以,Netty這個ZB技能必須要學會,不熟悉NIO的可以先看看深入淺出NIO Socket

Reactor模型

Netty中的Reactor模型主要由多路複用器(Acceptor)、事件分發器(Dispatcher)、事件處理器(Handler)組成,可以分為三種。

1、單執行緒模型:所有I/O操作都由一個執行緒完成,即多路複用、事件分發和處理都是在一個Reactor執行緒上完成的。

Reactor單執行緒模型

對於一些小容量應用場景,可以使用單執行緒模型。但是對於高負載、大併發的應用卻不合適,主要原因如下:

  • 一個執行緒同時處理成百上千的鏈路,效能上無法支撐,即便CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送;
  • 當負載過重後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往會進行重發,最終會導致大量訊息積壓和處理超時,成為系統的效能瓶頸;
  • 一旦單執行緒意外跑飛,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障,可靠性不高。

2、多執行緒模型:為了解決單執行緒模型存在的一些問題,演化而來的Reactor執行緒模型。

Reactor多執行緒模型

多執行緒模型的特點:

  • 有專門一個Acceptor執行緒用於監聽服務端,接收客戶端的TCP連線請求;
  • 網路IO的讀寫操作由一個NIO執行緒池負責,執行緒池可以採用標準的JDK執行緒池實現,包含一個任務佇列和N個可用的執行緒,由這些NIO執行緒負責訊息的讀取、解碼、編碼和傳送;
  • 一個NIO執行緒可以同時處理多條鏈路,但是一個鏈路只能對應一個NIO執行緒,防止發生併發操作問題。

在絕大多數場景下,Reactor多執行緒模型都可以滿足效能需求;但是,在極特殊應用場景中,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端併發連線,或者服務端需要對客戶端的握手訊息進行安全認證,認證本身非常損耗效能。在這類場景下,單獨一個Acceptor執行緒可能會存在效能不足問題,為了解決效能問題,產生了第三種Reactor執行緒模型-主從Reactor多執行緒模型。

3、主從多執行緒模型:採用多個reactor,每個reactor都在自己單獨的執行緒裡執行。如果是多核,則可以同時響應多個客戶端的請求,一旦鏈路建立成功就將鏈路註冊到負責I/O讀寫的SubReactor執行緒池上。

主從多執行緒模型

事實上,Netty的執行緒模型並非固定不變,在啟動輔助類中建立不同的EventLoopGroup例項並通過適當的引數配置,就可以支援上述三種Reactor執行緒模型。正是因為Netty對Reactor執行緒模型的支援提供了靈活的定製能力,所以可以滿足不同業務場景的效能需求。

示例程式碼

以下是server和client的示例程式碼,其中使用的是 Netty 4.x,先看看如何實現,後續會針對各個模組進行深入分析。

server 程式碼實現

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 publicclassEchoServer{privatefinalintport;publicEchoServer(intport){this.port=port;}publicvoidrun()throwsException{// Configure the server.EventLoopGroup bossGroup=newNioEventLoopGroup();// (1)EventLoopGroup workerGroup=newNioEventLoopGroup();try{ServerBootstrapb=newServerBootstrap();// (2)b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)// (3).option(ChannelOption.SO_BACKLOG,100).handler(newLoggingHandler(LogLevel.INFO)).childHandler(newChannelInitializer<SocketChannel>(){// (4)@OverridepublicvoidinitChannel(SocketChannel ch)throwsException{ch.pipeline().addLast(//new LoggingHandler(LogLevel.INFO),newEchoServerHandler());}});// Start the server.ChannelFuturef=b.bind(port).sync();// (5)// Wait until the server socket is closed.f.channel().closeFuture().sync();}finally{// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}publicstaticvoidmain(String[]args)throwsException{intport;if(args.length>0){port=Integer.parseInt(args[0]);}else{port=8080;}newEchoServer(port).run();}}

EchoServerHandler 實現

12345678910111213141516171819202122 publicclassEchoServerHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalLogger logger=Logger.getLogger(EchoServerHandler.class.getName());@Override  publicvoidchannelRead(ChannelHandlerContext ctx,Objectmsg)throwsException{ctx.write(msg);}@Override  publicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{ctx.flush();}@Override  publicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){// Close the connection when an exception is raised.  logger.log(Level.WARNING,"Unexpected exception from downstream.",cause);ctx.close();}}

1、NioEventLoopGroup 是用來處理I/O操作的執行緒池,Netty對 EventLoopGroup 介面針對不同的傳輸協議提供了不同的實現。在本例子中,需要例項化兩個NioEventLoopGroup,通常第一個稱為“boss”,用來accept客戶端連線,另一個稱為“worker”,處理客戶端資料的讀寫操作。
2、ServerBootstrap 是啟動服務的輔助類,有關socket的引數可以通過ServerBootstrap進行設定。
3、這裡指定NioServerSocketChannel類初始化channel用來接受客戶端請求。
4、通常會為新SocketChannel通過新增一些handler,來設定ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法可以為SocketChannel 的pipeline新增指定handler。
5、通過繫結埠8080,就可以對外提供服務了。

client 程式碼實現

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 publicclassEchoClient{privatefinalStringhost;privatefinalintport;privatefinalintfirstMessageSize;publicEchoClient(Stringhost,intport,intfirstMessageSize){this.host=host;this.port=port;this.firstMessageSize=firstMessageSize;}publicvoidrun()throwsException{// Configure the client.  EventLoopGroup group=newNioEventLoopGroup();try{Bootstrapb=newBootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(newChannelInitializer<SocketChannel>(){@Override  publicvoidinitChannel(SocketChannel ch)throwsException{ch.pipeline().addLast(//new LoggingHandler(LogLevel.INFO),  newEchoClientHandler(firstMessageSize));}});// Start the client.  ChannelFuturef=b.connect(host,port).sync();// Wait until the connection is closed.  f.channel().closeFuture().sync();}finally{// Shut down the event loop to terminate all threads.  group.shutdownGracefully();}}publicstaticvoidmain(String[]args)throwsException{finalStringhost=args[0];finalintport=Integer.parseInt(args[1]);finalintfirstMessageSize;if(args.length==3){firstMessageSize=Integer.parseInt(args[2]);}else{firstMessageSize=256;}newEchoClient(host,port,firstMessageSize).run();}}

EchoClientHandler 實現

123456789101112131415161718192021222324252627282930313233343536373839404142 publicclassEchoClientHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalLogger logger=Logger.getLogger(EchoClientHandler.class.getName());privatefinalByteBuf firstMessage;/**      * Creates a client-side handler.      */publicEchoClientHandler(intfirstMessageSize){if(firstMessageSize<=0){thrownewIllegalArgumentException("firstMessageSize: "+firstMessageSize);}firstMessage=Unpooled.buffer(firstMessageSize);for(inti=0;i<firstMessage.capacity();i++){firstMessage.writeByte((byte)i);}}@Override  publicvoidchannelActive(ChannelHandlerContext ctx){ctx.writeAndFlush(firstMessage<