ProtoBuf和Netty的簡單使用
阿新 • • 發佈:2018-12-01
1.pom.xml:
當然,也可建立個java工程把jar包放進去
<!-- Maven build descriptor for the Netty + Protobuf demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- NOTE(review): groupId and artifactId look swapped (a package-like name is
         normally the groupId) - confirm before publishing; kept as in the original. -->
    <groupId>netty-demo</groupId>
    <artifactId>com.kingdee</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>3.2.5.RELEASE</spring.version>
        <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.23.Final</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
2.msg.proto,利用protoc.exe工具把它轉換成java程式碼,再拷貝到對應的包下
msg.proto:
它是傳輸的實體類,有兩個部分,Client和Server
傳輸資料時可以直接用 Msg.Client 或 Msg.Server 來選擇要使用哪個訊息物件
// Wire messages exchanged between the demo client and server.
// proto2 is declared explicitly: `required` fields exist only in proto2, and
// protoc emits a warning when no syntax is specified.
syntax = "proto2";

package com.netty.demo;

// The Java sources reference the generated outer class as com.netty.demo.Msg;
// pin that name so it does not depend on the .proto file name.
option java_outer_classname = "Msg";

// Request sent from the client to the server.
message Client {
    required string head = 1;
    required string body = 2;
}

// Reply sent from the server to the client.
message Server {
    required int32 code = 1;
    required string message = 2;
}
在protoc.exe下面放proto檔案,通過命令生成這個傳輸實體類
3.客戶端程式碼:
Client.java:
package com.netty.demo.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Entry point of the demo client: connects to {@link #host}:{@link #port},
 * installs {@link ClientInitializer} on the channel and blocks until the
 * connection is closed.
 */
public class Client {
    /** Address of the server to connect to. */
    public static String host = "127.0.0.1";
    /** TCP port of the server to connect to. */
    public static int port = 8787;

    /**
     * Connects to the server and waits for the channel to close.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(worker);
            b.channel(NioSocketChannel.class);
            b.handler(new ClientInitializer());
            // Wait for the connection attempt to finish.
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always release the event loop threads, even if setup or connect failed.
            worker.shutdownGracefully();
        }
    }
}
ClientHandler.java(處理客戶端訊息傳送和收到服務端訊息的處理,但一般情況下是不會在這裡寫傳送訊息的邏輯的,只是為了寫demo,所以把發訊息寫在這裡面)
package com.netty.demo.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import com.google.protobuf.Message;
import com.netty.demo.Msg;
/**
 * Client-side business handler: sends one {@code Msg.Client} greeting when the
 * connection becomes active and prints every message decoded from the server.
 */
public class ClientHandler extends SimpleChannelInboundHandler<Message> {
    /**
     * Prints a message received from the server.
     *
     * @param ctx the handler context of the channel
     * @param msg the decoded protobuf message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        System.out.println("Server say : " + msg.toString());
    }

    /**
     * Sends the demo request as soon as the channel is active, then forwards
     * the event to the next handler.
     *
     * @param ctx the handler context of the channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client active ");
        Msg.Client msg = Msg.Client.newBuilder().setHead("Content-Type:application/json;charset=UTF-8").setBody("hello world!").build();
        ctx.writeAndFlush(msg);
        super.channelActive(ctx);
    }

    /**
     * Logs that the connection was closed and forwards the event.
     *
     * @param ctx the handler context of the channel
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client close ");
        super.channelInactive(ctx);
    }

    /**
     * Closes the channel when a pipeline error occurs (for example a frame that
     * cannot be decoded). Without this the channel would stay open and the
     * client's wait on the channel's close future would never return.
     *
     * @param ctx   the handler context of the channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
ClientInitializer.java(初始化Channel,如解碼,加密等),最早的netty傳protobuf,是需要手動toByteArray()把傳輸物件序列化成二進位制流發出去,接收端再手動反序列化還原成傳輸物件。
但是後來通過設定protobuf編碼解碼器,就可以自動實現序列化和反序列化,傳輸時只需要把實體發出去就行了。
package com.netty.demo.client;
import com.netty.demo.Msg;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
// decoded
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
//這裡是收到服務端發過來的訊息,所以是對服務端的response解碼
ch.pipeline().addLast(new ProtobufDecoder(Msg.Server.getDefaultInstance()));
// encoded
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new ProtobufEncoder());
// 註冊handler
ch.pipeline().addLast(new ClientHandler());
}
}
4.Server端程式碼:
Server.java
package com.netty.demo.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Entry point of the demo server: binds the listening port, installs
 * {@link ServerInitializer} on every accepted connection and blocks until the
 * listening channel is closed.
 */
public class Server {
    /** TCP port the server listens on. */
    private static int port = 8787;

    /**
     * Starts the server and waits for its listening channel to close.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(boss, worker);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new ServerInitializer());
            server.option(ChannelOption.SO_BACKLOG, 128);
            server.childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind the port and wait synchronously until the bind completes.
            ChannelFuture f = server.bind(port).sync();
            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always release the event loop threads, even if setup or bind failed.
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
ServerHandler.java:
package com.netty.demo.server;
import java.net.InetAddress;
import com.google.protobuf.Message;
import com.netty.demo.Msg;
import com.netty.demo.Msg.Client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 處理客戶端連線時的handler
*
* @author shizhengchao32677
*
*/
public class ServerHandler extends SimpleChannelInboundHandler<Message> {
/**
* 收到客戶端發過來的訊息
*/
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 收到訊息直接列印輸出
System.out.println(msg.getClass());
Msg.Server response = null;
if(msg instanceof Msg.Client) {
Msg.Client clientMsg = (Client) msg;
System.out.println(ctx.channel().remoteAddress() + " Say : " + clientMsg.getBody());
response = Msg.Server.newBuilder().setCode(0).setMessage("Received client message success").build();
} else {
response = Msg.Server.newBuilder().setCode(-1).setMessage("client message is illegal").build();
System.out.println("client message is illegal");
}
// 返回客戶端訊息 - 我已經接收到了你的訊息
ctx.writeAndFlush(response);
}
/*
* 覆蓋 channelActive 方法 在channel被啟用的時候觸發 (在建立連線的時候)
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
String welcome = "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!";
Msg.Server response = Msg.Server.newBuilder().setCode(101).setMessage(welcome).build();
ctx.writeAndFlush(response);
super.channelActive(ctx);
}
}
ServerInitializer.java
package com.netty.demo.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import com.netty.demo.Msg;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
// decoded
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
//解碼客戶端發過來的訊息
ch.pipeline().addLast(new ProtobufDecoder(Msg.Client.getDefaultInstance()));
// encoded
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new ProtobufEncoder());
// 註冊handler
ch.pipeline().addLast(new ServerHandler());
}
}
執行Server.java和Client.java:
Server輸出:
RamoteAddress : /127.0.0.1:59693 active !
class com.netty.demo.Msg$Client
/127.0.0.1:59693 Say : hello world!
Client輸出:
Client active
Server say : code: 101
message: "Welcome to H4UOJJQSF23HQ91 service!"
Server say : code: 0
message: "Received client message success"