Java打造RPC框架(五):連線池化
在上一篇的文章中https://blog.csdn.net/we_phone/article/details/79053472
我初步完成了整個RPC框架的搭建,從服務呼叫到服務發現再到負載均衡,這一篇開始進行的是一系列我所知的優化操作
這一篇我講的是連線池,比較簡單,詳細程式碼已託管到github:https://github.com/wephone/MeiZhuoRPC
首先了解一下什麼是連線池
連線池
在平時我們的資料庫增刪查改的業務中,應該絕大多數使用到了連線池技術,例如C3P0,Druid這類框架,幫我們完成了對資料庫連線的池化。
當沒有連線池時會遇到這樣的問題:
要麼每一次進行遠端連線時都去建立一個連線,用完立即釋放,也就是頻繁的建立和銷燬大量連線
或者各個呼叫共用一個單一連線,但在多執行緒的情況下,需要加鎖來避免爭搶的問題,這個方案的問題是效率低下且複雜性高。
連線池的做法就是預先載入一定數量的連線放到資源池裡,當需要連線時則從連線池中拿出一個來使用,用完則還回去,當併發量巨大,連線資源匱乏時,根據一定的策略來新建連線或者拒絕。
和jdk中的執行緒池類似,都是資源池化的思想。
在我前面的框架編寫中,我採用的是單一連線的方式,並且用加鎖來保證不會重複連線等等,其實在netty框架的一些版本中,有FixedChannelPool這個東西作為netty的連線池,但在我用的版本里沒有發現這個,出於學習的目的,就自行用其他庫寫了一個來作為RPC框架的連線池。
Commons-Pool
在後續的開發中我引入了這個庫,用來做連線池的基本手腳架,這個框架已經包含了物件池的基本處理,例如建立物件,回收物件,最大數量控制等等。
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
首先建立ConnectionPool類來作為我們的連線池,在他的構造方法裡對Commons pool進行初始化配置
private GenericObjectPool pool;
private String fullIp;
public ConnectionPool(String ip,Integer port) {
ConnectFactory connectFactory=new ConnectFactory(ip, port);
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
//最大空閒連線數
config.setMaxIdle(RPC.getClientConfig().getPoolMaxIdle());
//最大連線數
config.setMaxTotal(RPC.getClientConfig().getPoolMaxTotal());
pool=new GenericObjectPool(connectFactory,config);
fullIp=ip+":"+port;
}
GenericObjectPool就是基礎的物件池手腳架,需要傳入一個配置物件和一個物件工廠
在配置中設定我們需要的最大連線數和最大空閒連線數,其他的配置都用預設的
後面我們就需要用這個連線池來獲取我們的連線,所以給他新增獲取,釋放連線,銷燬連線的操作。
realease方法只是歸還某一個連線到池中,而destroyChannel方法是對整個連線池的銷燬,不單是關閉某個連線鏈路,所以它還需要對連線池內各個連線共用的netty執行緒池進行shutdown
public Channel getChannel() throws Exception {
return (Channel) pool.borrowObject();
}
public void releaseChannel(Channel channel){
pool.returnObject(channel);
}
public void destroyChannel(){
//關閉Netty執行緒資源及其註冊的連線
((ConnectFactory)pool.getFactory()).getGroup().shutdownGracefully();
pool.close();
//移除引用
RPCRequestNet.getInstance().connectionPoolMap.remove(fullIp);
}
ConnectFactory類就是我們為連線池建立netty連線的地方
public class ConnectFactory extends BasePooledObjectFactory<Channel> {
private String ip;
private Integer port;
//netty執行緒組 同一個服務的連線池內各個連線共用
private EventLoopGroup group=new NioEventLoopGroup();
public ConnectFactory(String ip, Integer port) {
this.ip = ip;
this.port = port;
}
public EventLoopGroup getGroup() {
return group;
}
首先我們需要將他繼承自手腳架的BasePooledObjectFactory並制定泛型為netty的channel
內部屬性處理ip和埠之外,還需要配上netty的執行緒組,然後對BasePooledObjectFactory的幾個核心方法進行重寫
首先是建立物件的create方法,也就是基本的netty連線的建立並返回channel物件
@Override
public Channel create() throws Exception {
//啟動輔助類 用於配置各種引數
Bootstrap b=new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(2048));//以換行符分包 防止粘包半包 2048為最大長度 到達最大長度沒出現換行符則丟擲異常
socketChannel.pipeline().addLast(new StringDecoder());//將接收到的物件轉為字串
//新增相應回撥處理和編解碼器
socketChannel.pipeline().addLast(new RPCRequestHandler());
}
});
ChannelFuture f=b.connect(ip,port).sync();
System.out.println("pool create channel "+ip+":"+port);
return f.channel();
}
再者是destroy方法,在我們建立了過多的連線後,連線需求下降時,物件池會回收我們的各個連線,這個方法則會被呼叫,這裡我們做的處理是取出相應的channel物件,關閉這個連線鏈路。
@Override
public void destroyObject(PooledObject<Channel> p) throws Exception {
System.out.println("destroy channel "+ip+":"+port);
//銷燬channel時釋放資源
p.getObject().close();
}
還有一個方法是給物件池內的物件再進行一次封裝以增強功能用的,這裡我們就使用預設的包裝物件就行了
@Override
public PooledObject<Channel> wrap(Channel channel) {
return new DefaultPooledObject<Channel>(channel);
}
現在我們就完成了連線池的封裝,接下來是把他接入之前的RPC呼叫中
接入連線池
在我們的核心呼叫端連線物件 RPCRequestNet物件中 加入一個單例的map
//每個ip對應一個連線池
public Map<String,ConnectionPool> connectionPoolMap=new ConcurrentHashMap<String,ConnectionPool>();
key為ip,value就是我們的連線池,即一個服務端ip對應一個連線池,我們的連線服務端的connect方法從原來的單一連線加鎖改成如下
//負載均衡獲取對應IP 從連線池中獲取連線channel
private Channel connect(String ip) throws Exception {
String[] IPArr=ip.split(":");
String host=IPArr[0];
Integer port=Integer.valueOf(IPArr[1]);
if (connectionPoolMap.get(ip)==null){
ConnectionPool connectionPool = new ConnectionPool(host, port);
connectionPoolMap.putIfAbsent(ip, connectionPool);
}
return connectionPoolMap.get(ip).getChannel();
}
就是從單例的connectMap中獲取連線池,再從連線池中get一個channel來供應RPC呼叫
Channel channel=connect(ip);
channel.writeAndFlush(requestBuf);
connectionPoolMap.get(ip).releaseChannel(channel);
在我們獲取channel,併發送我們的RPC請求後,我們就要歸還我們的連線,給其他呼叫使用,也就是release操作
這樣我們就完成了初步的連線池預載入和複用連線的操作,從池中取一個channel進行RPC呼叫,用完歸還連線池
最後的問題就是,一個ip對應一個連線池,一個池中有多個channel連線,當這個ip宕機或者其他情況不可用時,我們需要對這整個ip對應的連線池進行銷燬
在我們的負載均衡介面的changeIP方法中,我們可以獲得到本次不可用的ip,在這裡,我們獲取這些IP,並銷燬他們對應的連線池即可。
//釋放對應連線池
ConnectionPool connectionPool=RPCRequestNet.getInstance().connectionPoolMap.get(oldIP);
if (connectionPool!=null) {
connectionPool.destroyChannel();
}
到這裡就完成了連線池化的優化,下一篇章我會繼續講述我對非同步RPC呼叫的優化