1. 程式人生 > >Netty之客戶端/服務端初始化(二)

Netty之客戶端/服務端初始化(二)

https://blog.csdn.net/qq_18603599/article/details/80768400

本章接著上一章,繼續講解和netty相關的知識,主要有以下幾個知識點:

1 netty服務端的初始化原始碼分析

2 netty客戶端的初始化原始碼分析

3 netty的入門例項

 

服務端初始化

  • 服務端初始化的步驟

    • 建立ServerBootstrap啟動輔助類,通過Builder模式進行引數配置;

    • 建立並繫結Reactor執行緒池EventLoopGroup;

    • 設定並繫結服務端Channel通道型別;

    • 繫結服務端通道資料處理器責任鏈Handler;

    • 繫結並啟動監聽埠;

      原始碼分析:

1) ServerBootstrap初始化

  • ServerBootstrap是netty啟動輔助類,通過Builder模式進行引數設定初始化;ServerBootstrap繼承AbstracBootstrap類,需要對EventLoopGroup,Channel和ChannelHandler等引數進行配置;

(2) EventLoop執行緒池初始化

  • EventLoopGroup初始化是建立建立兩個NioEventLoopGroup型別的Reactor執行緒池bossGroup和workGroup分別用來處理客戶端的連線請求和通道IO事件;

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b= new ServerBootstrap();
 b.group(boosGroup,workGroup)

  • 通過group()方法設定EventLoop

    • 將bossGroup傳入到AbstractBootstrap中設定到group屬性上,將workGroup設定到ServerBootstrap的childGroup屬性上;

    • 如果只傳入了一個EventLoopGroup則最後傳入兩個相同的group;

  • super.group(parentGroup)方法對AbstractBootstrap的group屬性進行設定

(3) Channel通道初始化

  • Channel初始化主要是指對通道型別進行設定,常見的通道型別主要有NioServerSocktChannel非同步非阻塞服務端TCP通道,NioSocketChannel非同步非阻塞客戶端通道,OioServerSocketChannel同步阻塞服務端通道,OioSocketChannel同步阻塞客戶端通道,NioDatagramChannel非同步非阻塞UDP通道,OioDatagramChannel同步阻塞UDP通道等;

ChannelFactory通道工程類設定

  • 在serverBootstrap初始化過程中通過呼叫channel()方法進行通道型別設定
  1. public B channel(Class<? extends C> channelClass) {

  2. if(channelClass == null) {

  3. throw new NullPointerException("channelClass");

  4. } else {

  5. return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));

  6. }

  7. }

  8. 根據傳入的Channe型別初始化一個ChannelFactory型別的工廠類,工廠類中通過newChannel()方法建立Channel例項

  1.  
  2. private final Class<? extends T> clazz;

  3.  
  4. public ReflectiveChannelFactory(Class<? extends T> clazz) {

  5. if(clazz == null) {

  6. throw new NullPointerException("clazz");

  7. } else {

  8. this.clazz = clazz;

  9. }

  10. }

  11.  
  12. public T newChannel() {

  13. try {

  14. return (Channel)this.clazz.newInstance();

  15. } catch (Throwable var2) {

  16. throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);

  17. }

  18. }

  • 通過channelFactory()方法將建立工廠類例項指向AbstractoryBootstrap的channelFactory屬性
  1. public B channelFactory(ChannelFactory<? extends C> channelFactory) {

  2. if(channelFactory == null) {

  3. throw new NullPointerException("channelFactory");

  4. } else if(this.channelFactory != null) {

  5. throw new IllegalStateException("channelFactory set already");

  6. } else {

  7. this.channelFactory = channelFactory;

  8. return this;

  9. }

  10. }

