1. 程式人生 > >三、Netty伺服器與多客戶連線利用廣播方式完全理解netty的讀寫機制

三、Netty伺服器與多客戶連線利用廣播方式完全理解netty的讀寫機制

         本篇採用的編解碼是netty自帶的字串的格式的編解碼,使用者1可以控制檯發訊息(輸入完訊息要加換行符號,是一個訊息結束的分隔符,後面我會在編解碼中詳細講解)給伺服器,伺服器廣播給其他所有的客戶端,這個機制,我們遊戲開發伺服器中應用很普遍,遊戲中的廣播,遊戲中你看到其他玩家在操作。。。都是利用這個廣播機制。

案例如下:

package com.zhurong.netty.test3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Description:
 * User: zhurong
 * Date: 2018-09-24  23:17
 */
public class NettyChatServer {

    public static void main(String[] args) {
        //接收連線
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //連線傳送給work
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            System.out.println("伺服器啟動成功!");
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new NettyChatServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}
package com.zhurong.netty.test3;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * Description: 客戶端與伺服器端連線一旦建立,這個類中方法就會被回撥
 * User: zhurong
 * Date: 2018-09-24  21:29
 */
public class NettyChatServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //解碼器
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new NettyChatServerHandler());
    }
}
package com.zhurong.netty.test3;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 此處定義客戶端和伺服器端傳遞的是字串所以用了String
 * Description:
 * User: zhurong
 * Date: 2018-09-24  23:30
 */
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"------>"+msg);
          //返回資料給客戶端,是一個非同步的操作
        channelGroup.forEach(channel ->{
            System.out.println("channel:"+channel);
            if(ctx.channel() != channel){
                channel.writeAndFlush(ctx.channel().remoteAddress()+"傳送的訊息" + msg+"\n");
                System.out.println("傳送訊息給客戶端:"+ctx.channel().remoteAddress()+",msg:"+msg+"\n");
            }else {
                channel.writeAndFlush("是自己的訊息\n");
                System.out.println("是自己的訊息:"+ msg);
            }
        });
//        channelGroup.writeAndFlush("伺服器已經收到你們客戶端訊息了!");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server 推送的訊息:"+ ctx.channel().remoteAddress() + "註冊進了伺服器\n");
        channelGroup.writeAndFlush("server 推送的訊息:"+ ctx.channel().remoteAddress() + "註冊進了伺服器\n");
        channelGroup.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        channelGroup.writeAndFlush("server 推送的訊息:"+ ctx.channel().remoteAddress() + "離開了伺服器\n");
        System.out.println("server 推送的訊息:"+ ctx.channel().remoteAddress() + "離開了伺服器\n");
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channelGroup.writeAndFlush(ctx.channel().remoteAddress() + "上線了\n");
        System.out.println(ctx.channel().remoteAddress() + "上線了\n");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channelGroup.writeAndFlush(ctx.channel().remoteAddress() + "下線了\n");
        System.out.println(ctx.channel().remoteAddress() + "下線了\n");
    }
}
package com.zhurong.netty.test3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Description:
 * User: zhurong
 * Date: 2018-09-24  23:37
 */
public class NettyChatClient {

    public static void main(String[] args) throws InterruptedException, IOException {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            System.out.println("客戶端啟動成功");
            Bootstrap bootstrap =  new Bootstrap();
            bootstrap.group(eventExecutors).channel(NioSocketChannel.class).handler(new NettyChatClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost",8000).sync();
            Channel channel = channelFuture.channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            for(;;){
                channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
            }
//            channelFuture.channel().closeFuture().sync();

        }finally {
            eventExecutors.shutdownGracefully();
        }

    }
}
package com.zhurong.netty.test3;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 *  Netstat –ano|findstr
 * Description: 客戶端與伺服器端連線一旦建立,這個類中方法就會被回撥
 * User: zhurong
 * Date: 2018-09-24  21:29
 */
public class NettyChatClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //解碼器
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new NettyChatClientHandler());
    }
}
package com.zhurong.netty.test3;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Description:
 * User: zhurong
 * Date: 2018-09-24  22:01
 */
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到伺服器的訊息:"+ctx.channel().remoteAddress() + msg);
//        System.out.println("msg:"+ msg);
//        ctx.writeAndFlush("from client "+ System.currentTimeMillis());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("acitve客戶端第一條資訊");
    }
}

 

現在是不是對netty又進步一熟悉了!