1. 程式人生 > >netty9---使用編碼解碼器

netty9---使用編碼解碼器

-- 定義 work etc ddr RR ech channel urn

客戶端:

package com.client;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import com.cn.codc.RequestEncoder; import com.cn.codc.ResponseDecoder; import com.cn.model.Request;
import com.cn.module.fuben.request.FightRequest; /** * netty客戶端入門 */ public class Client { public static void main(String[] args) throws InterruptedException { //服務類 ClientBootstrap bootstrap = new ClientBootstrap(); //線程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker
= Executors.newCachedThreadPool(); //socket工廠 bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker)); //管道工廠 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new ResponseDecoder());//Response解碼器 pipeline.addLast("encoder", new RequestEncoder());//Request編碼器 pipeline.addLast("hiHandler", new HiHandler());//通過ResponseDecoder解碼的是一個Response對象。 return pipeline; } }); //連接服務端 ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 10101)); Channel channel = connect.sync().getChannel(); System.out.println("client start"); Scanner scanner = new Scanner(System.in); while(true){ System.out.println("請輸入"); int fubenId = Integer.parseInt(scanner.nextLine()); int count = Integer.parseInt(scanner.nextLine()); FightRequest fightRequest = new FightRequest(); fightRequest.setFubenId(fubenId); fightRequest.setCount(count); Request request = new Request(); request.setModule((short) 1);//請求第一個模塊的 request.setCmd((short) 1); request.setData(fightRequest.getBytes());//fightRequest是數據體data //發送請求 channel.write(request); } } }
package com.client;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

import com.cn.model.Response;
import com.cn.model.StateCode;
import com.cn.module.fuben.request.FightRequest;
import com.cn.module.fuben.response.FightResponse;
/**
 * 消息接受處理類
 */
public class HiHandler extends SimpleChannelHandler {

    /**
     * 接收消息
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            Response message = (Response)e.getMessage();//解碼出來的之後是Response對象,然後進行業務處理

            if(message.getModule() == 1){
                
                if(message.getCmd() == 1){
                    FightResponse fightResponse = new FightResponse();
                    fightResponse.readFromBytes(message.getData());
                    
                    System.out.println("gold:" + fightResponse.getGold());
                    
                }else if(message.getCmd() == 2){
                    
                }
                
            }else if (message.getModule() == 1){
                
                
            }
    }

    /**
     * 捕獲異常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /**
     * 新連接
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /**
     * 必須是鏈接已經建立,關閉通道的時候才會觸發
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /**
     * channel關閉的時候觸發
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}

服務端:

package com.server;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

import com.cn.codc.RequestDecoder;
import com.cn.codc.ResponseEncoder;
/**
 * netty服務端入門
 */
public class Server {

    public static void main(String[] args) {

        //服務類
        ServerBootstrap bootstrap = new ServerBootstrap();
        
        //boss線程監聽端口,worker線程負責數據讀寫
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();
        
        //設置niosocket工廠
        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
        
        //設置管道的工廠
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            
            @Override
            public ChannelPipeline getPipeline() throws Exception {

                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", new RequestDecoder());//Request的解碼器
                pipeline.addLast("encoder", new ResponseEncoder());//Response的編碼器
                pipeline.addLast("helloHandler", new HelloHandler());
                return pipeline;
            }
        });
        bootstrap.bind(new InetSocketAddress(10101));
        System.out.println("start!!!");
    }
}
package com.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

import com.cn.model.Request;
import com.cn.model.Response;
import com.cn.model.StateCode;
import com.cn.module.fuben.request.FightRequest;
import com.cn.module.fuben.response.FightResponse;
/**
 * 消息接受處理類
 */
public class HelloHandler extends SimpleChannelHandler {

    /**
     * 接收消息
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

        Request message = (Request)e.getMessage();
        
        if(message.getModule() == 1){
            
            if(message.getCmd() == 1){
                
                FightRequest fightRequest = new FightRequest();
                fightRequest.readFromBytes(message.getData());
                
                System.out.println("fubenId:" +fightRequest.getFubenId() + "   " + "count:" + fightRequest.getCount());
                
                //回寫數據
                FightResponse fightResponse = new FightResponse();
                fightResponse.setGold(9999);
                
                Response response = new Response();
                response.setModule((short) 1);
                response.setCmd((short) 1);
                response.setStateCode(StateCode.SUCCESS);
                response.setData(fightResponse.getBytes());
                ctx.getChannel().write(response);
            }else if(message.getCmd() == 2){
            }
        }else if (message.getModule() == 1){
        }
    }

    /**
     * 捕獲異常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /**
     * 新連接
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /**
     * 必須是鏈接已經建立,關閉通道的時候才會觸發
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /**
     * channel關閉的時候觸發
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}

網絡傳的是字節數據不是字符。

Netty之自定義數據包協議:
give me a coffee give me a tea (2條信息)

give me a coffeegive me a tea 粘包現象(2條信息粘在一起)

give me
a coffeegive me a tea 分包現象(第一條消息分開了,第一條後部分又粘到了第二條消息)

粘包和分包出現的原因是:沒有一個穩定數據結構(沒有確定一個請求傳遞哪些數據)

分割符:
give me a coffee|give me a tea|
give me a coffee|
give me a tea|

長度 + 數據:
16give me a coffee13give me a tea(讀取16個就封裝成一個請求,投遞到業務層去)
16give me a coffee
13give me a tea

netty9---使用編碼解碼器