Channel通道例項化

  • 配置好AbstractBootstrap的channelFactory工廠類,Channel的例項化通過ChannelFactory.newChannel()方法實現;具體的newChannel()方法的呼叫鏈是:
  1.  
  2. ServerBootstrap.bind() -> AbstractBootstrap.doBind() -> AbstractBootstrap.initAndRegister() -> ChannelFactory.newChannel();public T newChannel() {1

  3. try {

  4. return (Channel)this.clazz.newInstance();

  5. } catch (Throwable var2) {

  6. throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);

  7. }

  8. }

  • 通過clazz.newInstance()方法呼叫構造器建立NioServerSocketChannel例項
  1. public NioServerSocketChannel() {

  2. this(newSocket(DEFAULT_SELECTOR_PROVIDER));

  3. }

  4. 呼叫newSocket()方法建立ServerSocketChannel例項,這裡的ServerSocketChannel和NIO中的ServerSocketChannel是同一個東西,接下來會呼叫父類構造器對其進行外部封裝和相關引數的配置;

  1. public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {

  2. super((Channel)null, channel, 16);

  3. this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());

  4. }

  •  
  • 在 NioServerSocketChannsl 例項化過程中, 所需要做的工作

    • 呼叫 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 開啟一個新的 Java NIO ServerSocketChannel

    • AbstractNioChannel 中的屬性:

      • SelectableChannel ch 被設定為 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.

      • readInterestOp 被設定為 SelectionKey.OP_ACCEPT

      • SelectableChannel ch 被配置為非阻塞的 ch.configureBlocking(false)

    • AbstractChannel(Channel parent) 中初始化 AbstractChannel 的屬性:

      • parent 屬性置為 null

      • unsafe 通過newUnsafe() 例項化一個 unsafe 物件, 它的型別是 AbstractNioMessageChannel#AbstractNioUnsafe 內部類

      • pipeline 是 new DefaultChannelPipeline(this) 新建立的繫結管道例項.

    • NioServerSocketChannel 中的屬性:

      • ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())

Channel通道註冊

  • 在channel通道建立和初始化完畢後,會通過group.register()方法將channel通道註冊到EventLoop執行緒池中;
  1. final ChannelFuture initAndRegister() {

  2. // 去掉非關鍵程式碼

  3. final Channel channel = channelFactory().newChannel();

  4. init(channel);

  5. ChannelFuture regFuture = group().register(channel);

  6. }

  •  
  • 通過一系列的註冊方法呼叫:AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register,最終是通過Unsafe類的register0()方法
  1. private void register0(ChannelPromise promise) {

  2. boolean firstRegistration = neverRegistered;

  3. doRegister();

  4. neverRegistered = false;

  5. registered = true;

  6. safeSetSuccess(promise);

  7. pipeline.fireChannelRegistered();

  8. // Only fire a channelActive if the channel has never been registered. This prevents firing

  9. // multiple channel actives if the channel is deregistered and re-registered.

  10. if (firstRegistration && isActive()) {

  11. pipeline.fireChannelActive();

  12. }

  13. }

  •  
  • register0()方法呼叫了doRegister()方法實現通道註冊到執行緒池中(EventLoop執行緒池會繫結一個selector選擇器)
  1.  
  2. @Override

  3. protected void doRegister() throws Exception {

  4. // 省略錯誤處理

  5. selectionKey = javaChannel().register(eventLoop().selector, 0, this);

  6. }

(4)Pipeline管道初始化

  • 每一個Channel通道在初始化時都會建立並繫結一個管道類,用來作為通道資料流的處理;pipeline管道的例項化是在AbstractChannel 的構造器中;

    • 在建立DefaultChannelPipeline例項時會傳入一個Channel物件,這個Channel物件就是之前例項化的NioServerSocketChannel例項,將pipeline管道和channel通道進行繫結;

    • DefaultChannelPipeline中維護了一個以AbstractChannelHandlerContext為節點的雙向連結串列,包含兩個欄位head和tail分別指向雙向連結串列的頭部和尾部;

  1.  
  2. public DefaultChannelPipeline(AbstractChannel channel) {

  3. if (channel == null) {

  4. throw new NullPointerException("channel");

  5. }

  6. this.channel = channel;

  7.  
  8. tail = new TailContext(this);

  9. head = new HeadContext(this);

  10.  
  11. head.next = tail;

  12. tail.prev = head;

  13. }

