1. 程式人生 > >dubbo心跳機制 (1)

dubbo心跳機制 (1)

此文已由作者趙計剛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


dubbo的心跳機制:

  • 目的:檢測provider與consumer之間的connection連線是不是還連線著,如果連線斷了,需要作出相應的處理。

  • 原理:

    • provider:dubbo的心跳預設是在heartbeat(預設是60s)內如果沒有接收到訊息,就會發送心跳訊息,如果連著3次(180s)沒有收到心跳響應,provider會關閉channel

    • consumer:dubbo的心跳預設是在60s內如果沒有接收到訊息,就會發送心跳訊息,如果連著3次(180s)沒有收到心跳響應,consumer會進行重連

來看原始碼呼叫鏈。先看provider端。

 

一、provider端心跳機制

複製程式碼

              -->openServer(URL url)
                 url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&default.server=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider&timestamp=1520660491836
                -->createServer(URL url)
                    -->HeaderExchanger.bind(URL url, ExchangeHandler handler)
                       url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider&timestamp=1520660491836 handler:DubboProtocol.requestHandler
                      -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                        -->NettyTransporter.bind(URL url, ChannelHandler listener)
                           listener:上邊的DecodeHandler例項
                          -->new NettyServer(URL url, ChannelHandler handler)
                            -->ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
                               handler:上邊的DecodeHandler例項
                            -->doOpen()//開啟netty服務
                      -->new HeaderExchangeServer(Server server)
                         server:上述的NettyServer
                        -->startHeatbeatTimer()

複製程式碼

服務端在開啟netty服務時, 在呼叫createServer時,會從url的parameters map中獲取heartbeat配置,程式碼如下:

複製程式碼

 1      private ExchangeServer createServer(URL url) {
 2 
 3         ...
 4 
 5         url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
 6        
 7         ...
 8 
 9         ExchangeServer server;
10         try {
11             server = Exchangers.bind(url, requestHandler);
12         } catch (RemotingException e) {
13             throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
14         }
15 
16         ...
17 
18         return server;
19     }

複製程式碼

其中:int DEFAULT_HEARTBEAT = 60 * 1000,即當用戶沒有配置heartbeat(心跳時間)時,預設heartbeat=60s(即60s內沒有接收到任何請求,就會發送心跳資訊)。那麼這個heartbeat到底該怎麼配?

provider端:

1     <dubbo:service ...>
2         <dubbo:parameter key="heartbeat" value="3000"/>
3     </dubbo:service>

consumer端:

1     <dubbo:reference ...>
2         <dubbo:parameter key="heartbeat" value="3000"/>
3     </dubbo:reference>

再來看呼叫鏈,當執行到這一句。

1 ChannelHandler.wrapInternal(ChannelHandler handler, URL url)

會形成一個handler呼叫鏈,呼叫鏈如下:

複製程式碼

1 MultiMessageHandler
2 -->handler: HeartbeatHandler
3    -->handler: AllChannelHandler
4          -->url: providerUrl
5          -->executor: FixedExecutor
6          -->handler: DecodeHandler
7             -->handler: HeaderExchangeHandler
8                -->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)

複製程式碼

這也是netty接收到請求後的處理鏈路,注意其中有一個HeartbeatHandler。

最後,執行new HeaderExchangeServer(Server server),來看原始碼:

複製程式碼

1 public class HeaderExchangeServer implements ExchangeServer {
 2     /** 心跳定時器 */
 3     private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
 4             new NamedThreadFactory(
 5                     "dubbo-remoting-server-heartbeat",
 6                     true));
 7     /** NettyServer */
 8     private final Server server;
 9     // heartbeat timer
10     private ScheduledFuture<?> heatbeatTimer;
11     // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
12     private int heartbeat;
13     private int heartbeatTimeout;
14     private AtomicBoolean closed = new AtomicBoolean(false);
15 
16     public HeaderExchangeServer(Server server) {
17         if (server == null) {
18             throw new IllegalArgumentException("server == null");
19         }
20         this.server = server;
21         this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
22         this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
23         if (heartbeatTimeout < heartbeat * 2) {
24             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
25         }
26         startHeatbeatTimer();
27     }
28 
29     private void startHeatbeatTimer() {
30         stopHeartbeatTimer();
31         if (heartbeat > 0) {
32             heatbeatTimer = scheduled.scheduleWithFixedDelay(
33                     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
34                         public Collection<Channel> getChannels() {
35                             return Collections.unmodifiableCollection(
36                                     HeaderExchangeServer.this.getChannels());
37                         }
38                     }, heartbeat, heartbeatTimeout),
39                     heartbeat, heartbeat, TimeUnit.MILLISECONDS);
40         }
41     }
42 
43     private void stopHeartbeatTimer() {
44         try {
45             ScheduledFuture<?> timer = heatbeatTimer;
46             if (timer != null && !timer.isCancelled()) {
47                 timer.cancel(true);
48             }
49         } catch (Throwable t) {
50             logger.warn(t.getMessage(), t);
51         } finally {
52             heatbeatTimer = null;
53         }
54     }
55 }

