1. 程式人生 > >netty框架 基於noi的(同步非阻塞io)長連線方案

netty框架 基於noi的(同步非阻塞io)長連線方案

Socket通訊(BIO/NIO/AIO)程式設計

        BIO:  傳統阻塞IO

        NIO: 同步非阻塞式IO
        AIO: 非同步非阻塞IO,(非阻塞採用的是註冊通知的模式。)

為什麼會選擇Netty?因為它簡單!使用Netty不必編寫複雜的邏輯程式碼去實現通訊,再也不需要去考慮效能問題,不需要考慮編碼問題,半包讀寫等問題。強大的Netty已經幫我們實現好了,我們只需要使用即可。


原生NIO的三個最主要的類: Buffer(緩衝區),Channel(通道),Selector(多路複用器)

什麼是阻塞?

應用程式在獲取網路資料的時候,如果網路傳輸資料很慢,就會一直等待,直到傳輸完畢為止。

什麼是非阻塞?

應用程式直接可以獲取已經準備就緒好的資料,無需等待。

NIO(同步非阻塞式IO)

同步非阻塞式IO,伺服器實現模式為一個請求一個執行緒,即客戶端傳送的連線請求都會註冊到多路複用器上,多路複用器輪詢到連線有I/O請求時才啟動一個執行緒進行處理。


Netty簡介

Netty是基於Java NIO client-server的網路應用框架,使用Netty可以快速開發網路應用,例如伺服器和客戶端協議。Netty提供了一種新的方式來開發網路應用程式,這種新的方式使它很容易使用和具有很強的擴充套件性。Netty的內部實現是很複雜的,但是Netty提供了簡單易用的API從網路處理程式碼中解耦業務邏輯。Netty是完全基於NIO實現的,所以整個Netty都是非同步的。

Netty通訊的步驟:

①建立兩個NIO執行緒組,一個專門用於網路事件處理(接受客戶端的連線),另一個則進行網路通訊的讀寫。

②建立一個ServerBootstrap物件,配置Netty的一系列引數,例如接受傳出資料的快取大小等。

③建立一個用於實際處理資料的類ChannelInitializer,進行初始化的準備工作,比如設定接受傳出資料的字符集、格式以及實際處理資料的介面。

④繫結埠,執行同步阻塞方法等待伺服器端啟動即可。


詳細程式碼如下

服務端:

    步驟1:服務端啟動類

package netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 程式的入口,負責啟動應用
 * @author liuyazhuang
 *
 */
public class Main {
    static boolean isBrowser = false;
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new MyChannelInitializer(isBrowser));
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);
            System.out.println("服務端開啟等待客戶端連線...."+(isBrowser?"瀏覽器":"Nio客戶端"));
            Channel ch = b.bind(8888).sync().channel();
            ch.closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            //優雅的退出程式
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

     步驟2:初始化通道處理器(新增各種處理器,例如編解碼,業務處理器)

package netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;


/**
 * 初始化連線時候的各個元件
 * @author liuyazhuang
 *
 */
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
	private boolean isBrowser;
	public MyChannelInitializer(boolean isBrowser){
		this.isBrowser = isBrowser;
	}
	@Override
	protected void initChannel(SocketChannel e) throws Exception {

		ChannelPipeline pipeline = e.pipeline();

		if(isBrowser) {
			//用於瀏覽器的websocket場景
			pipeline.addLast("http-codec", new HttpServerCodec());//HttpServerCodec:將請求和應答訊息解碼為HTTP訊息
			pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//把Http訊息組成完整地HTTP訊息
			pipeline.addLast("http-chunked", new ChunkedWriteHandler());//向客戶端傳送HTML5檔案
		}
		pipeline.addLast(new IdleStateHandler(7, 7, 7, TimeUnit.SECONDS));//設定7秒沒有讀到資料,則觸發一個READER_IDLE事件
		if(isBrowser) {
			pipeline.addLast("handler", new ServerSocketHandlerBrowser());
		} else {
			pipeline.addLast("decoder", new StringDecoder());
			pipeline.addLast("encoder", new StringEncoder());
			pipeline.addLast("handler", new ServerSocketHandlerNIO());
		}

		//e.pipeline().addLast(new ReadTimeoutHandler(15));//設定連線最長時間,時間一到連線斷開。
	}

}


    步驟3:新增通道組全域性常量,方便操作通道的傳送訊息,(單發或者群發)

package netty.server;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 儲存整個工程的全域性配置
 * @author liuyazhuang
 *
 */
public class NettyConfig {
	
	/**
	 * 儲存每一個客戶端接入進來時的channel物件
	 */
	public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

