1. 程式人生 > >dubbo心跳機制 (2)

dubbo心跳機制 (2)

此文已由作者趙計剛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。



來看一下HeaderExchangeServer.this.getChannels():

  1     public Collection<Channel> getChannels() {
 2         return (Collection) getExchangeChannels();
 3     }
 4 
 5     public Collection<ExchangeChannel> getExchangeChannels() {
 6         Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
 7         Collection<Channel> channels = server.getChannels();
 8         if (channels != null && channels.size() > 0) {
 9             for (Channel channel : channels) {
10                 exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
11             }
12         }
13         return exchangeChannels;
14     }

實際上就是獲取NettyServer中的全部channel連線。

 

獲取到需要心跳檢測的channel後,對每一個channel進行如下判斷:

  • 如果在heartbeat內沒有進行讀操作或者寫操作,則傳送心跳請求

  • 如果正常訊息和心跳在heartbeatTimeout都沒接收到,consumer端會進行重連,provider端會關閉channel

這裡比較關鍵的是lastRead和lastWrite的設定。先來看一下獲取:

1 Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
2 Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);

說明有地方在設定這兩個值到channel中。

從請求和響應處理來看,無論是請求還是響應都會按照這個順序處理一遍。

1 MultiMessageHandler
2 -->handler: HeartbeatHandler
3    -->handler: AllChannelHandler
4          -->url: providerUrl
5          -->executor: FixedExecutor
6          -->handler: DecodeHandler
7             -->handler: HeaderExchangeHandler
8                -->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)

其中HeartbeatHandler原始碼如下:

1 public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
 2 
 3     private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
 4 
 5     public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
 6 
 7     public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
 8 
 9     public HeartbeatHandler(ChannelHandler handler) {
10         super(handler);
11     }
12 
13     public void connected(Channel channel) throws RemotingException {
14         setReadTimestamp(channel);
15         setWriteTimestamp(channel);
16         handler.connected(channel);
17     }
18 
19     public void disconnected(Channel channel) throws RemotingException {
20         clearReadTimestamp(channel);
21         clearWriteTimestamp(channel);
22         handler.disconnected(channel);
23     }
24 
25     public void sent(Channel channel, Object message) throws RemotingException {
26         setWriteTimestamp(channel);
27         handler.sent(channel, message);
28     }
29 
30     public void received(Channel channel, Object message) throws RemotingException {
31         setReadTimestamp(channel);
32         if (isHeartbeatRequest(message)) {
33             Request req = (Request) message;
34             if (req.isTwoWay()) {
35                 Response res = new Response(req.getId(), req.getVersion());
36                 res.setEvent(Response.HEARTBEAT_EVENT);
37                 channel.send(res);
38                 if (logger.isInfoEnabled()) {
39                     int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
40                     if (logger.isDebugEnabled()) {
41                         logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
42                                 + ", cause: The channel has no data-transmission exceeds a heartbeat period"
43                                 + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
44                     }
45                 }
46             }
47             return;
48         }
49         if (isHeartbeatResponse(message)) {
50             if (logger.isDebugEnabled()) {
51                 logger.debug(
52                         new StringBuilder(32)
53                                 .append("Receive heartbeat response in thread ")
54                                 .append(Thread.currentThread().getName())
55                                 .toString());
56             }
57             return;
58         }
59         handler.received(channel, message);
60     }
61 
62     private void setReadTimestamp(Channel channel) {
63         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
64     }
65 
66     private void setWriteTimestamp(Channel channel) {
67         channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
68     }
69 
70     private void clearReadTimestamp(Channel channel) {
71         channel.removeAttribute(KEY_READ_TIMESTAMP);
72     }
73 
74     private void clearWriteTimestamp(Channel channel) {
75         channel.removeAttribute(KEY_WRITE_TIMESTAMP);
76     }
77 
78     private boolean isHeartbeatRequest(Object message) {
79         return message instanceof Request && ((Request) message).isHeartbeat();
80     }
81 
82     private boolean isHeartbeatResponse(Object message) {
83         return message instanceof Response && ((Response) message).isHeartbeat();
84     }
85 }
  • 連線完成時:設定lastRead和lastWrite

  • 連線斷開時:清空lastRead和lastWrite

  • 傳送訊息時:設定lastWrite

  • 接收訊息時:設定lastRead

之後交由AllChannelHandler進行處理。之後會一直交由HeaderExchangeHandler進行處理。其對lastRead和lastWrite也做了設定和清理:

  1     public void connected(Channel channel) throws RemotingException {
 2         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
 3         channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); 4         ...
 5     }
 6 
 7     public void disconnected(Channel channel) throws RemotingException {
 8         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
 9         channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
10         ...
11     }
12 
13     public void sent(Channel channel, Object message) throws RemotingException {
14         Throwable exception = null;
15         try {
16             channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
17             ...
18         } catch (Throwable t) {
19             exception = t;
20         }
21     }
22 
23     public void received(Channel channel, Object message) throws RemotingException {
24         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
25         ...
26     }
  • 連線完成時:設定lastRead和lastWrite

  • 連線斷開時:也設定lastRead和lastWrite(為什麼?)

  • 傳送訊息時:設定lastWrite

  • 接收訊息時:設定lastRead

 這裡裡有個疑問,從handler鏈來看,無論是請求還是響應都會按照handler鏈來處理一遍。那麼在HeartbeatHandler中已經進行了lastWrite和lastRead的設定,為什麼還要在HeaderExchangeHandler中再處理一遍?

最後,provider端認為連線斷了,則會關閉channel。來看一下NettyChannel的close方法:

 1     public void close() {
 2         // 1 將close屬性設為true
 3         try {
 4             super.close();
 5         } catch (Exception e) {
 6             logger.warn(e.getMessage(), e);
 7         }
 8         // 2 從全域性NettyChannel快取器中將當前的NettyChannel刪掉
 9         try {
10             removeChannelIfDisconnected(channel);
11         } catch (Exception e) {
12             logger.warn(e.getMessage(), e);
13         }
14         // 3 清空當前的NettyChannel中的attributes屬性
15         try {
16             attributes.clear();
17         } catch (Exception e) {
18             logger.warn(e.getMessage(), e);
19         }
20         // 4 關閉netty的channel,執行netty的channel的優雅關閉
21         try {
22             if (logger.isInfoEnabled()) {
23                 logger.info("Close netty channel " + channel);
24             }
25             channel.close();
26         } catch (Exception e) {
27             logger.warn(e.getMessage(), e);
28         }
29     }

從上邊程式碼來看,假設consumer端掛了,provider端的心跳檢測機制可以進行相關的資源回收,所以provider端的心跳檢測機制是有必要的。

 


 

免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 微服務監控探索
【推薦】 Android 模擬器下載、編譯及除錯
【推薦】 聊一聊整車廠的那些事——售後配件業務