1. 程式人生 > >Java打造RPC框架(五):連線池化

Java打造RPC框架(五):連線池化

在上一篇的文章中https://blog.csdn.net/we_phone/article/details/79053472

我初步完成了整個RPC框架的搭建,從服務呼叫到服務發現再到負載均衡,這一篇開始進行的是一系列我所知的優化操作

這一篇我講的是連線池,比較簡單,詳細程式碼已託管到github:https://github.com/wephone/MeiZhuoRPC

首先了解一下什麼是連線池

連線池

在平時我們的資料庫增刪查改的業務中,應該絕大多數使用到了連線池技術,例如C3P0,Druid這類框架,幫我們完成了對資料庫連線的池化。

當沒有連線池時會遇到這樣的問題:

要麼每一次進行遠端連線時都去建立一個連線,用完立即釋放,也就是頻繁的建立和銷燬大量連線

或者各個呼叫共用一個單一連線,但在多執行緒的情況下,需要加鎖來避免爭搶的問題,這個方案的問題是效率低下且複雜性高

連線池的做法就是預先載入一定數量的連線放到資源池裡,當需要連線時則從連線池中拿出一個來使用,用完則還回去,當併發量巨大,連線資源匱乏時,根據一定的策略來新建連線或者拒絕。

和jdk中的執行緒池類似,都是資源池化的思想。

在我前面的框架編寫中,我採用的是單一連線的方式,並且用加鎖來保證不會重複連線等等,其實在netty框架的一些版本中,有FixedChannelPool這個東西作為netty的連線池,但在我用的版本里沒有發現這個,出於學習的目的,就自行用其他庫寫了一個來作為RPC框架的連線池。

Commons-Pool

在後續的開發中我引入了這個庫,用來做連線池的基本手腳架,這個框架已經包含了物件池的基本處理,例如建立物件,回收物件,最大數量控制等等。

<dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-pool2</artifactId>
       <version>2.2</version>
</dependency>

首先建立ConnectionPool類來作為我們的連線池,在他的構造方法裡對Commons pool進行初始化配置

    private GenericObjectPool pool;
    private String fullIp;

    public ConnectionPool(String ip,Integer port) {
        ConnectFactory connectFactory=new ConnectFactory(ip, port);
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        //最大空閒連線數
        config.setMaxIdle(RPC.getClientConfig().getPoolMaxIdle());
        //最大連線數
        config.setMaxTotal(RPC.getClientConfig().getPoolMaxTotal());
        pool=new GenericObjectPool(connectFactory,config);
        fullIp=ip+":"+port;
    }
GenericObjectPool就是基礎的物件池手腳架,需要傳入一個配置物件和一個物件工廠

在配置中設定我們需要的最大連線數和最大空閒連線數,其他的配置都用預設的

後面我們就需要用這個連線池來獲取我們的連線,所以給他新增獲取,釋放連線,銷燬連線的操作。

realease方法只是歸還某一個連線到池中,而destroyChannel方法是對整個連線池的銷燬,不單是關閉某個連線鏈路,所以它還需要對連線池內各個連線共用的netty執行緒池進行shutdown

    public Channel getChannel() throws Exception {
        return (Channel) pool.borrowObject();
    }

    public void releaseChannel(Channel channel){
        pool.returnObject(channel);
    }

    public void destroyChannel(){
        //關閉Netty執行緒資源及其註冊的連線
        ((ConnectFactory)pool.getFactory()).getGroup().shutdownGracefully();
        pool.close();
        //移除引用
        RPCRequestNet.getInstance().connectionPoolMap.remove(fullIp);
    }

ConnectFactory類就是我們為連線池建立netty連線的地方 

public class ConnectFactory extends BasePooledObjectFactory<Channel> {

    private String ip;
    private Integer port;
    //netty執行緒組 同一個服務的連線池內各個連線共用
    private EventLoopGroup group=new NioEventLoopGroup();

    public ConnectFactory(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
    }