    步驟四:實現業務處理器(瀏覽器websocket)

package netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

import java.util.Date;

import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;

public class ServerSocketHandlerBrowser extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        //處理客戶端向服務端發起http握手請求的業務
        if (o instanceof FullHttpRequest) {
            handHttpRequest(channelHandlerContext,  (FullHttpRequest)o);
        }else if (o instanceof WebSocketFrame) { //處理websocket連線業務
//            ridx是readerIndex讀取資料索引,位置從0開始
//            widx是writeIndex寫資料索引,位置從0開始
//            cap是capacity緩衝區初始化的容量,預設256,可以通過Unpooled.buffer(8)設定,初始化緩衝區容量是8。
            handWebsocketFrame(channelHandlerContext, (WebSocketFrame)o);
        }
    }

    private int loss_connect_time = 0;
    private int retryTime = 8;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                loss_connect_time++;
                System.out.println("5 秒沒有接收到客戶端的資訊了");
                if (loss_connect_time > retryTime-1) {
                    StringBuffer bf = new StringBuffer("關閉這個不活躍的channel").append("(因為連續檢測了").append(retryTime).append("次讀狀態都是空閒的)");
                    System.out.println(bf.toString());
                    ctx.channel().close();
                }
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    private WebSocketServerHandshaker handshaker;
    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
    //客戶端與服務端建立連線的時候呼叫
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.group.add(ctx.channel());
        System.out.println("客戶端與服務端連線開啟...");
        super.channelActive(ctx);
    }

    //客戶端與服務端斷開連線的時候呼叫
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.group.remove(ctx.channel());
        System.out.println("客戶端與服務端連線關閉...");
    }

    //服務端接收客戶端傳送過來的資料結束之後呼叫
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    //工程出現異常的時候呼叫
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 處理客戶端與服務端之前的websocket業務
     * @param ctx
     * @param frame
     */
    private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
        //判斷是否是關閉websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
        }
        //判斷是否是ping訊息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        //判斷是否是二進位制訊息,如果是二進位制訊息,丟擲異常
        if( ! (frame instanceof TextWebSocketFrame) ){
            System.out.println("目前我們不支援二進位制訊息");
            throw new RuntimeException("【"+this.getClass().getName()+"】不支援訊息");
        }
        //返回應答訊息
        //獲取客戶端向服務端傳送的訊息
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服務端收到客戶端的訊息====>>>" + request);
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
                + ctx.channel().id()
                + " ===>>> "
                + request);
        //群發,服務端向每個連線上來的客戶端群發訊息
        NettyConfig.group.writeAndFlush(tws);
    }
    /**
     * 處理客戶端向服務端發起http握手請求的業務
     * @param ctx
     * @param req
     */
    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        if (!req.decoderResult().isSuccess()
                || ! ("websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        //構造握手響應返回,本機測試
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                WEB_SOCKET_URL, null, false);
        handshaker = wsFactory.newHandshaker(req);
        //把握手訊息返回給客戶端
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * 服務端向客戶端響應訊息
     * @param ctx
     * @param req
     * @param res
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
                                  DefaultFullHttpResponse res){
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res,res.content().readableBytes());
        }
        //如果是非Keep-Alive,關閉連線
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}


        步驟四:實現業務處理器(java客戶端)

package netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * 接收/處理/響應客戶端websocket請求的核心業務處理類
 * @author liuyazhuang
 *
 */
public class ServerSocketHandlerNIO extends SimpleChannelInboundHandler<Object> {
	@Override
	protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
		try {
			System.out.println("Client say : " + o.toString());
			//返回客戶端訊息 - 我已經接收到了你的訊息
			channelHandlerContext.writeAndFlush("Received your message : " + o.toString());
		} catch (Exception e) {
			System.out.println("解析不出來啊哈哈");
		}
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		super.channelRead(ctx, msg);
		System.out.println(msg);
	}

	private int loss_connect_time = 0;
    private int retryTime = 8;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                loss_connect_time++;
                System.out.println("5 秒沒有接收到客戶端的資訊了");
                if (loss_connect_time > retryTime-1) {
                    StringBuffer bf = new StringBuffer("關閉這個不活躍的channel").append("(因為連續檢測了").append(retryTime).append("次讀狀態都是空閒的)");
                    System.out.println(bf.toString());
                    ctx.channel().close();
                }
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    private WebSocketServerHandshaker handshaker;
	private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
	//客戶端與服務端建立連線的時候呼叫
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		NettyConfig.group.add(ctx.channel());
		System.out.println("客戶端與服務端連線開啟...");
        super.channelActive(ctx);
	}

	//客戶端與服務端斷開連線的時候呼叫
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		NettyConfig.group.remove(ctx.channel());
		System.out.println("客戶端與服務端連線關閉...");
	}

	//服務端接收客戶端傳送過來的資料結束之後呼叫
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
			ctx.flush();
	}

	//工程出現異常的時候呼叫
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}


客戶端程式碼

    步驟一: 配置客戶端主程式,新增通道處理器

package netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;

public class client {
    static Channel channel = null;
    public void connect(int port, String host) throws Exception {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            /*
                             * 解碼和編碼,應和服務端一致
                             * */
                            //字串解碼和編碼
                            p.addLast("decoder", new StringDecoder());
                            p.addLast("encoder", new StringEncoder());
                            p.addLast(new IdleStateHandler(3, 4, 5, TimeUnit.SECONDS));
                            p.addLast("ping",new HeartBeatClientHandler());
                        }
                    });

            //連線客戶端
             channel = b.connect(host, port).sync().channel();

            //控制檯輸入
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                String line = in.readLine();
                if (line == null) {
                    continue;
                }
                //向服務端傳送資料
                channel.writeAndFlush(line);
            }
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8888;
        new client().connect(port, "127.0.0.1");//NIO模式是(非阻塞同步io)執行緒一直會卡在這裡。

    }

}

    步驟二: 實現通道業務處理器

package netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    private static final int TRY_TIMES = 5;
    private int currentTime = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("啟用時間是:"+new Date());
        if (ctx.channel().isWritable()) {
            System.out.println("連結成功,我要傳送資料啦");
        }
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止時間是:"+new Date());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                if(currentTime <= TRY_TIMES){
                    currentTime++;
                    StringBuffer bf = new StringBuffer("寫空閒,開始傳送心跳:")
                            .append("(第").append(currentTime).append("次寫心跳)")
                            .append("(最多寫心跳次數").append(TRY_TIMES+1).append(")");
                    System.out.println(bf.toString());
                    ctx.writeAndFlush("Heartbeat");
                }
            }
        }else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Received your message : Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }
}