1. 程式人生 > >Netty實現客戶端和服務端通信簡單例子

Netty實現客戶端和服務端通信簡單例子

啟動服務 ali tty 過程 等等 服務器初始化 讀寫操作 extends ask

Netty是建立在NIO基礎之上,Netty在NIO之上又提供了更高層次的抽象。

在Netty裏面,Accept連接可以使用單獨的線程池去處理,讀寫操作又是另外的線程池來處理。

Accept連接和讀寫操作也可以使用同一個線程池來進行處理。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫線程一塊處理。線程池中的每一個線程都是NIO線程。用戶可以根據實際情況進行組裝,構造出滿足系統需求的並發模型。

Netty提供了內置的常用編解碼器,包括行編解碼器[一行一個請求],前綴長度編解碼器[前N個字節定義請求的字節長度],可重放解碼器[記錄半包消息的狀態],HTTP編解碼器,WebSocket消息編解碼器等等

Netty提供了一些列生命周期回調接口,當一個完整的請求到達時,當一個連接關閉時,當一個連接建立時,用戶都會收到回調事件,然後進行邏輯處理。

Netty可以同時管理多個端口,可以使用NIO客戶端模型,這些對於RPC服務是很有必要的。

Netty除了可以處理TCP Socket之外,還可以處理UDP Socket。

在消息讀寫過程中,需要大量使用ByteBuffer,Netty對ByteBuffer在性能和使用的便捷性上都進行了優化和抽象。

代碼:

服務端:

package com.kinson.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * descripiton:服務端 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 15:37 * @modifier: *
@since: */ public class NettyServer { /** * 端口 */ private int port; public NettyServer(int port) { this.port = port; } public void run() { //EventLoopGroup是用來處理IO操作的多線程事件循環器 //負責接收客戶端連接線程 EventLoopGroup bossGroup = new NioEventLoopGroup(); //負責處理客戶端i/o事件、task任務、監聽任務組 EventLoopGroup workerGroup = new NioEventLoopGroup(); //啟動 NIO 服務的輔助啟動類 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); //配置 Channel bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ServerIniterHandler()); //BACKLOG用於構造服務端套接字ServerSocket對象, // 標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); //是否啟用心跳保活機制 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); try { //綁定服務端口監聽 Channel channel = bootstrap.bind(port).sync().channel(); System.out.println("server run in port " + port); //服務器關閉監聽 /*channel.closeFuture().sync()實際是如何工作: channel.closeFuture()不做任何操作,只是簡單的返回channel對象中的closeFuture對象,對於每個Channel對象,都會有唯一的一個CloseFuture,用來表示關閉的Future, 所有執行channel.closeFuture().sync()就是執行的CloseFuturn的sync方法,從上面的解釋可以知道,這步是會將當前線程阻塞在CloseFuture上*/ channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉事件流組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { new NettyServer(8899).run(); } }

服務端業務邏輯處理:

package com.kinson.netty.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * descripiton: 服務器的處理邏輯
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 15:50
 * @modifier:
 * @since:
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 所有的活動用戶
     */
    public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 讀取消息通道
     *
     * @param context
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext context, String s)
            throws Exception {
        Channel channel = context.channel();
        //當有用戶發送消息的時候,對其他的用戶發送消息
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[you]: " + s + "\n");
            } else {
                ch.writeAndFlush("[" + channel.remoteAddress() + "]: " + s + "\n");
            }
        }
        System.out.println("[" + channel.remoteAddress() + "]: " + s + "\n");
    }

    /**
     * 處理新加的消息通道
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[" + channel.remoteAddress() + "] coming");
            }
        }
        group.add(channel);
    }

    /**
     * 處理退出消息通道
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[" + channel.remoteAddress() + "] leaving");
            }
        }
        group.remove(channel);
    }

    /**
     * 在建立連接時發送消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        boolean active = channel.isActive();
        if (active) {
            System.out.println("[" + channel.remoteAddress() + "] is online");
        } else {
            System.out.println("[" + channel.remoteAddress() + "] is offline");
        }
        ctx.writeAndFlush("[server]: welcome");
    }

    /**
     * 退出時發送消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        if (!channel.isActive()) {
            System.out.println("[" + channel.remoteAddress() + "] is offline");
        } else {
            System.out.println("[" + channel.remoteAddress() + "] is online");
        }
    }

    /**
     * 異常捕獲
     *
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("[" + channel.remoteAddress() + "] leave the room");
        ctx.close().sync();
    }

}

服務端處理器註冊:

package com.kinson.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


/**
 * descripiton: 服務器初始化
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 15:46
 * @modifier:
 * @since:
 */
public class ServerIniterHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道註冊handler
        ChannelPipeline pipeline = socketChannel.pipeline();
        //編碼通道處理
        pipeline.addLast("decode", new StringDecoder());
        //轉碼通道處理
        pipeline.addLast("encode", new StringEncoder());
        //聊天服務通道處理
        pipeline.addLast("chat", new ServerHandler());
    }
}

客戶端:

package com.kinson.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * descripiton: 客戶端
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 16:40
 * @modifier:
 * @since:
 */
public class NettyClient {

    private String ip;

    private int port;

    private boolean stop = false;

    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void run() throws IOException {
        //設置一個多線程循環器
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //啟動附註類
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        //指定所使用的NIO傳輸channel
        bootstrap.channel(NioSocketChannel.class);
        //指定客戶端初始化處理
        bootstrap.handler(new ClientIniterHandler());
        try {
            //連接服務
            Channel channel = bootstrap.connect(ip, port).sync().channel();
            while (true) {
                //向服務端發送內容
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                String content = reader.readLine();
                if (StringUtils.isNotEmpty(content)) {
                    if (StringUtils.equalsIgnoreCase(content, "q")) {
                        System.exit(1);
                    }
                    channel.writeAndFlush(content);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient("127.0.0.1", 8899).run();
    }
}

客戶端邏輯處理:

package com.kinson.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * descripiton: 客戶端邏輯處理
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 16:50
 * @modifier:
 * @since:
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        //打印服務端的發送數據
        System.out.println(s);
    }
}

客戶端處理器註冊:

package com.kinson.netty.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * descripiton: 客戶端處理初始化
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 16:55
 * @modifier:
 * @since:
 */
public class ClientIniterHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //註冊管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("http", new HttpClientCodec());
        pipeline.addLast("chat", new ClientHandler());
    }
}

測試時先啟動服務端,再啟動客戶端。。。

Netty實現客戶端和服務端通信簡單例子