複製程式碼

建立HeaderExchangeServer時,初始化了heartbeat(心跳間隔時間)和heartbeatTimeout(心跳響應超時時間:即如果最終傳送的心跳在這個時間內都沒有返回,則做出響應的處理)。

  • heartbeat預設是0(從startHeatbeatTimer()方法可以看出只有heartbeat>0的情況下,才會發心跳,這裡heartbeat如果從url的parameter map中獲取不到,就是0,但是我們在前邊看到dubbo會預設設定heartbeat=60s到parameter map中,所以此處的heartbeat=60s);

  • heartbeatTimeout:預設是heartbeat*3。(原因:假設一端發出一次heartbeatRequest,另一端在heartbeat內沒有返回任何響應-包括正常請求響應和心跳響應,此時不能認為是連線斷了,因為有可能還是網路抖動什麼的導致了tcp包的重傳超時等)

  • scheduled是一個含有一個執行緒的定時執行緒執行器(其中的執行緒名字為:"dubbo-remoting-server-heartbeat-thread-*")

之後啟動心跳定時任務:

  • 首先如果原來有心跳定時任務,關閉原來的定時任務

  • 之後啟動scheduled中的定時執行緒,從啟動該執行緒開始,每隔heartbeat執行一次HeartBeatTask任務(第一次執行是在啟動執行緒後heartbeat時)

來看一下HeartBeatTask的原始碼:

複製程式碼

 1 final class HeartBeatTask implements Runnable {
 2     // channel獲取器:用於獲取所有需要進行心跳檢測的channel
 3     private ChannelProvider channelProvider;
 4     private int heartbeat;
 5     private int heartbeatTimeout;
 6 
 7     HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
 8         this.channelProvider = provider;
 9         this.heartbeat = heartbeat;
10         this.heartbeatTimeout = heartbeatTimeout;
11     }
12 
13     public void run() {
14         try {
15             long now = System.currentTimeMillis();
16             for (Channel channel : channelProvider.getChannels()) {
17                 if (channel.isClosed()) {
18                     continue;
19                 }
20                 try {
21                     // 獲取最後一次讀操作的時間
22                     Long lastRead = (Long) channel.getAttribute(
23                             HeaderExchangeHandler.KEY_READ_TIMESTAMP);
24                     // 獲取最後一次寫操作的時間
25                     Long lastWrite = (Long) channel.getAttribute(
26                             HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);27                     // 如果在heartbeat內沒有進行讀操作或者寫操作,則傳送心跳請求
28                     if ((lastRead != null && now - lastRead > heartbeat)
29                             || (lastWrite != null && now - lastWrite > heartbeat)) {
30                         Request req = new Request();
31                         req.setVersion("2.0.0");
32                         req.setTwoWay(true);
33                         req.setEvent(Request.HEARTBEAT_EVENT);
34                         channel.send(req);
35                         if (logger.isDebugEnabled()) {
36                             logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
37                                     + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
38                         }
39                     }
40                     //正常訊息和心跳在heartbeatTimeout都沒接收到
41                     if (lastRead != null && now - lastRead > heartbeatTimeout) {
42                         logger.warn("Close channel " + channel
43                                 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
44                         // consumer端進行重連
45                         if (channel instanceof Client) {
46                             try {
47                                 ((Client) channel).reconnect();
48                             } catch (Exception e) {
49                                 //do nothing
50                             }
51                         } else {// provider端關閉連線
52                             channel.close();
53                         }
54                     }
55                 } catch (Throwable t) {
56                     logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
57                 }
58             }
59         } catch (Throwable t) {
60             logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
61         }
62     }
63 
64     interface ChannelProvider {
65         Collection<Channel> getChannels();
66     }
67 }


HeartBeatTask首先獲取所有的channelProvider#getChannels獲取所有需要心跳檢測的channel,channelProvider例項是HeaderExchangeServer中在啟動執行緒定時執行器的時候建立的內部類。

1                     new HeartBeatTask.ChannelProvider() {
2                         public Collection<Channel> getChannels() {
3                             return Collections.unmodifiableCollection(
4                                     HeaderExchangeServer.this.getChannels());
5                         }
6                     }


免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 資料庫路由中介軟體MyCat - 使用篇(6)
【推薦】 HBase最佳實踐 - 叢集規劃
【推薦】 網易物件儲存NOS圖床神器