(5)handler處理器的新增過程

  • 我們可以自定義Handler處理器並將其加入到pipeline管道中,進而像外掛一樣自由組合各種handler完成具體的業務邏輯;新增handler的過程是獲取與channel通道繫結的管道pipeline然後將自定義的handler新增進pipeline內部維護的一個雙向連結串列;
  1. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

  2. @Override

  3. protected void initChannel(SocketChannel socketChannel) throws Exception {

  4. socketChannel.pipeline().addLast(new TimeServerHandler());

  5. }

  6. });

  • Bootstrap.childerHandler方法接收一個 ChannelHandler, 而我們傳遞的是一個 派生於ChannelInitializer的匿名類,它正好也實現了 ChannelHandler介面,因此將ChannelHandler例項賦值給ServerBootstrap的childHandler屬性;
  1. public ServerBootstrap childHandler(ChannelHandler childHandler) {

  2. if(childHandler == null) {

  3. throw new NullPointerException("childHandler");

  4. } else {

  5. this.childHandler = childHandler;

  6. return this;

  7. }

  8. }

  • 在啟動服務端繫結埠時候最終通過呼叫initAndRegister()方法建立Channel例項,並將通過init()方法將系統定義的處理器ServerBootstrapAccptor新增到與channel繫結的pipeline通道中;
  1. @Override

  2. void init(Channel channel) throws Exception {

  3. ...

  4. ChannelPipeline p = channel.pipeline();

  5.  
  6. final EventLoopGroup currentChildGroup = childGroup;

  7. final ChannelHandler currentChildHandler = childHandler;

  8. final Entry<ChannelOption<?>, Object>[] currentChildOptions;

  9. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

  10.  
  11. p.addLast(new ChannelInitializer<Channel>() {

  12. @Override

  13. public void initChannel(Channel ch) throws Exception {

  14. ChannelPipeline pipeline = ch.pipeline();

  15. ChannelHandler handler = handler();

  16. if (handler != null) {

  17. pipeline.addLast(handler);

  18. }

  19. pipeline.addLast(new ServerBootstrapAcceptor(

  20. currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

  21. }

  22. });

  23. }

  • 在ServerBootstrapAcceptor中重寫了channelRead()方法,將自定義的handler處理器新增到管道中;
  1.  
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) {

  3. final Channel child = (Channel) msg;

  4. child.pipeline().addLast(childHandler);

  5. ...

  6. childGroup.register(child).addListener(...);

  7. }

  • 伺服器端的 handler 與 childHandler 的區別與聯絡:

    • 伺服器 NioServerSocketChannel 的 pipeline 中新增的是 handler 與 ServerBootstrapAcceptor.

    • 當有新的客戶端連線請求時, ServerBootstrapAcceptor.channelRead 中負責新建此連線的NioSocketChannel並新增 childHandler 到 NioSocketChannel 對應的pipeline中, 並將此channel繫結到workerGroup中的某個eventLoop中;

    • handler是在accept階段起作用, 它處理客戶端的連線請求,ServerBootstrap也能設定handler()方法新增ServerSocketChannel的自定義處理器;

總結

  • Netty服務端的初始化主要是建立初始化輔助類ServerBootstrap,並對輔助類的相關引數進行初始化包括EventLoop執行緒池,Channle通道型別和ChannleHandler通道處理器等;

  • 在呼叫bind()方法進行埠繫結時,會根據ServerBootsrap中的初始化引數啟動服務端,具體的啟動流程為:

    • 建立ServerBootstrap啟動輔助類例項,並對其Channel,EventLoopGroup,Handler等引數進行配置;

    • 呼叫bootstrap.bind()方法時觸發啟動,會根據配置的Channle型別建立Channel例項,比如NioServerSocketChannel等

    • 在例項化Channel時候會初始化Pipeline管道並與AbstractChannel繫結

    • 將channel管道註冊到EventLoopGroup執行緒池中,從執行緒池中輪詢獲取一個執行緒EventLoop並與之繫結;

    • 啟動執行緒,執行緒執行繫結的selector的select()方法監聽註冊的channel的狀態,並執行定時任務

客戶端初始化

一、BootStrap.connect() 分析

  1. public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {

  2. 引數校驗

  3. validate();

  4. 解析與連線

  5. return doResolveAndConnect(remoteAddress, localAddress);

  6. }

  • doResolveAndConnect 方法
  1. private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

  2. 初始化和註冊

  3. final ChannelFuture regFuture = initAndRegister();

  4. }

  5. }

  • initAndRegister方法
  1. final ChannelFuture initAndRegister() {

  2. Channel channel = null;

  3. try {

  4. 1. 例項化NioSocketChannel

  5. channel = channelFactory.newChannel();

  6. 2. NioSocketChannel的 option方法設定 & atrr 方法設定

  7. init(channel);

  8.  
  9. } catch (Throwable t) {

  10. if (channel != null) {

  11. channel.unsafe().closeForcibly();

  12. }

  13. GlobalEventExecutor.INSTANCE).setFailure(t);

  14. }

  15.  
  16. 3. config().group()為 NioEventLoopGroup的例項

  17.  
  18. config().group().register()的實現為MultiThreadEventLoopGroup.register 方法

  19. ChannelFuture regFuture = config().group().register(channel);

  20.  
  21. }

  • 二、MultiThreadEventLoopGroup.register(Channel channel)
  1. @Override

  2. public EventLoop next() {

  3. return (EventLoop) super.next();

  4. }

  5.  
  6. @Override

  7. public ChannelFuture register(Channel channel) {

  8. register 方法為:SingleThreadEventLoop.register(Channel)

  9. return next().register(channel);

  10. }

  •  

