1. 程式人生 > >netty使用jboss的Marshalling編解碼(物件序列化)

netty使用jboss的Marshalling編解碼(物件序列化)

36套java進階高階架構師視訊+38套大資料視訊  保證全是硬貨需要的

+微信:

du13797566440


首選映入netty-all-5.0.0.Alpha2.jar  jboss-marshalling-serial-1.3.0.CR9.jar  jboss-marshalling-1.3.0.CR9.jar 包

/**
 * 客戶端
 */
public class Client {

//內部類的懶載入
private static class SingletonHolder {
static final Client instance = new Client();
}

public static Client getInstance(){
return SingletonHolder.instance;
}

private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;

private Client(){
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); //使用jboss的marshalling做物件序列化 
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //使用jboss的marshalling做物件反序列化
//超時handler(當伺服器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉響應的通道,主要為減小服務端資源佔用)
sc.pipeline().addLast(new ReadTimeoutHandler(5));  //設定回話超時時間5s
sc.pipeline().addLast(new ClientHandler());        //處理器
}
   });
}

public void connect(){
try {
this.cf = b.connect("127.0.0.1", 8765).sync(); //建立連線
System.out.println("遠端伺服器已經連線, 可以進行資料交換..");
} catch (Exception e) {
e.printStackTrace();
}
}

public ChannelFuture getChannelFuture(){ 

if(this.cf == null){ //程式啟動時連線
this.connect();
}
if(!this.cf.channel().isActive()){ //如果連線斷開時再次建立連線
this.connect();
}

return this.cf;
}

public static void main(String[] args) throws Exception{
final Client c = Client.getInstance();
//c.connect();

ChannelFuture cf = c.getChannelFuture();
for(int i = 1; i <= 3; i++ ){
Request request = new Request();
request.setId("" + i);
request.setName("pro" + i);
request.setRequestMessage("資料資訊" + i);
cf.channel().writeAndFlush(request);//傳送訊息
TimeUnit.SECONDS.sleep(4);
}


cf.channel().closeFuture().sync();//當連線未斷開時阻塞


new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("進入子執行緒...");
ChannelFuture cf = c.getChannelFuture();//重新建立連線
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen());

//再次傳送資料
Request request = new Request();
request.setId("" + 4);
request.setName("pro" + 4);
request.setRequestMessage("資料資訊" + 4);
cf.channel().writeAndFlush(request);
cf.channel().closeFuture().sync();
System.out.println("子執行緒結束.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

System.out.println("斷開連線,主執行緒結束..");

}
}

----------------------------------------------------------------

//客戶端處理器
public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {


}


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response)msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}

-----------------------------------------------------------------------

//需要傳送的物件
public class Request implements Serializable{


private static final long  SerialVersionUID = 1L;

private String id ;
private String name ;
private String requestMessage ;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}

}

---------------------------------------------------------------

//服務端
public class Server {


public static void main(String[] args) throws Exception{

EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//設定日誌
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
sc.pipeline().addLast(new ServerHandler());
}
});

ChannelFuture cf = b.bind(8765).sync();

cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();

}
}

-----------------------------------------------------

//服務端處理器
public class ServerHandler extends ChannelHandlerAdapter{


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {


}


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request)msg;
System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
Response response = new Response();
response.setId(request.getId());
response.setName("response" + request.getId());
response.setResponseMessage("響應內容" + request.getId());
ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}


}

--------------------------------------------------

//響應物件,序列化和反序列化時物件兩端的物件包名型別必須一致
public class Response implements Serializable{

private static final long serialVersionUID = 1L;

private String id;
private String name;
private String responseMessage;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}



}