
Dubbo Heartbeat Mechanism (3)

This article is published on the NetEase Cloud community with the authorization of its author, Zhao Jigang.



II. Consumer-side heartbeat mechanism


                      // Create the ExchangeClient: open a long-lived connection to each relevant URL found under the providers path during the first service discovery
                      -->getClients(URL url)
                        -->getSharedClient(URL url)
                          -->ExchangeClient exchangeClient = initClient(url)
                            -->Exchangers.connect(url, requestHandler)
                              -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
                                -->new DecodeHandler(new HeaderExchangeHandler(handler))
                                  -->Transporters.connect(URL url, ChannelHandler... handlers)
                                    -->NettyTransporter.connect(URL url, ChannelHandler listener)
                                      -->new NettyClient(url, listener)
                                        -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                                        -->getChannelCodec(url) // get the Codec2; here it is a DubboCountCodec instance
                                        -->doOpen() // open the Netty client
                                        -->doConnect() // connect to the server and establish the long-lived connection
                                -->new HeaderExchangeClient(Client client, boolean needHeartbeat) // client is the NettyClient instance above; needHeartbeat: true
                                  -->startHeatbeatTimer() // start the heartbeat timer


In initClient(url) the client sets the heartbeat parameter (the default is 60s; see Part I for how users can configure it themselves), as follows:


    /**
     * Create new connection
     */
    private ExchangeClient initClient(URL url) {
        ...
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        ...

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
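The effect of addParameterIfAbsent can be pictured with an ordinary map: the default is written only when no value has been set already, so a user-supplied heartbeat always wins. The sketch below is a minimal illustration and not Dubbo's URL class; HeartbeatDefaults, withDefaultHeartbeat and the literal key "heartbeat" are assumptions made for the example, and the 60s default is the one mentioned in the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the URL parameter map; not Dubbo's URL class.
class HeartbeatDefaults {
    static final String HEARTBEAT_KEY = "heartbeat";   // assumed key name
    static final int DEFAULT_HEARTBEAT = 60 * 1000;     // 60s expressed in ms

    // Returns a copy of params in which the heartbeat default is filled in
    // only when the caller has not provided a value. The input is not modified.
    static Map<String, String> withDefaultHeartbeat(Map<String, String> params) {
        Map<String, String> copy = new HashMap<>(params);
        copy.putIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
        return copy;
    }
}
```

Returning a copy instead of mutating the argument mirrors the way the listing reassigns url to the result of the call.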


As with the provider, let's look at the place where heartbeat detection is finally switched on.


public class HeaderExchangeClient implements ExchangeClient {
    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);
    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    // heartbeat timer
    private ScheduledFuture<?> heartbeatTimer;
    // heartbeat(ms), default value is 0 , won't execute a heartbeat.
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        // the timeout defaults to three heartbeat intervals and must be at least two
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            startHeatbeatTimer();
        }
    }

    private void startHeatbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        // the consumer exposes only itself as the channel to check
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
                heartbeatTimer.cancel(true);
                scheduled.purge();
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        heartbeatTimer = null;
    }
}
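The constructor's handling of the two settings can be isolated into a small sketch: the timeout falls back to three heartbeat intervals, anything below two intervals is rejected, and an interval of 0 means no timer is started. HeartbeatSettings and timerEnabled are hypothetical names used only for this illustration; passing null for the timeout models a URL on which the timeout parameter is not set.

```java
// Hypothetical holder for the two settings; the field names mirror the listing
// above, but this class is not part of Dubbo.
class HeartbeatSettings {
    final int heartbeat;         // interval in ms; 0 disables the timer
    final int heartbeatTimeout;  // timeout in ms

    // configuredTimeout == null models "timeout not set", so the default applies.
    HeartbeatSettings(int heartbeat, Integer configuredTimeout) {
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = configuredTimeout != null ? configuredTimeout : heartbeat * 3;
        if (this.heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
    }

    // Same condition that guards scheduleWithFixedDelay in the listing.
    boolean timerEnabled() {
        return heartbeat > 0;
    }
}
```

With a 60s interval the default timeout is therefore 180s, and a configured timeout of, say, 100s would be refused at construction time.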


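The part to look at is startHeatbeatTimer() (spelled that way in the Dubbo source). It works the same way as on the provider side, except that the provider's ChannelProvider returns every NettyChannel of the NettyServer, while the consumer's returns only the current object, the HeaderExchangeClient itself.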
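As a reminder of what the scheduled task decides for each channel, here is a simplified sketch under stated assumptions: a heartbeat is sent when the channel has gone without a read or without a write for longer than heartbeat, and the channel is considered dead when nothing has been read for longer than heartbeatTimeout. HeartbeatDecision and its method names are hypothetical, and the rules are a reduced reading of HeartBeatTask rather than a copy of it.

```java
// Hypothetical helper; timestamps and durations are all in milliseconds.
class HeartbeatDecision {

    // True when either the last read or the last write is older than the interval,
    // i.e. the channel has been idle long enough to warrant a heartbeat.
    static boolean shouldSendHeartbeat(long lastRead, long lastWrite, long now, int heartbeat) {
        return now - lastRead > heartbeat || now - lastWrite > heartbeat;
    }

    // True when nothing has been read for longer than the timeout.
    // On the consumer side this is the situation in which a reconnect is attempted.
    static boolean isTimedOut(long lastRead, long now, int heartbeatTimeout) {
        return now - lastRead > heartbeatTimeout;
    }
}
```

Because the timeout is at least twice the interval, a channel is given at least two heartbeat opportunities before it is declared dead.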

The consumer's handler chain is exactly the same as the provider's.
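The chain MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler))) is a stack of decorators, each wrapping the next. The sketch below illustrates only that wrapping idea with a much-reduced interface: SketchHandler, RecordingHandler, HeartbeatFilter and MultiMessageFilter are hypothetical names, the HEARTBEAT marker object is an assumption, and the thread-dispatching layer (AllChannelHandler) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much-reduced handler interface; Dubbo's ChannelHandler has more callbacks.
interface SketchHandler {
    void received(Object message);
}

// Innermost handler: stands in for the business request handler.
class RecordingHandler implements SketchHandler {
    final List<Object> seen = new ArrayList<>();

    public void received(Object message) {
        seen.add(message);
    }
}

// Decorator that consumes heartbeat messages and passes everything else inward.
class HeartbeatFilter implements SketchHandler {
    static final Object HEARTBEAT = new Object(); // assumed marker for a heartbeat message
    private final SketchHandler next;
    int heartbeats; // number of heartbeats swallowed so far

    HeartbeatFilter(SketchHandler next) {
        this.next = next;
    }

    public void received(Object message) {
        if (message == HEARTBEAT) {
            heartbeats++;
            return; // heartbeats never reach the business handler
        }
        next.received(message);
    }
}

// Outermost decorator: unpacks a batch and forwards its messages one by one.
class MultiMessageFilter implements SketchHandler {
    private final SketchHandler next;

    MultiMessageFilter(SketchHandler next) {
        this.next = next;
    }

    public void received(Object message) {
        if (message instanceof List) {
            for (Object m : (List<?>) message) {
                next.received(m);
            }
        } else {
            next.received(message);
        }
    }
}
```

A chain built as new MultiMessageFilter(new HeartbeatFilter(new RecordingHandler())) has the same nesting order as the one in the call trace: batches are split first, heartbeats are filtered next, and only ordinary messages reach the innermost handler.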

Finally, let's look at the consumer's reconnect mechanism: AbstractClient#reconnect


    public void reconnect() throws RemotingException {
        disconnect();
        connect();
    }

    public void disconnect() {
        connectLock.lock();
        try {
            destroyConnectStatusCheckCommand();
            try {
                Channel channel = getChannel();
                if (channel != null) {
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                doDisConnect();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        } finally {
            connectLock.unlock();
        }
    }

    protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            if (isConnected()) {
                return;
            }
            initConnectStatusCheckCommand();
            doConnect();
            if (!isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", channel is " + this.getChannel());
                }
            }
            reconnect_count.set(0);
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: " + e.getMessage(), e);
        } finally {
            connectLock.unlock();
        }
    }


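The code is straightforward: disconnect first, then connect again. Both steps run under connectLock, and connect() returns immediately if the client is already connected.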
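The shape of this logic, a lock-guarded disconnect followed by an idempotent connect, can be reduced to a few lines. The sketch below is only an illustration: SketchClient, its boolean connected flag and the connectCount counter are hypothetical stand-ins for the real channel handling and perform no network I/O.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical client that models connection state with a flag instead of a socket.
class SketchClient {
    private final Lock connectLock = new ReentrantLock();
    private volatile boolean connected;
    private int connectCount; // how many times a real connect was performed

    // Reconnect is simply "tear down, then set up again".
    void reconnect() {
        disconnect();
        connect();
    }

    void disconnect() {
        connectLock.lock();
        try {
            connected = false; // stands in for closing the channel
        } finally {
            connectLock.unlock();
        }
    }

    void connect() {
        connectLock.lock();
        try {
            if (connected) {
                return; // already connected: nothing to do
            }
            connected = true; // stands in for the actual connection attempt
            connectCount++;
        } finally {
            connectLock.unlock();
        }
    }

    boolean isConnected() {
        return connected;
    }

    int connectCount() {
        connectLock.lock();
        try {
            return connectCount;
        } finally {
            connectLock.unlock();
        }
    }
}
```

Calling connect() twice in a row performs only one connection, whereas reconnect() always performs a fresh one because the preceding disconnect() clears the state.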

 
