1. 程式人生 > >網際網路技術22——netty編解碼技術與資料通訊

網際網路技術22——netty編解碼技術與資料通訊

Netty服務部署

常用的部署方式有2中,一種是耦合在Web應用中(以Tomcat為例),使其伴隨Tomcat的啟動而啟動,伴隨Tomcat的關閉而關閉。另外一種則是將Netty獨立打包部署,然後由單獨的程序啟動執行(可以使用shell或其他指令碼進行啟動),然後以資料庫或者其他快取為承接點,實現資料互動。Netty與其他程式進行互動,然後將獲取到的資料進行處理插入資料庫或者快取,然後其他服務從中獲取。獲取在Netty中呼叫web應用的一些對外介面。

Netty編解碼技術

編解碼技術,說白了就是java序列化技術,序列化目的就兩個,第一個進行網路傳輸,第二物件持久化。雖然我們可以使用java進行物件序列化,netty去傳輸,但是java序列化的硬傷太多,比如:java序列化沒法跨語言、序列化後碼流太大、序列化效能太低等等。。

主流的編解碼框架:

  • JBoss的Marshalling包
  • google的Protobuf
  • 基於Protobuf的Kyro (效能高於Protobuf,可以與Marshalling媲美)
  • MessagePack框架

Netty結合JBoss Marshalling

JBoss Marshalling是一個java物件序列化包,對JDK預設的序列化框架進行了優化,但又保持跟java.io.Seriallzable介面相容,同時增加了一些可調的引數和附加特性。

類庫:jboss-marshalling1.3.0、jboss-marshalling-serial-1.3.0

這兩個包一定要都匯入,尤其是jboss-marshalling-serial-1.3.0,缺此包可能不會報錯,但是server不能解析傳輸的資料物件,跳過channelRead()而直接執行channelComple()

下載地址:https://www.jboss.org/jbossmarshalling/downloads/

maven路徑:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
  </dependency>
  <dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling</artifactId>
    <version>1.3.0.GA</version>
  </dependency>
  <dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>1.3.0.GA</version>
  </dependency>

下面的程式碼我直接結合著長連線超時重連一起演示,即資料通訊

資料通訊

一般在專案中我們更改如何使用netty呢?大體上對於一些茶樹配置都是根據伺服器效能決定的,這個不是最主要的,我們需要考慮的問題是兩天機器(甚至是多臺)使用netty怎樣進行通訊,一般分為三種:

1. 第一種,使用長連線通道不斷開的形式進行通訊,也就是伺服器和客戶端一直處於開啟狀態,如果伺服器效能足夠好,並且我們客戶端的數量也比較少的情況下,還是可以採用這種方式的。

2.第二種,一次性批量提交資料,採用短連線方式。也就是我們會把資料儲存咋及本地臨時緩衝區或者臨時表裡,當達到臨界值時進行一次性批量提交,又或者根據定時任務輪詢提交,這種情況弊端是做不到實時性傳輸,在對實施性不高的應用程式彙總可以推薦使用。

3.第三種,使用一種特殊的長連線,在指定某一時間內,伺服器與某客戶端沒有任何通訊則斷開連線,下次連線則是客戶端發起請求的時候再次建立連線,但是這種模式我們需要考慮兩個因素:
  (1)如何在超時(即伺服器和客戶端沒有任何通訊)後關閉通道,關閉通道後我們又如何再次建立連線?答:可以使用連線的時候新增判斷,如果連線狀態可用直接使用,如果斷開則重新建立連線

  (2)客戶端宕機時,我們無需考慮,下次客戶端重啟之後,我們就可以與伺服器建立連線,但是伺服器宕機時,我們的客戶端如何與伺服器進行連線呢?答:可以在客戶端使用定時任務去輪詢連線,知道連線成功建立為止。

結合上述編解碼技術和資料通訊的第三種方式,做一個統一的演示

netty已經為我們提供了超時機制,只需要在bootstrap中初始化ChannelInitializer時增加超時處理器ReadTimeOutHandler就可以了