    public EventLoopGroup getGroup() {
        return group;
    }

首先我們需要將他繼承自手腳架的BasePooledObjectFactory並制定泛型為netty的channel

內部屬性處理ip和埠之外,還需要配上netty的執行緒組,然後對BasePooledObjectFactory的幾個核心方法進行重寫

首先是建立物件的create方法,也就是基本的netty連線的建立並返回channel物件

    @Override
    public Channel create() throws Exception {
        //啟動輔助類 用於配置各種引數
        Bootstrap b=new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new LineBasedFrameDecoder(2048));//以換行符分包 防止粘包半包 2048為最大長度 到達最大長度沒出現換行符則丟擲異常
                        socketChannel.pipeline().addLast(new StringDecoder());//將接收到的物件轉為字串
                        //新增相應回撥處理和編解碼器
                        socketChannel.pipeline().addLast(new RPCRequestHandler());
                    }
                });
        ChannelFuture f=b.connect(ip,port).sync();
        System.out.println("pool create channel "+ip+":"+port);
        return f.channel();
    }

再者是destroy方法,在我們建立了過多的連線後,連線需求下降時,物件池會回收我們的各個連線,這個方法則會被呼叫,這裡我們做的處理是取出相應的channel物件,關閉這個連線鏈路。

    @Override
    public void destroyObject(PooledObject<Channel> p) throws Exception {
        System.out.println("destroy channel "+ip+":"+port);
        //銷燬channel時釋放資源 
        p.getObject().close();
    }

還有一個方法是給物件池內的物件再進行一次封裝以增強功能用的,這裡我們就使用預設的包裝物件就行了

    @Override
    public PooledObject<Channel> wrap(Channel channel) {
        return new DefaultPooledObject<Channel>(channel);
    }

現在我們就完成了連線池的封裝,接下來是把他接入之前的RPC呼叫中

接入連線池

在我們的核心呼叫端連線物件 RPCRequestNet物件中 加入一個單例的map

//每個ip對應一個連線池
    public Map<String,ConnectionPool> connectionPoolMap=new ConcurrentHashMap<String,ConnectionPool>();

key為ip,value就是我們的連線池,即一個服務端ip對應一個連線池,我們的連線服務端的connect方法從原來的單一連線加鎖改成如下

    //負載均衡獲取對應IP 從連線池中獲取連線channel
    private Channel connect(String ip) throws Exception {
        String[] IPArr=ip.split(":");
        String host=IPArr[0];
        Integer port=Integer.valueOf(IPArr[1]);
        if (connectionPoolMap.get(ip)==null){
            ConnectionPool connectionPool = new ConnectionPool(host, port);
            connectionPoolMap.putIfAbsent(ip, connectionPool);
        }
        return connectionPoolMap.get(ip).getChannel();
    }

就是從單例的connectMap中獲取連線池,再從連線池中get一個channel來供應RPC呼叫

        Channel channel=connect(ip);
        channel.writeAndFlush(requestBuf);
        connectionPoolMap.get(ip).releaseChannel(channel);

在我們獲取channel,併發送我們的RPC請求後,我們就要歸還我們的連線,給其他呼叫使用,也就是release操作

這樣我們就完成了初步的連線池預載入和複用連線的操作,從池中取一個channel進行RPC呼叫,用完歸還連線池

最後的問題就是,一個ip對應一個連線池,一個池中有多個channel連線,當這個ip宕機或者其他情況不可用時,我們需要對這整個ip對應的連線池進行銷燬

在我們的負載均衡介面的changeIP方法中,我們可以獲得到本次不可用的ip,在這裡,我們獲取這些IP,並銷燬他們對應的連線池即可。

    //釋放對應連線池
            ConnectionPool connectionPool=RPCRequestNet.getInstance().connectionPoolMap.get(oldIP);
            if (connectionPool!=null) {
                connectionPool.destroyChannel();
            }

到這裡就完成了連線池化的優化,下一篇章我會繼續講述我對非同步RPC呼叫的優化