dubbo心跳機制 (2)
此文已由作者趙計剛授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
來看一下HeaderExchangeServer.this.getChannels():
1 public Collection<Channel> getChannels() { 2 return (Collection) getExchangeChannels(); 3 } 4 5 public Collection<ExchangeChannel> getExchangeChannels() { 6 Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>(); 7 Collection<Channel> channels = server.getChannels(); 8 if (channels != null && channels.size() > 0) { 9 for (Channel channel : channels) { 10 exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel)); 11 } 12 } 13 return exchangeChannels; 14 }
實際上就是獲取NettyServer中的全部channel連線。
獲取到需要心跳檢測的channel後,對每一個channel進行如下判斷:
如果在heartbeat內沒有進行讀操作或者寫操作,則傳送心跳請求
如果正常訊息和心跳在heartbeatTimeout都沒接收到,consumer端會進行重連,provider端會關閉channel
這裡比較關鍵的是lastRead和lastWrite的設定。先來看一下獲取:
1 Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP); 2 Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
說明有地方在設定這兩個值到channel中。
從請求和響應處理來看,無論是請求還是響應都會按照這個順序處理一遍。
1 MultiMessageHandler 2 -->handler: HeartbeatHandler 3 -->handler: AllChannelHandler 4 -->url: providerUrl 5 -->executor: FixedExecutor 6 -->handler: DecodeHandler 7 -->handler: HeaderExchangeHandler 8 -->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)
其中HeartbeatHandler原始碼如下:
1 public class HeartbeatHandler extends AbstractChannelHandlerDelegate { 2 3 private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class); 4 5 public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP"; 6 7 public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP"; 8 9 public HeartbeatHandler(ChannelHandler handler) { 10 super(handler); 11 } 12 13 public void connected(Channel channel) throws RemotingException { 14 setReadTimestamp(channel); 15 setWriteTimestamp(channel); 16 handler.connected(channel); 17 } 18 19 public void disconnected(Channel channel) throws RemotingException { 20 clearReadTimestamp(channel); 21 clearWriteTimestamp(channel); 22 handler.disconnected(channel); 23 } 24 25 public void sent(Channel channel, Object message) throws RemotingException { 26 setWriteTimestamp(channel); 27 handler.sent(channel, message); 28 } 29 30 public void received(Channel channel, Object message) throws RemotingException { 31 setReadTimestamp(channel); 32 if (isHeartbeatRequest(message)) { 33 Request req = (Request) message; 34 if (req.isTwoWay()) { 35 Response res = new Response(req.getId(), req.getVersion()); 36 res.setEvent(Response.HEARTBEAT_EVENT); 37 channel.send(res); 38 if (logger.isInfoEnabled()) { 39 int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); 40 if (logger.isDebugEnabled()) { 41 logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() 42 + ", cause: The channel has no data-transmission exceeds a heartbeat period" 43 + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); 44 } 45 } 46 } 47 return; 48 } 49 if (isHeartbeatResponse(message)) { 50 if (logger.isDebugEnabled()) { 51 logger.debug( 52 new StringBuilder(32) 53 .append("Receive heartbeat response in thread ") 54 .append(Thread.currentThread().getName()) 55 .toString()); 56 } 57 return; 58 } 59 handler.received(channel, message); 60 } 61 62 private void setReadTimestamp(Channel channel) { 63 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); 64 } 65 66 private void setWriteTimestamp(Channel channel) { 67 channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); 68 } 69 70 private void clearReadTimestamp(Channel channel) { 71 channel.removeAttribute(KEY_READ_TIMESTAMP); 72 } 73 74 private void clearWriteTimestamp(Channel channel) { 75 channel.removeAttribute(KEY_WRITE_TIMESTAMP); 76 } 77 78 private boolean isHeartbeatRequest(Object message) { 79 return message instanceof Request && ((Request) message).isHeartbeat(); 80 } 81 82 private boolean isHeartbeatResponse(Object message) { 83 return message instanceof Response && ((Response) message).isHeartbeat(); 84 } 85 }
連線完成時:設定lastRead和lastWrite
連線斷開時:清空lastRead和lastWrite
傳送訊息時:設定lastWrite
接收訊息時:設定lastRead
之後交由AllChannelHandler進行處理。之後會一直交由HeaderExchangeHandler進行處理。其對lastRead和lastWrite也做了設定和清理:
1 public void connected(Channel channel) throws RemotingException { 2 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); 3 channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); 4 ... 5 } 6 7 public void disconnected(Channel channel) throws RemotingException { 8 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); 9 channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); 10 ... 11 } 12 13 public void sent(Channel channel, Object message) throws RemotingException { 14 Throwable exception = null; 15 try { 16 channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); 17 ... 18 } catch (Throwable t) { 19 exception = t; 20 } 21 } 22 23 public void received(Channel channel, Object message) throws RemotingException { 24 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); 25 ... 26 }
連線完成時:設定lastRead和lastWrite
連線斷開時:也設定lastRead和lastWrite(為什麼?)
傳送訊息時:設定lastWrite
接收訊息時:設定lastRead
這裡裡有個疑問,從handler鏈來看,無論是請求還是響應都會按照handler鏈來處理一遍。那麼在HeartbeatHandler中已經進行了lastWrite和lastRead的設定,為什麼還要在HeaderExchangeHandler中再處理一遍?
最後,provider端認為連線斷了,則會關閉channel。來看一下NettyChannel的close方法:
1 public void close() { 2 // 1 將close屬性設為true 3 try { 4 super.close(); 5 } catch (Exception e) { 6 logger.warn(e.getMessage(), e); 7 } 8 // 2 從全域性NettyChannel快取器中將當前的NettyChannel刪掉 9 try { 10 removeChannelIfDisconnected(channel); 11 } catch (Exception e) { 12 logger.warn(e.getMessage(), e); 13 } 14 // 3 清空當前的NettyChannel中的attributes屬性 15 try { 16 attributes.clear(); 17 } catch (Exception e) { 18 logger.warn(e.getMessage(), e); 19 } 20 // 4 關閉netty的channel,執行netty的channel的優雅關閉 21 try { 22 if (logger.isInfoEnabled()) { 23 logger.info("Close netty channel " + channel); 24 } 25 channel.close(); 26 } catch (Exception e) { 27 logger.warn(e.getMessage(), e); 28 } 29 }
從上邊程式碼來看,假設consumer端掛了,provider端的心跳檢測機制可以進行相關的資源回收,所以provider端的心跳檢測機制是有必要的。
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 微服務監控探索
【推薦】 Android 模擬器下載、編譯及除錯
【推薦】 聊一聊整車廠的那些事——售後配件業務