1. 程式人生 > >淺談高可用架構中NIO的重要性

淺談高可用架構中NIO的重要性

一個功能引發的思考

今天同事開發了一個檔案讀寫的模組,發現讀寫效能異常的低,他的做法是單執行緒純IO操作,頻繁的開啟關閉IO流,讀寫。
於是乎他問我這個應該怎麼做,我給他講解到這種做法的低效,建議他批量的一次性寫入,頻繁直接操作IO效能當然是無法接受的。

再談IO操作的演變

BIO:傳統的cs端架構,都是一個請求提交,後臺一個專門的執行緒負責接受這個請求,分配給新的執行緒去處理。這種做法的缺點很明顯,當併發量過大的時候,不斷的建立新的執行緒,對於伺服器來說,肯定是無法接受的,這樣會導致大量的請求阻塞未響應。而且建立執行緒這個操作java是直接操作 作業系統核心的,光建立執行緒這個,就會導致系統假死,cpu100等情況。
NIO:直到jdk1.4以後,才推出了非阻塞的IO操作,基於I/O多路複用技術,減小系統開銷。即,把多個I/O的阻塞複用到同一個select上阻塞,從而使得單執行緒情況下,可以同時處理多個請求。系統不需要建立新的額外程序或者執行緒,也不需要維護這些程序和執行緒的執行。並且在jdk1.5中,epool替代傳統select/poll,極大提升NIO通訊新能

jdk nio存在的缺陷

1.開發複雜度非常高,例如:客戶端重連,網路閃斷,半包讀寫,失敗快取,網路擁塞,異常碼流的處理等。
2.jdk epoll bug會導致selector空輪詢,最終導致cpu100

基於Netty實現NIO

Netty算是一個比較成熟的nio框架了,他解決了jdk中epoll的bug,並且開發複雜度也比較低一點。
後面上一個簡單的netty nio實現原始碼

基於Netty開發應用層協議

隨著網際網路發展,傳統垂直架構逐漸被分散式,彈性可伸縮分散式架構替代。
那麼問題也隨之而出,系統只有分散式,就面臨各個節點間通訊的問題,尤為強調高可用,擴充套件性強,高效能。
1.高效能
傳統java序列化效能比較低,而且序列化後的位元組流太大,現在出的很多新的序列化框架,效能都比傳統的jdk要高很多,如:google的protobuf facebook的thrift jboss的marshalling。另外netty對非同步非阻塞通訊的支援,以及高效reactor執行緒模型,無鎖化序列設計,0拷貝,靈活tcp引數配置等。
2.擴充套件性
傳統java序列化無法跨語言,所以目前的java rpc框架基本也都沒用jdk的,比如thirft:可以跨c++,c#,cocoa,erlang,java,perl,php,python,ruby等

3.高可用
傳統bio模型,對併發訪問支援很差,而新的nio就不同,具體的上面也談到了

4.可靠性
1>網路通訊類
連線超時介面,強制關閉對端連線
2>鏈路有效性
通過心跳檢查
3>reactor執行緒保護
主要謹慎處理I/O異常,以及規避NIO bug
4>鏈路控制
5>優雅停機
5.安全性
安全性可以通過netty提供的SSL認證,也可以通過第三方CA認證來保障
6.成功案例
目前也已經有很多成熟應用netty的框架,如alibaba的rocketMQ dubbo ,Apache的avro等等

副一段Netty基礎NIO程式碼

package com.solace.nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by Administrator on 2018/4/10.
 *
 * @author solace
 */
public class TimerServer {
    public void bind(int port) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChildChannelHandler());
            //繫結埠,同步等待成功
            ChannelFuture future = bootstrap.bind(port).sync();
            //等待服務端監聽埠關閉
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new TimerServer().bind(8088);
    }

}
package com.solace.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.logging.Logger;


/**
 * Created by Administrator on 2018/4/10.
 *
 * @author solace
 */
public class TimeClientHandler extends ChannelHandlerAdapter{
    public static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    private final ByteBuf fisrtMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        fisrtMessage = Unpooled.buffer(req.length);
        fisrtMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(fisrtMessage);
    }

    /**
     * 解碼讀取訊息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("Now is :"+body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warning("Unexpercted exception from downstream :"+cause.getMessage());
        ctx.close();
    }
}
package com.solace.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;


/**
 * Created by Administrator on 2018/4/10.
 *
 * @author solace
 */
public class TimeServerHandler extends ChannelHandlerAdapter{

    /**
     * 讀到訊息後回執
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("the time server receive order:"+body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"bad order";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    /**
     * 訊息讀完了後flush
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * 訊息讀取異常處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
package com.solace.nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by Administrator on 2018/4/10.
 *
 * @author solace
 */
public class TimeClient {
    public void connect(int port,String host) throws InterruptedException {
        //配置NIO執行緒租
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //非同步發起連線請求
            ChannelFuture future = bootstrap.connect(host,port).sync();
            //等待客戶端鏈路關閉
            future.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new TimeClient().connect(8088,"127.0.0.1");
    }
}
package com.solace.nio;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;


import java.util.logging.SocketHandler;

/**
 * Created by Administrator on 2018/4/10.
 *
 * @author solace
 */
public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

    /**
     * 新增自定義處理類
     * @param channel
     * @throws Exception
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().addLast(new TimeServerHandler());
    }
}