三、SingleThreadEventLoop .register(Channel) 方法

  1. @Override

  2. public ChannelFuture register(Channel channel) {

  3. return register(new DefaultChannelPromise(channel, this));

  4. }

  5.  
  6. @Override

  7. public ChannelFuture register(final ChannelPromise promise) {

  8. ObjectUtil.checkNotNull(promise, "promise");

  9. promise.channel().unsafe() 為 NioSocketChannelUnsafe 例項

  10. 實際呼叫方法為 AbstractUnsafe.register

  11. promise.channel().unsafe().register(this, promise);

  12. return promise;

  13. }

四、 AbstractUnsafe. register

  1. public final void register(EventLoop eventLoop, final ChannelPromise promise) {

  2. 設定NioSocketChannel 的eventloop =SingleThreadEventLoop

  3. AbstractChannel.this.eventLoop = eventLoop;

  4. eventLoop.execute():啟動一個新的執行緒然後執行

  5. 實際呼叫方法為 SingleThreadEventLoop.execute

  6. eventLoop.execute(new Runnable() {

  7. @Override

  8. public void run() {

  9. register0(promise);//實際呼叫的是doRegister();

  10. }

  11. });

  12. }

  13.  
  • doRegister();
  1. @Override

  2. protected void doRegister() throws Exception {

  3.  
  4. for (;;) {

  5. try {

  6. 將jdk的 socketChannel 註冊到多路複用器上

  7. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

  8. return;

  9. }

  10. }

  11. }

  12. tLoop.execute(Runnable) 過程

 

這裡的eventLoop 為SingleThreadEventLoop例項

實際呼叫方法 SingleThreadEventExecutor.execute,程式碼如下:

  1. public void execute(Runnable task) {

  2. boolean inEventLoop = inEventLoop();

  3. //啟動執行緒

  4. startThread();

  5. //將任務新增到 taskQueue中

  6. addTask(task);

  7.  
  8.  
  9. }