ch.pipeline().addLast(new ReadTimeoutHandler(5));單位預設我秒,理論上在一段增加就可以了,為了雙重保險,建議客戶端和服務端同時配置。另外注意在資料傳送完畢後不能新增監聽去關閉(writeFlush.addListenser(ChannelFutureListener.CLOSE),因為該監聽會立即斷開連線。

序列化工具Marshalling部分的封裝:MarshallingCodeCFactory.java

package com.nettyCopyOk;

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通過Marshalling工具類的getProvidedMarshallerFactory靜態方法獲取MarshallerFactory例項
        //引數“serial”表示建立的是Java序列化工廠物件,它由jboss-marshalling-serial-1.3.0.CR9.jar提供。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //建立了MarshallingConfiguration物件
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        //將它的版本號設定為5
        configuration.setVersion(5);
        //然後根據MarshallerFactory和MarshallingConfiguration建立UnmarshallerProvider例項
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //最後通過建構函式建立Netty的MarshallingDecoder物件
        //它有兩個引數,分別是UnmarshallerProvider和單個訊息序列化後的最大長度。
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //建立MarshallerProvider物件,它用於建立Netty提供的MarshallingEncoder例項
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //MarshallingEncoder用於將實現序列化介面的POJO物件序列化為二進位制陣列。
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

請求資料載體類:SubscribeReq.java

package com.nettyCopyOk;


import java.io.Serializable;

public class SubscribeReq implements Serializable {

    /**
     * 預設的序列號ID
     */
    private static final long serialVersionUID = 1L;

    private int subReqID;

    private String userName;

    private String productName;

    private String phoneNumber;

    private String address;

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public int getSubReqID() {
        return subReqID;
    }

    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "SubscribeReq [subReqID=" + subReqID + ", userName=" + userName
                + ", productName=" + productName + ", phoneNumber="
                + phoneNumber + ", address=" + address + "]";
    }
}

響應資料載體類:SubscribeResp.java

package com.nettyCopyOk;


import java.io.Serializable;

public class SubscribeResp implements Serializable {

   /**
   * 預設序列ID
   */
 private static final long serialVersionUID = 1L;

   private int subReqID;

   private int respCode;

   private String desc;

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public int getSubReqID() {
        return subReqID;
    }

    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }

    public int getRespCode() {
        return respCode;
    }

    public void setRespCode(int respCode) {
        this.respCode = respCode;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
   public String toString() {
   return "SubscribeResp [subReqID=" + subReqID + ", respCode=" + respCode
     + ", desc=" + desc + "]";
   }
 }

服務端實際處理邏輯:SubReqServerHandler.java

package com.nettyCopyOk;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

@ChannelHandler.Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //經過解碼器handler ObjectDecoder的解碼,
        //SubReqServerHandler接收到的請求訊息已經被自動解碼為SubscribeReq物件,可以直接使用。
       SubscribeReq req = (SubscribeReq) msg;
        if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
            System.out.println("Service accept client subscribe req : ["
                    + req.toString() + "]");
            //對訂購者的使用者名稱進行合法性校驗,校驗通過後列印訂購請求訊息,構造訂購成功應答訊息立即傳送給客戶端。
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    private SubscribeResp resp(int subReqID) {
        SubscribeResp resp = new SubscribeResp();
        resp.setSubReqID(subReqID);
        resp.setRespCode(0);
        resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return resp;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        System.out.println("發生異常");
        ctx.close();// 發生異常,關閉鏈路
    }


}

客戶端實際處理邏輯:SubReqClientHandler.java

