1. 程式人生 > >曹工雜談:花了兩天時間,寫了一個netty實現的http客戶端,支援同步轉非同步和連線池(1)--核心邏輯講解

曹工雜談:花了兩天時間,寫了一個netty實現的http客戶端,支援同步轉非同步和連線池(1)--核心邏輯講解

# 背景 先說下寫這個的目的,其實是好奇,dubbo是怎麼實現同步轉非同步的,然後瞭解到,其依賴了請求中攜帶的請求id來完成這個連線複用;然後我又發現,redisson這個redis客戶端,底層也是用的netty,那就比較好奇了:netty是非同步的,上層是同步的,要拿結果的,同時呢,redis協議也不可能按照redisson的要求,在請求和響應裡攜帶請求id,那,它是怎麼實現同步轉非同步的呢,非同步結果回來後,又是怎麼把結果對應上的呢? 對redisson debug除錯了long long time之後(你們知道的,多執行緒不好除錯),大概理清了思路,基本就是:連線池 的思路。比如,我要訪問redis: 1. 我會先去連線池裡拿一個連線(其實是一個netty的socketChannel),然後用這個連線,去發起請求。 2. 上層新建一個promise(可寫的future,熟悉completablefuture的可以秒懂,不熟悉的話,可以理解為一個阻塞佇列,你去取東西,取不到,阻塞;生產者往佇列放一個東西,你就不再阻塞了,且拿到了東西),把傳送請求的任務交給下層的netty channel後,_將promise設定為netty channel的一個attribute_,然後在這個promise上阻塞等待 3. 下層的netty channel向redis 伺服器發起請求 4. netty接收到redis 伺服器的響應後,從channel中取到第二步設定的attribute,獲取到promise,此時,相當於拿到了鎖,然後開啟鎖,並把結果設定到promise中 5. 主執行緒被第四步喚醒後,拿到結果並返回。 其實問題的關鍵是,第二步的promise傳遞,要設定為channel的一個attribute,不然的話,響應回來後,也不知道把響應給誰。 理清了redisson的基本思路後,我想到了很早之前,面試oppo,二面的面試官就問了我一個問題:寫過類似代理的中介軟體沒有?(因為當時面試的是中介軟體部門) 然後我說沒有,然後基本就涼了。 其實,中介軟體最主要的要求,尤其是代理這種,一方面接收請求,一方面還得作為客戶端去發起請求,發起請求這一步,很容易變成效能瓶頸,不少實現裡,這一步都是直接使用http client這類同步請求的工具(也是支援非同步的,只是同步更常見),所以我也一直想寫一個netty這種非同步的客戶端,同時還能同步轉非同步的,不能同步轉非同步,應用場景就比較受限了。 #實現思路 原始碼給懶得看文字的同學: 扯了這麼多,我說下我這個http client的思路,和上面那個redisson的差不多,我這邊的場景也是作為一箇中間件,要訪問的後端服務就幾個,比如要訪問http://192.168.19.102:8080下的若干服務,我這邊是啟動時候,就會去建一個連線池(直接配置commons pool2的池化引數,我這裡配置的是,2個連線),連線池好了後,netty 的channel已經是ok的了,如下所示: ![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200319090833124-1995346023.png) 這每一個長連線,是包在我們的一個核心的資料結構裡的,叫NettyClient。 核心的屬性,其實主要下面兩個: ```java //要連線的host和埠 private HostAndPortConfig config; /** * 當前使用的channel */ Channel channel; ``` ## NettyClient的初始化 ### 建構函式 建構函式如下: ```java public NettyClient(HostAndPortConfig config) { this.config = config; } @Data @AllArgsConstructor @NoArgsConstructor public class HostAndPortConfig { private String host; private Integer port; } ``` 夠簡單吧,先不考慮連線池,最開始測試的時候,我就是這樣,直接new物件的。 ```java public static void main(String[] args) { HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080); NettyClient client = new NettyClient(config); client.initConnection(); NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do", JSONObject.toJSONString(new Object())); if (response == null) { return; } System.out.println(response.getBody()); } ``` ### 初始化連線 上面的測試程式碼,new完物件後,開始初始化連線。 ```java public void initConnection() { log.info("initConnection starts..."); Bootstrap bootstrap; //1.建立netty所需的bootstrap配置 bootstrap = createBootstrap(config); //2.發起連線 ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort()); log.info("current thread:{}", Thread.currentThread().getName()); //3.等待連線成功 boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS); boolean bIsSuccess = ret && future.isSuccess(); if (!bIsSuccess) { //4.不成功拋異常 bIsConnectionOk = false; log.error("host config:{}",config); throw new RuntimeException("連線失敗"); } //5.走到這裡,說明成功了,新的channle賦值給field cleanOldChannelAndCancelReconnect(future, channel); bIsConnectionOk = true; } ``` 這裡初始化連線是直接同步等待的,如果失敗,直接拋異常。第5步裡,主要是把新的channel賦值給當前物件的一個field,同時,關閉舊的channle之類的。 ```java private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) { /** * 連線成功,關閉舊的channel,再用新的channel賦值給field */ try { if (oldChannel != null) { try { log.info("Close old netty channel " + oldChannel); oldChannel.close(); } catch (Exception e) { log.error("e:{}", e); } } } finally { /** * 新channel覆蓋field */ NettyClient.this.channel = future.channel(); NettyClient.this.bIsConnectionOk = true; log.info("connection is ok,new channel:{}", NettyClient.this.channel); if (NettyClient.this.scheduledFuture != null) { log.info("cancel scheduledFuture"); NettyClient.this.scheduledFuture.cancel(true); } } } ``` ### netty client中,涉及的出站handler 這裡說下前面的bootstrap的構造,如下: ```java private Bootstrap createBootstrap(HostAndPortConfig config) { Bootstrap bootstrap = new Bootstrap() .channel(NioSocketChannel.class) .group(NIO_EVENT_LOOP_GROUP); bootstrap.handler(new CustomChannelInitializer(bootstrap, config, this)); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); return bootstrap; } ``` handler 鏈,主要在CustomChannelInitializer類中。 ```java protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // http客戶端編解碼器,包括了客戶端http請求編碼,http響應的解碼 pipeline.addLast(new HttpClientCodec()); // 把多個HTTP請求中的資料組裝成一個 pipeline.addLast(new HttpObjectAggregator(65536)); // 用於處理大資料流 pipeline.addLast(new ChunkedWriteHandler()); /** * 重連handler */ pipeline.addLast(new ReconnectHandler(nettyClient)); /** * 傳送業務資料前,進行json編碼 */ pipeline.addLast(new HttpJsonRequestEncoder()); pipeline.addLast(new HttpResponseHandler()); } ``` 其中,出站時(即客戶端向外部write時),涉及的handler如下: 1. HttpJsonRequestEncoder,把業務物件,轉變為httpRequest 2. HttpClientCodec,把第一步傳給我們的httpRequest,編碼為bytebuf,交給channel傳送 簡單說下HttpJsonRequestEncoder,這個是我自定義的: ```java /** * http請求傳送前,使用該編碼器進行編碼 * * 本來是打算在這裡編碼body為json,感覺沒必要,直接上移到工具類了 */ public class HttpJsonRequestEncoder extends MessageToMessageEncoder { final static String CHARSET_NAME = "UTF-8"; final static Charset UTF_8 = Charset.forName(CHARSET_NAME); @Override protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest, List out) { // 1. 這個就是要最終傳遞出去的物件 FullHttpRequest request = null; if (nettyHttpRequest.getHttpMethod() == HttpMethod.POST) { ByteBuf encodeBuf = Unpooled.copiedBuffer((CharSequence) nettyHttpRequest.getBody(), UTF_8); request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, nettyHttpRequest.getUri(), encodeBuf); HttpUtil.setContentLength(request, encodeBuf.readableBytes()); } else if (nettyHttpRequest.getHttpMethod() == HttpMethod.GET) { request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, nettyHttpRequest.getUri()); } else { throw new RuntimeException(); } //2. 填充header populateHeaders(ctx, request); out.add(request); } private void populateHeaders(ChannelHandlerContext ctx, FullHttpRequest request) { /** * headers 設定 */ HttpHeaders headers = request.headers(); headers.set(HttpHeaderNames.HOST, ctx.channel().remoteAddress().toString().substring(1)); headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); headers.set(HttpHeaderNames.CONTENT_TYPE, "application/json"); /** * 設定我方可以接收的 */ headers.set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP.toString() + ',' + HttpHeaderValues.DEFLATE.toString()); headers.set(HttpHeaderNames.ACCEPT_CHARSET, "utf-8,ISO-8859-1;q=0.7,*;q=0.7"); headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"); headers.set(HttpHeaderNames.ACCEPT, "*/*"); /** * 設定agent */ headers.set(HttpHeaderNames.USER_AGENT, "Netty xml Http Client side"); } } ``` ### netty client涉及的入站handler 1. HttpClientCodec和HttpObjectAggregator,主要是將bytebuf,轉變為io.netty.handler.codec.http.FullHttpResponse 型別的物件 2. HttpResponseHandler,我們的業務handler ```java /** * http請求響應的處理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } } ``` 1. 1處程式碼,主要從channel中,根據key,獲取當前的請求相關資訊 2. 2處程式碼,從當前請求中,拿到promise,設定結果,此時,會喚醒主執行緒。 ## netty client 發起http post呼叫 說完了netty client,我們再說說呼叫的過程: ```java public NettyHttpResponse doPost(String url, Object body) { NettyHttpRequest request = new NettyHttpRequest(url, body); return doHttpRequest(request); } private static final DefaultEventLoop NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP = new DefaultEventLoop(null, new NamedThreadFactory("NettyResponsePromiseNotify")); private NettyHttpResponse doHttpRequest(NettyHttpRequest request) { // 1 Promise defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise(); // 2 NettyHttpRequestContext context = new NettyHttpRequestContext(request, defaultPromise); channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context); // 3 ChannelFuture channelFuture = channel.writeAndFlush(request); channelFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { System.out.println(Thread.currentThread().getName() + " 請求傳送完成"); } }); // 4 return get(defaultPromise); } ``` 上面我已經標註了幾個數字,分別講一下: 1. 新建一個promise,可以理解為一把可以我們手動完成的鎖(一般主執行緒在這個鎖上等待,在另一個執行緒去完成) 2. 把鎖和其他請求資訊,一起放到channle裡 3. 使用channle傳送資料 4. 同步等待 第四步等待的get方法如下: ```java public V get(Promise future) { // 1. if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // 2 // promise的執行緒池,會回撥該listener l.countDown(); } } }); boolean interrupted = false; if (!future.isDone()) { try { // 3 l.await(4, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("e:{}", e); interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } //4 if (future.isSuccess()) { return future.getNow(); } log.error("wait result time out "); return null; } ``` 1. 如果promise的狀態還是沒有完成,則我們new了一個閉鎖 2. 加了一個listner在promise上面,別人操作這個promise,這個listener會被回撥,回撥邏輯:將閉鎖開啟 3. 主執行緒,在閉鎖上等待 4. 主執行緒,走到這裡,說明已經等待超時,或者已經完成,可以獲取結果並返回 ### 什麼地方會修改promise 前面我們提到了,在response的handler中: ```java /** * http請求響應的處理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } } ``` 其中,2處,修改promise,此時就會回撥前面說的那個listenr,開啟閉鎖,主執行緒也因此得以繼續執行: ```java public V get(Promise future) { if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // io執行緒會回撥該listener l.countDown(); } } }); ..... } ``` # 總結 本篇的大致思路差不多就是這樣了,主要邏輯在於同步轉非同步那一塊。 還有些沒講到的,後面再講,大概還有2個部分。 1. 斷線重連 2. commons pool實現連線池。 程式碼我放在: