1. 程式人生 > >SSM(二):Netty接收資料並存入資料庫出現數據接收不完全的情況

SSM(二):Netty接收資料並存入資料庫出現數據接收不完全的情況

1.NettyServerStart

public class NettyServerStart {

    @Autowired
    public NettyServerStart(final NettyServer nettyServer) {
        System.out.println("------------Spring自動載入 ---------");
        System.out.println("------------啟動Netty服務 ---------");
        //繫結埠
        nettyServer.setServerPort(6667);
        ExecutorService executorService= Executors.newCachedThreadPool();
        //執行nettyServer
        executorService.execute(nettyServer);
    }
}

2.NettyServerInitialize

public class NettyServerInitialize extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(8192,12,2,0,0));
       // pipeline.addLast("decoder", new CCYLengthFieldDecoder(8192,12,2,0,0));
       // pipeline.addLast("decoder",new CNPCDecoder());
        pipeline.addLast("handler", nettyServerHandler);
        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連線上");
    }
}

3.NettyServerHandler

重點要說一下這個channelRead0方法裡面我遇到的問題。由於某些原因還未來得及系統學習Netty,但是又急需把資料收集起來存入資料庫。情況是這樣的:有一個1w多個位元組的資料需要我來解析出真實資料,但是每次用網路測試助手往埠傳送資料,收到的資料都會出現不完全的情況。剛開始,我也不知道啥狀況,於是對於那些不完整的資料採取的是不處理。後來才知道,TCP是不會出現丟包的,當資料長度過長的時候,路由器轉發的時候會對資料進行拆分,分多少次發,channelRead0這個方法就會執行多少次,也就是說當我們覺得收到的資料不完全的時候,不是資料丟失,只是channelRead0方法還在執行中,資料沒有收發完。解決辦法:自己建立一個動態陣列,用於每次執行channelRead0方法儲存資料,然後校驗首位末尾,校驗通過之後,解析完畢,立刻存入資料庫,然後清空動態陣列。下圖可以看到,隨著channelRead0不斷執行,動態陣列越來越長。

 

public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {

    @Autowired
    private CollectedDataService collectedDataService;


    private Map<String, DynamicArray<Byte>> dynamicArrayMap = new HashMap<>();
    private int defaultDynamicCapacity = 1024 * 1024;



    /**
     * A thread-safe Set  Using ChannelGroup, you can categorize Channels into a meaningful group.
     * A closed Channel is automatically removed from the collection,
     */
    public static ChannelGroup channels =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        // Broadcast a message to multiple Channels
        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
    }


    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // (4)


        ByteBuf in = (ByteBuf) msg;
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String ip = insocket.getAddress().getHostAddress();
        if(!dynamicArrayMap.containsKey(ip)){
            dynamicArrayMap.put(ip,new DynamicArray<Byte>(defaultDynamicCapacity));
        }
        DynamicArray<Byte> dynamicArray =  dynamicArrayMap.get(ip);


        while (in.isReadable()) {
            byte recByte =  in.readByte();
            dynamicArray.addLast(recByte);
        }
        System.out.println("------------------------------------");
        System.out.println("dynamicArray長度:"+dynamicArray.getSize());


        byte [] dataByte = new byte[dynamicArray.getSize()];

//從Byte[]獲得char[]
        for (int i = 0;i<dynamicArray.getSize();i++) {
            byte b = dynamicArray.get(i);
            dataByte[i] = b;
        }

        char[]  byteToChars = getChars(dataByte);

        Integer validSuccessIndex = null;


        //todo 校驗成功
        if(byteToChars[0]=='0' && byteToChars[1] == '1'
                && byteToChars[byteToChars.length-2]== '0'
                && byteToChars[byteToChars.length-1] == '3'
        ) {
            validSuccessIndex = byteToChars.length;
            System.out.println("成功位:"+ validSuccessIndex);
            System.out.println("成功位:"+ validSuccessIndex);
            collectedDataService.addCnpcTankData(String.valueOf(byteToChars));

        }

        if(byteToChars[0]=='1' && byteToChars[1] == 'B'
                && byteToChars[byteToChars.length-2]== '0'
                && byteToChars[byteToChars.length-1] == 'D'
        ) {
            validSuccessIndex = byteToChars.length;
            System.out.println("成功位:"+ validSuccessIndex);
            System.out.println("成功位:"+ validSuccessIndex);
            collectedDataService.addCnpcGunData(String.valueOf(byteToChars));
            System.out.println("清空dynamicArray之前:"+dynamicArray.getSize());
        }

        if(validSuccessIndex != null){
            // Byte[] validBuffer = dynamicArray.leftShift(validSuccessIndex);


            dynamicArray.leftShift(validSuccessIndex);

            System.out.println("清空dynamicArray之後:"+dynamicArray.getSize());

            //清空byteToChar


        }




        System.out.println("1st:"+byteToChars[0]+" 2nd:"+byteToChars[1]+
                " 3rd:"+byteToChars[byteToChars.length-2]+" 4th:"+byteToChars[byteToChars.length-1]);


        for (char byteTochar : byteToChars
                ) {
            System.out.print(byteTochar);
        }

      //  System.out.println("--------------------");

  //      System.out.println("dataByte.lenth:"+dataByte.length);

        //ValidSuccessIndex不為空則校驗成功

    }

    byte[] toPrimitives(Byte[] oBytes)
    {
        byte[] bytes = new byte[oBytes.length];

        for(int i = 0; i < oBytes.length; i++) {
            bytes[i] = oBytes[i];
        }

        return bytes;
    }
    char[] getChars(byte[] bytes) {
        Charset cs = Charset.forName("UTF-8");
        ByteBuffer bb = ByteBuffer.allocate(bytes.length);
        bb.put(bytes);
        bb.flip();
        CharBuffer cb = cs.decode(bb);
        return cb.array();
    }







    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "上線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "掉線");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        Channel incoming = ctx.channel();
        System.out.println("Client:" + incoming.remoteAddress() + "異常");
        // 當出現異常就關閉連線
        //cause.printStackTrace();
        ctx.close();
    }
}

4.NettyServer

package com.cnpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by caihanbin on 2017/4/29.
 */

@Component
public class NettyServer implements Runnable {

    @Autowired
    private NettyServerInitialize nettyServerInitialize;

    private int port;
    //設定埠
    public void setServerPort(int port) {
        this.port = port;
    }

    public NettyServer(){}

    public void run()  {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(nettyServerInitialize)
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
            // 繫結埠,開始接收進來的連線
            ChannelFuture f = b.bind(port).sync(); // (7)
            // 等待伺服器  socket 關閉 。
            // 在這個例子中,這不會發生,但你可以優雅地關閉你的伺服器。
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}