1. 程式人生 > >ProtoBuf和Netty的簡單使用

ProtoBuf和Netty的簡單使用

1.pom.xml:

當然,也可建立個java工程把jar包放進去

 

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4     <groupId>netty-demo</groupId>
 5     <artifactId>com.kingdee</artifactId>
 6     <version>0.0.1-SNAPSHOT</version>
 7     <properties>
 8         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 9         <spring.version>3.2.5.RELEASE</spring.version>
10         <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version>
11     </properties>
12     <dependencies>
13         <dependency>
14             <groupId>org.springframework</groupId>
15             <artifactId>spring-context</artifactId>
16             <version>${spring.version}</version>
17         </dependency>
18 
19         <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
20         <dependency>
21             <groupId>io.netty</groupId>
22             <artifactId>netty-all</artifactId>
23             <version>4.0.23.Final</version>
24         </dependency>
25 
26         <!-- https://mvnrepository.com/artifact/log4j/log4j -->
27         <dependency>
28             <groupId>log4j</groupId>
29             <artifactId>log4j</artifactId>
30             <version>1.2.17</version>
31         </dependency>
32 
33         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
34         <dependency>
35             <groupId>commons-logging</groupId>
36             <artifactId>commons-logging</artifactId>
37             <version>1.1.1</version>
38         </dependency>
39 
40         <dependency>
41             <groupId>com.google.protobuf</groupId>
42             <artifactId>protobuf-java</artifactId>
43             <version>3.0.0</version>
44         </dependency>
45     </dependencies>
46 </project>

 

2.msg.proto,把它轉換成java程式碼,再拷貝到對應的包下,利用proto.exe工具生成

mgs.proto:

它是傳輸的實體類,有兩個部分,client和service

傳輸資料時可以直接.來選擇呼叫哪個物件

 

package com.netty.demo;
message Client {  
    required string head = 1;  
    required string body = 2;  
}

message Server {
    required int32 code=1;
    required string message=2;
}

 

在protoc.exe下面放proto檔案,通過命令生成這個傳輸實體類

3.客戶端程式碼:

Client.java:

 

package com.netty.demo.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
    public static String host = "127.0.0.1";
    public static int port = 8787;

    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(worker);
        b.channel(NioSocketChannel.class);
        b.handler(new ClientInitializer());
        try {
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }

    }
}

複製程式碼

ClientHandler.java(處理客戶端訊息傳送和收到服務端訊息的處理,但一般情況下是不會在這裡寫傳送訊息的邏輯的,只是為了寫demo,所以把發訊息寫在這裡面)

 

package com.netty.demo.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import com.google.protobuf.Message;
import com.netty.demo.Msg;

public class ClientHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * 
     */
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        
        System.out.println("Server say : " + msg.toString());
    }

    /**
     * 
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client active ");
        Msg.Client msg = Msg.Client.newBuilder().setHead("Content-Type:application/json;charset=UTF-8").setBody("hello world!").build();
        ctx.writeAndFlush(msg);
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client close ");
        super.channelInactive(ctx);
    }

}

 

ClientInitializer.java(初始化Chanel,如解碼,加密等),最早的netty傳protobuf,是需要手動toByteArrary()把傳輸物件序列化成二進位制流發出去,接收端再手動反序列化還原成傳輸物件。

但是後來通過設定protubuf編碼解碼器,就可以自動實現序列化和反序列化,傳輸時只需要把實體發出去就行了。

package com.netty.demo.client;

import com.netty.demo.Msg;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;

public class ClientInitializer extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel ch) throws Exception {
        // decoded
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        //這裡是收到服務端發過來的訊息,所以是對服務端的response解碼
        ch.pipeline().addLast(new ProtobufDecoder(Msg.Server.getDefaultInstance()));
        // encoded
        ch.pipeline().addLast(new LengthFieldPrepender(4));
        ch.pipeline().addLast(new ProtobufEncoder());
        // 註冊handler
        ch.pipeline().addLast(new ClientHandler());
    }

}

複製程式碼

4.Server端程式碼:

Server.java

 

package com.netty.demo.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
    private static int port = 8787;
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(boss, worker);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(new ServerInitializer());
        server.option(ChannelOption.SO_BACKLOG, 128);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);
        
        try {
            //繫結埠 同步等待成功
            ChannelFuture f = server.bind(port).sync();
            //等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

複製程式碼

ServerHandler.java:

 

package com.netty.demo.server;

import java.net.InetAddress;

import com.google.protobuf.Message;
import com.netty.demo.Msg;
import com.netty.demo.Msg.Client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 處理客戶端連線時的handler
 * 
 * @author shizhengchao32677
 * 
 */
public class ServerHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * 收到客戶端發過來的訊息
     */
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 收到訊息直接列印輸出
        System.out.println(msg.getClass());
        Msg.Server response = null;
        if(msg instanceof Msg.Client) {
            Msg.Client clientMsg = (Client) msg;
            System.out.println(ctx.channel().remoteAddress() + " Say : " + clientMsg.getBody());
            response = Msg.Server.newBuilder().setCode(0).setMessage("Received client message success").build();
        } else {
            response = Msg.Server.newBuilder().setCode(-1).setMessage("client message is illegal").build();
            System.out.println("client message is illegal");
        }
        // 返回客戶端訊息 - 我已經接收到了你的訊息
        ctx.writeAndFlush(response);
    }

    /*
     * 覆蓋 channelActive 方法 在channel被啟用的時候觸發 (在建立連線的時候)
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
        String welcome = "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!";
        Msg.Server response = Msg.Server.newBuilder().setCode(101).setMessage(welcome).build();
        ctx.writeAndFlush(response);
        super.channelActive(ctx);
    }

}

 

ServerInitializer.java

 

package com.netty.demo.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;

import com.netty.demo.Msg;

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel ch) throws Exception {
        // decoded
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        //解碼客戶端發過來的訊息
        ch.pipeline().addLast(new ProtobufDecoder(Msg.Client.getDefaultInstance()));
        // encoded
        ch.pipeline().addLast(new LengthFieldPrepender(4));
        ch.pipeline().addLast(new ProtobufEncoder());
        // 註冊handler
        ch.pipeline().addLast(new ServerHandler());
    }

}

 

 

執行Server.java和Client.java:

 

Server輸出:

RamoteAddress : /127.0.0.1:59693 active !
class com.netty.demo.Msg$Client
/127.0.0.1:59693 Say : hello world!



Clientl輸出:

Client active 
Server say : code: 101
message: "Welcome to H4UOJJQSF23HQ91 service!"

Server say : code: 0
message: "Received client message success"