曹工雜談:花了兩天時間,寫了一個netty實現的http客戶端,支援同步轉非同步和連線池(1)--核心邏輯講解
阿新 • • 發佈:2020-03-19
# 背景
先說下寫這個的目的,其實是好奇,dubbo是怎麼實現同步轉非同步的,然後瞭解到,其依賴了請求中攜帶的請求id來完成這個連線複用;然後我又發現,redisson這個redis客戶端,底層也是用的netty,那就比較好奇了:netty是非同步的,上層是同步的,要拿結果的,同時呢,redis協議也不可能按照redisson的要求,在請求和響應裡攜帶請求id,那,它是怎麼實現同步轉非同步的呢,非同步結果回來後,又是怎麼把結果對應上的呢?
對redisson debug除錯了long long time之後(你們知道的,多執行緒不好除錯),大概理清了思路,基本就是:連線池 的思路。比如,我要訪問redis:
1. 我會先去連線池裡拿一個連線(其實是一個netty的socketChannel),然後用這個連線,去發起請求。
2. 上層新建一個promise(可寫的future,熟悉completablefuture的可以秒懂,不熟悉的話,可以理解為一個阻塞佇列,你去取東西,取不到,阻塞;生產者往佇列放一個東西,你就不再阻塞了,且拿到了東西),把傳送請求的任務交給下層的netty channel後,_將promise設定為netty channel的一個attribute_,然後在這個promise上阻塞等待
3. 下層的netty channel向redis 伺服器發起請求
4. netty接收到redis 伺服器的響應後,從channel中取到第二步設定的attribute,獲取到promise,此時,相當於拿到了鎖,然後開啟鎖,並把結果設定到promise中
5. 主執行緒被第四步喚醒後,拿到結果並返回。
其實問題的關鍵是,第二步的promise傳遞,要設定為channel的一個attribute,不然的話,響應回來後,也不知道把響應給誰。
理清了redisson的基本思路後,我想到了很早之前,面試oppo,二面的面試官就問了我一個問題:寫過類似代理的中介軟體沒有?(因為當時面試的是中介軟體部門)
然後我說沒有,然後基本就涼了。
其實,中介軟體最主要的要求,尤其是代理這種,一方面接收請求,一方面還得作為客戶端去發起請求,發起請求這一步,很容易變成效能瓶頸,不少實現裡,這一步都是直接使用http client這類同步請求的工具(也是支援非同步的,只是同步更常見),所以我也一直想寫一個netty這種非同步的客戶端,同時還能同步轉非同步的,不能同步轉非同步,應用場景就比較受限了。
#實現思路
原始碼給懶得看文字的同學:
扯了這麼多,我說下我這個http client的思路,和上面那個redisson的差不多,我這邊的場景也是作為一箇中間件,要訪問的後端服務就幾個,比如要訪問http://192.168.19.102:8080下的若干服務,我這邊是啟動時候,就會去建一個連線池(直接配置commons pool2的池化引數,我這裡配置的是,2個連線),連線池好了後,netty 的channel已經是ok的了,如下所示:
![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200319090833124-1995346023.png)
這每一個長連線,是包在我們的一個核心的資料結構裡的,叫NettyClient。
核心的屬性,其實主要下面兩個:
```java
//要連線的host和埠
private HostAndPortConfig config;
/**
* 當前使用的channel
*/
Channel channel;
```
## NettyClient的初始化
### 建構函式
建構函式如下:
```java
public NettyClient(HostAndPortConfig config) {
this.config = config;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HostAndPortConfig {
private String host;
private Integer port;
}
```
夠簡單吧,先不考慮連線池,最開始測試的時候,我就是這樣,直接new物件的。
```java
public static void main(String[] args) {
HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080);
NettyClient client = new NettyClient(config);
client.initConnection();
NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do",
JSONObject.toJSONString(new Object()));
if (response == null) {
return;
}
System.out.println(response.getBody());
}
```
### 初始化連線
上面的測試程式碼,new完物件後,開始初始化連線。
```java
public void initConnection() {
log.info("initConnection starts...");
Bootstrap bootstrap;
//1.建立netty所需的bootstrap配置
bootstrap = createBootstrap(config);
//2.發起連線
ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort());
log.info("current thread:{}", Thread.currentThread().getName());
//3.等待連線成功
boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS);
boolean bIsSuccess = ret && future.isSuccess();
if (!bIsSuccess) {
//4.不成功拋異常
bIsConnectionOk = false;
log.error("host config:{}",config);
throw new RuntimeException("連線失敗");
}
//5.走到這裡,說明成功了,新的channle賦值給field
cleanOldChannelAndCancelReconnect(future, channel);
bIsConnectionOk = true;
}
```
這裡初始化連線是直接同步等待的,如果失敗,直接拋異常。第5步裡,主要是把新的channel賦值給當前物件的一個field,同時,關閉舊的channle之類的。
```java
private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) {
/**
* 連線成功,關閉舊的channel,再用新的channel賦值給field
*/
try {
if (oldChannel != null) {
try {
log.info("Close old netty channel " + oldChannel);
oldChannel.close();
} catch (Exception e) {
log.error("e:{}", e);
}
}
} finally {
/**
* 新channel覆蓋field
*/
NettyClient.this.channel = future.channel();
NettyClient.this.bIsConnectionOk = true;
log.info("connection is ok,new channel:{}", NettyClient.this.channel);
if (NettyClient.this.scheduledFuture != null) {
log.info("cancel scheduledFuture");
NettyClient.this.scheduledFuture.cancel(true);
}
}
}
```
### netty client中,涉及的出站handler
這裡說下前面的bootstrap的構造,如下:
```java
private Bootstrap createBootstrap(HostAndPortConfig config) {
Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(NIO_EVENT_LOOP_GROUP);
bootstrap.handler(new CustomChannelInitializer(bootstrap, config, this));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
return bootstrap;
}
```
handler 鏈,主要在CustomChannelInitializer類中。
```java
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// http客戶端編解碼器,包括了客戶端http請求編碼,http響應的解碼
pipeline.addLast(new HttpClientCodec());
// 把多個HTTP請求中的資料組裝成一個
pipeline.addLast(new HttpObjectAggregator(65536));
// 用於處理大資料流
pipeline.addLast(new ChunkedWriteHandler());
/**
* 重連handler
*/
pipeline.addLast(new ReconnectHandler(nettyClient));
/**
* 傳送業務資料前,進行json編碼
*/
pipeline.addLast(new HttpJsonRequestEncoder());
pipeline.addLast(new HttpResponseHandler());
}
```
其中,出站時(即客戶端向外部write時),涉及的handler如下:
1. HttpJsonRequestEncoder,把業務物件,轉變為httpRequest
2. HttpClientCodec,把第一步傳給我們的httpRequest,編碼為bytebuf,交給channel傳送
簡單說下HttpJsonRequestEncoder,這個是我自定義的:
```java
/**
* http請求傳送前,使用該編碼器進行編碼
*
* 本來是打算在這裡編碼body為json,感覺沒必要,直接上移到工具類了
*/
public class HttpJsonRequestEncoder extends
MessageToMessageEncoder {
final static String CHARSET_NAME = "UTF-8";
final static Charset UTF_8 = Charset.forName(CHARSET_NAME);
@Override
protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest,
List