5.1 startThread();

  1. private void startThread() {

  2.  
  3. doStartThread();

  4.  
  5. }

  6.  
  • 5.1.1 doStartThread();
  1. executor.execute(new Runnable() {

  2. @Override

  3. public void run() {

  4. //為了標記 eventLoo 是否已經啟動

  5. thread = Thread.currentThread();

  6. boolean success = false;

  7. updateLastExecutionTime();

  8. try {

  9. SingleThreadEventExecutor.this.run();

  10. success = true;

  11. }

  12. }

  13.  
  14. .... ....

  15.  
  16. });

5.1.1.1 executor.execute(runnable)

實現方法為ThreadPerTaskExecutor.execute

  1. public void execute(Runnable command) {

  2. threadFactory.newThread(command).start();

  3. }

5.1.1.2 SingleThreadEventExecutor.this.run()

此方法非常關鍵,是eventLoop的核心

實現方法為 :NioEventLoop.run()方法

  1. @Override

  2. protected void run() {

  3. for (;;) {

  4. //查詢就緒的selectedKey

  5. select(wakenUp.getAndSet(false));

  6. //執行就緒的selectedKey

  7. processSelectedKeys();

  8. //執行任務

  9. runAllTasks();

  10. }

  11. }

  • processSelectedKeys();

    processSelectedKeysOptimized 方法

  1. private void processSelectedKeysOptimized() {

  2. for (int i = 0; i < selectedKeys.size; ++i) {

  3. final SelectionKey k = selectedKeys.keys[i];

  4. selectedKeys.keys[i] = null;

  5.  
  6. final Object a = k.attachment();

  7. if (a instanceof AbstractNioChannel) {

  8. processSelectedKey(k, (AbstractNioChannel) a);

  9. } else {

  10. @SuppressWarnings("unchecked")

  11. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

  12. processSelectedKey(k, task);

  13. }

  14.  
  15. if (needsToSelectAgain) {

  16. selectedKeys.reset(i + 1);

  17. selectAgain();

  18. i = -1;

  19. }

  20. }

  21. }

processSelectedKey 方法
  •  
  1.  
  2. int readyOps = k.readyOps();

  3. //處理 connnect 事件

  4. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {

  5. int ops = k.interestOps();

  6. ops &= ~SelectionKey.OP_CONNECT;

  7. k.interestOps(ops);

  8.  
  9. unsafe.finishConnect();

  10. }

  11. //處理 write 事件

  12. if ((readyOps & SelectionKey.OP_WRITE) != 0) {

  13. ch.unsafe().forceFlush();

  14. }

  15. // 處理 read 和 accept 事件

  16. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

  17. unsafe.read();

  18. }

  19.  
  • runAllTasks();
  1. protected boolean runAllTasks() {

  2. assert inEventLoop();

  3. boolean fetchedAll;

  4. boolean ranAtLeastOne = false;

  5.  
  6. do {

  7. ScheduledTaskQueue 中的task 放入 taskQueue 中

  8. fetchedAll = fetchFromScheduledTaskQueue();

  9. // 執行taskQueue中的任務

  10. if (runAllTasksFrom(taskQueue)) {

  11. ranAtLeastOne = true;

  12. }

  13. } while (!fetchedAll);

  14.  
  15. //執行tailQueue中的任務

  16. afterRunningAllTasks();

  17. return ranAtLeastOne;

  18. }

  • 2.2 addTask(task)//新增任務到佇列中
  1. protected void addTask(Runnable task) {

  2. //taskQueue 中增加task

  3. if (!offerTask(task)) {

  4. reject(task);

  5. }

  6. }

OK 上面就大概把客戶端和服務端初始化的過程以及相關原始碼的核心都大概分析來一遍,最後再簡單編寫netty

的入門例項