package com.nettyCopyOk;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class SubReqClientHandler extends ChannelHandlerAdapter {

    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {

    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //由於物件解碼器已經對訂購請求應答訊息進行了自動解碼,
        //因此,SubReqClientHandler接收到的訊息已經是解碼成功後的訂購應答訊息。
        System.out.println("Receive server response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

服務端程式碼:server.java

package com.nettyCopyOk;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

/**
 * Created by BaiTianShi on 2018/9/12.
 */
public class Server {

    public void bind(int port) throws Exception {
        // 配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer() {
                        @Override
                        public void initChannel(Channel ch) {
                            //通過工廠類建立MarshallingEncoder解碼器,並新增到ChannelPipeline.
                            ch.pipeline().addLast(com.testMarshaLing.MarshallingCodeCFactory.buildMarshallingDecoder());
                            //通過工廠類建立MarshallingEncoder編碼器,並新增到ChannelPipeline中。
                            ch.pipeline().addLast(com.testMarshaLing.MarshallingCodeCFactory.buildMarshallingEncoder());
                            ch.pipeline().addLast(new ReadTimeoutHandler(3));
                            ch.pipeline().addLast(new SubReqServerHandler());
                        }
                    });

            // 繫結埠,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放執行緒池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用預設值
            }
        }
        new Server().bind(port);
    }
}

客戶端程式碼SubReqClient.java

package com.nettyCopyOk;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;


public class SubReqClient {
    private static class SingletonHolder {
        static final SubReqClient instance = new SubReqClient();
    }

    public static SubReqClient getInstance(){
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf ;
    private SubReqClient(){
        group = new NioEventLoopGroup();
        b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer() {
                    @Override
                    public void initChannel(Channel ch)
                            throws Exception {
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new ReadTimeoutHandler(5));
                        ch.pipeline().addLast(new SubReqClientHandler());
                    }
                });
    }
    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO執行緒組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer() {
                        @Override
                        public void initChannel(Channel ch)
                                throws Exception {
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            ch.pipeline().addLast(new ReadTimeoutHandler(3));
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });

            // 發起非同步連線操作
            ChannelFuture f = b.connect(host, port).sync();
            System.out.println("向服務端傳送請求資料");
            // 等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }
    public void connect(){
        try {
            this.cf = b.connect( "127.0.0.1", 8080).sync();
            System.out.println("遠端伺服器已經連線, 可以進行資料交換..");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ChannelFuture getChannelFuture(){

        if(this.cf == null){
            this.connect();
        }
        if(!this.cf.channel().isActive()){
            this.connect();
        }

        return this.cf;
    }
    public static void main(String[] args) throws Exception {


        final SubReqClient c = SubReqClient.getInstance();


        ChannelFuture cf = c.getChannelFuture();
        for (int i = 0; i < 3; i++) {
            SubscribeReq req = new SubscribeReq();
            req.setAddress("南京市江寧區方山國家地質公園");
            req.setPhoneNumber("138xxxxxxxxx");
            req.setProductName("Netty For Marshalling");
            req.setSubReqID(i);
            req.setUserName("Lilinfeng");
            cf.channel().writeAndFlush(req);
//            TimeUnit.SECONDS.sleep(3);
        }
//        cf.channel().flush();
        cf.channel().closeFuture().sync();
        TimeUnit.SECONDS.sleep(4);
        System.out.println(cf.channel().isActive());
        System.out.println(cf.channel().isOpen());

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("進入子執行緒...");
                    ChannelFuture cf = c.getChannelFuture();
                    System.out.println(cf.channel().isActive());
                    System.out.println(cf.channel().isOpen());

                    //再次傳送資料
                    SubscribeReq req = new SubscribeReq();
                    req.setAddress("南京市江寧區方山國家地質公園");
                    req.setPhoneNumber("138xxxxxxxxx");
                    req.setProductName("Netty For Marshalling");
                    req.setSubReqID(10);
                    req.setUserName("Lilinfeng");
                    cf.channel().writeAndFlush(req);
                    cf.channel().closeFuture().sync();
                    System.out.println("子執行緒結束.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        System.out.println("斷開連線,主執行緒結束..");
    }
}

 

udp通訊和壓縮傳輸和解壓等可看下面的例子

udp通訊:http://blog.csdn.net/mffandxx/article/details/53264172

Netty實現Websocket開發:http://www.cnblogs.com/wunaozai/p/5240006.html

netty實現壓縮和解壓傳輸檔案:https://blog.csdn.net/zbw18297786698/article/details/53678133