netty的程式設計一般分為客戶端和服務端,其中有編解碼,業務處理的handler,例項話客戶端或者服務端例項,設定

tcp的引數,接下來開始編碼實現完成一個簡單的登入場景,就是客戶端傳送一個賬號給服務端,諮詢是否可以登入

然後服務端返回賬號資訊,以及可以登入的資訊。首先看服務端程式碼

package com.jhp.netty.chapter01.server;

import com.jhp.netty.chapter01.handler.NettyLoginServerHandler;

import com.jhp.netty.chapter01.initializer.NettyLoginServerInitialzer;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

/** * Created by jack on 2018/1/18.

* 客戶端向服務端傳送登入賬號和使用者密碼

* 服務端接受客戶端發過來的資訊,判斷username=jhp,pwd=123

* 如果正確,返回給客戶端登入成功,否則返回對應的失敗資訊給客戶度

* 基於短連結

*/

public class NettyLoginServer {

public static void main(String[] args)

{

System.out.println("服務端成功啟動....");

//例項戶boss執行緒組

// nthreads如果不指定的話,會採用預設演算法來設定=處理器核心數 * 2

EventLoopGroup mainGroup = new NioEventLoopGroup(3);

//例項戶work執行緒組

EventLoopGroup subGroup = new NioEventLoopGroup();

try{

//例項戶服務端例項

ServerBootstrap serverBootstrap = new ServerBootstrap();

//設定服務端需要的執行緒組

serverBootstrap.group(mainGroup,subGroup);

//設定服務端處理的

socketchannel serverBootstrap.channel(NioServerSocketChannel.class);

//設定tcp引數

serverBootstrap .option(ChannelOption.SO_BACKLOG,128) .option(ChannelOption.SO_KEEPALIVE,true);

//設定服務端請求處理的handler

//注意 服務端這裡一般要設定為childHandler 否則會報錯的

serverBootstrap .childHandler(new NettyLoginServerInitialzer());;

//服務端繫結埠接受客戶端的請求

ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();

//獲取非同步結果

channelFuture.channel().closeFuture().sync();

}

catch (Exception exception){ exception.printStackTrace(); }

finally {

//關閉boss執行緒

mainGroup.shutdownGracefully();

//關閉work執行緒

subGroup.shutdownGracefully(); } } }

上面把整個服務端的程式碼都實現了,其中最終要的就是childHandler,下面就看
NettyLoginServerInitialzer

一般handler都是繼承ChannelInboundHandlerAdapter 重寫裡面指定的方法,而不是重寫所有的方法,這個需要根據自己的業務需求而定的,具體程式碼如下

package com.jhp.netty.chapter01.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Created by jack on 2018/1/18.
 */
public class NettyLoginServerHandler extends ChannelInboundHandlerAdapter {


    /**
     * 處理異常
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.writeAndFlush("sorry happend request exception");
    }

    /**
     * 客戶端連線服務端時候執行的方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連線的客戶端地址:" + ctx.channel().remoteAddress());
    }

    /**
     * 讀取客戶端傳遞過來的資料
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服務端接收到客戶端傳過來的訊息:"+msg.toString());
        ctx.writeAndFlush("恭喜你,賈紅平可以登入系統");

    }
}定義好業務處理的handler,需要再定義channel的初始化類,來完成初始化功能,看程式碼

import com.jhp.netty.chapter01.handler.NettyLoginServerHandler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

/**

* Created by jack on 2018/1/18.

* 服務端channel初始化

*/

public class NettyLoginServerInitialzer extends ChannelInitializer<SocketChannel>

{

protected void initChannel(SocketChannel ch) throws Exception

{

//設定服務端的字串解碼

ch.pipeline().addLast(new StringDecoder());

//設定服務端的字串編碼

ch.pipeline().addLast(new StringEncoder());

//設定服務端的業務處理handler

//ch.pipeline().addLast(new InnerServerHandler());

ch.pipeline().addLast(new NettyLoginServerHandler());

} }

這類的功能很簡單,主要如下

& 設定解碼例項

& 設定編碼例項

& 設定業務處理的handler

服務端相關的程式碼都寫完了,接下來再把客戶端程式碼實現以下,

package com.jhp.netty.chapter01.client;

import com.jhp.netty.chapter01.handler.NettyLoginClientHandler;

import com.jhp.netty.chapter01.initializer.NettyLoginClientInitialzer;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import io.netty.handler.codec.Delimiters;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

/**

* Created by jack on 2018/1/18.

* 客戶端的主要工作是 1,連線到服務端 2,向服務端傳送資料資料 3,處理服務端返回的資料 4,關閉連線

*/

public class NettyLoginClient

{

public static void main(String[] args)

{ connect(); }

public static void connect(){

System.out.println("客戶端成功啟動...");

//建立nioeventlopp執行緒組

EventLoopGroup clientGroup = new NioEventLoopGroup();

try{

//例項話客戶端例項 這裡是要和服務端區分開來的,服務端前面還有一個

Server Bootstrap bootstrap = new Bootstrap();

//設定客戶端執行緒組

bootstrap.group(clientGroup);

//設定客戶端的處理

sockertchannel bootstrap.channel(NioSocketChannel.class);

//設定tcp相關引數

bootstrap.option(ChannelOption.SO_KEEPALIVE,true);

//設定客戶端處理的handler 這裡也是要和服務端區分開來,客戶端一般不會使用

childHandler bootstrap.handler(new NettyLoginClientInitialzer());

//連線到服務端且獲取非同步結果 這裡的埠號要和服務端的一致

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080).sync();

//關閉channel並且拿到結果,如果關閉的話是獲取不到結果

//這個一旦關閉了,整個請求就關閉了,所以長連線的時候是不能這樣操作的

channelFuture.channel().closeFuture().sync();

} catch (Exception ex)

{

ex.printStackTrace();

}

finally {

//如果在服務端需要返回資料給客戶端 這個時候這個不能關閉

clientGroup.shutdownGracefully();

} } }

再看客戶端對應的業務handler:

package com.jhp.netty.chapter01.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by jack on 2018/1/18.
 *
 */
public class NettyLoginClientHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //不能使用write 否則沒有效果
        ctx.writeAndFlush("賈紅平,可以登入系統嗎");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客戶端接收到服務端響應的資料:"+msg.toString());
        ctx.channel().close();
    }
    /**
     * 處理異常
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.writeAndFlush("sorry happend request exception");
    }  

}
最後再看一下客戶端的channel初始化,其實和服務端幾乎是一致的,就是設定解碼,編碼,業務處理的handler和channelpipeline繫結

* Created by jack on 2018/1/18. */

public class NettyLoginClientInitialzer extends ChannelInitializer<SocketChannel>

{

protected void initChannel(SocketChannel ch) throws Exception

{

//設定字串的編碼

ch.pipeline().addLast(new StringDecoder());

//設定字串的解碼

ch.pipeline().addLast(new StringEncoder());

//設定對應的業務處理handler

//注意 handler 一般放在最後設定

//ch.pipeline().addLast(new InnerClientHandler());

ch.pipeline().addLast(new NettyLoginClientHandler()); } }

接下來看一下整體測試的效果,一般要先啟動服務端,否則會報錯,等服務端成功啟動之後,再啟動客戶端,具體效果如下

服務端的效果

客戶端的效果

至此netty的入門例項就講解完了,雖然效果很簡單,但是基本是把netty程式設計的通用步驟和元件都講解到,以後就算是遇到複雜的業務也基本上是按照這個來的,無非就是一些擴充套件,比如上面的編碼 解碼都是比較簡單的,在複雜的業務中往往需要開發者自己實現編解碼功能,當然netty本身就有很多內建,也有可擴充套件的類,這個在後面的章節會詳細介紹到。。