dubbo心跳機制 (1)
此文已由作者趙計剛授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
dubbo的心跳機制:
目的:檢測provider與consumer之間的connection連線是不是還連線著,如果連線斷了,需要作出相應的處理。
原理:
provider:dubbo的心跳預設是在heartbeat(預設是60s)內如果沒有接收到訊息,就會發送心跳訊息,如果連著3次(180s)沒有收到心跳響應,provider會關閉channel。
consumer:dubbo的心跳預設是在60s內如果沒有接收到訊息,就會發送心跳訊息,如果連著3次(180s)沒有收到心跳響應,consumer會進行重連
來看原始碼呼叫鏈。先看provider端。
一、provider端心跳機制
-->openServer(URL url) url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&default.server=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider×tamp=1520660491836 -->createServer(URL url) -->HeaderExchanger.bind(URL url, ExchangeHandler handler) url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider×tamp=1520660491836 handler:DubboProtocol.requestHandler -->new DecodeHandler(new HeaderExchangeHandler(handler))) -->NettyTransporter.bind(URL url, ChannelHandler listener) listener:上邊的DecodeHandler例項 -->new NettyServer(URL url, ChannelHandler handler) -->ChannelHandler.wrapInternal(ChannelHandler handler, URL url) handler:上邊的DecodeHandler例項 -->doOpen()//開啟netty服務 -->new HeaderExchangeServer(Server server) server:上述的NettyServer -->startHeatbeatTimer()
服務端在開啟netty服務時, 在呼叫createServer時,會從url的parameters map中獲取heartbeat配置,程式碼如下:
1 private ExchangeServer createServer(URL url) { 2 3 ... 4 5 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 6 7 ... 8 9 ExchangeServer server; 10 try { 11 server = Exchangers.bind(url, requestHandler); 12 } catch (RemotingException e) { 13 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); 14 } 15 16 ... 17 18 return server; 19 }
其中:int DEFAULT_HEARTBEAT = 60 * 1000,即當用戶沒有配置heartbeat(心跳時間)時,預設heartbeat=60s(即60s內沒有接收到任何請求,就會發送心跳資訊)。那麼這個heartbeat到底該怎麼配?
provider端:
1 <dubbo:service ...> 2 <dubbo:parameter key="heartbeat" value="3000"/> 3 </dubbo:service>
consumer端:
1 <dubbo:reference ...> 2 <dubbo:parameter key="heartbeat" value="3000"/> 3 </dubbo:reference>
再來看呼叫鏈,當執行到這一句。
1 ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
會形成一個handler呼叫鏈,呼叫鏈如下:
1 MultiMessageHandler 2 -->handler: HeartbeatHandler 3 -->handler: AllChannelHandler 4 -->url: providerUrl 5 -->executor: FixedExecutor 6 -->handler: DecodeHandler 7 -->handler: HeaderExchangeHandler 8 -->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)
這也是netty接收到請求後的處理鏈路,注意其中有一個HeartbeatHandler。
最後,執行new HeaderExchangeServer(Server server),來看原始碼:
1 public class HeaderExchangeServer implements ExchangeServer { 2 /** 心跳定時器 */ 3 private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, 4 new NamedThreadFactory( 5 "dubbo-remoting-server-heartbeat", 6 true)); 7 /** NettyServer */ 8 private final Server server; 9 // heartbeat timer 10 private ScheduledFuture<?> heatbeatTimer; 11 // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat. 12 private int heartbeat; 13 private int heartbeatTimeout; 14 private AtomicBoolean closed = new AtomicBoolean(false); 15 16 public HeaderExchangeServer(Server server) { 17 if (server == null) { 18 throw new IllegalArgumentException("server == null"); 19 } 20 this.server = server; 21 this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); 22 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); 23 if (heartbeatTimeout < heartbeat * 2) { 24 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); 25 } 26 startHeatbeatTimer(); 27 } 28 29 private void startHeatbeatTimer() { 30 stopHeartbeatTimer(); 31 if (heartbeat > 0) { 32 heatbeatTimer = scheduled.scheduleWithFixedDelay( 33 new HeartBeatTask(new HeartBeatTask.ChannelProvider() { 34 public Collection<Channel> getChannels() { 35 return Collections.unmodifiableCollection( 36 HeaderExchangeServer.this.getChannels()); 37 } 38 }, heartbeat, heartbeatTimeout), 39 heartbeat, heartbeat, TimeUnit.MILLISECONDS); 40 } 41 } 42 43 private void stopHeartbeatTimer() { 44 try { 45 ScheduledFuture<?> timer = heatbeatTimer; 46 if (timer != null && !timer.isCancelled()) { 47 timer.cancel(true); 48 } 49 } catch (Throwable t) { 50 logger.warn(t.getMessage(), t); 51 } finally { 52 heatbeatTimer = null; 53 } 54 } 55 }
建立HeaderExchangeServer時,初始化了heartbeat(心跳間隔時間)和heartbeatTimeout(心跳響應超時時間:即如果最終傳送的心跳在這個時間內都沒有返回,則做出響應的處理)。
heartbeat預設是0(從startHeatbeatTimer()方法可以看出只有heartbeat>0的情況下,才會發心跳,這裡heartbeat如果從url的parameter map中獲取不到,就是0,但是我們在前邊看到dubbo會預設設定heartbeat=60s到parameter map中,所以此處的heartbeat=60s);
heartbeatTimeout:預設是heartbeat*3。(原因:假設一端發出一次heartbeatRequest,另一端在heartbeat內沒有返回任何響應-包括正常請求響應和心跳響應,此時不能認為是連線斷了,因為有可能還是網路抖動什麼的導致了tcp包的重傳超時等)
scheduled是一個含有一個執行緒的定時執行緒執行器(其中的執行緒名字為:"dubbo-remoting-server-heartbeat-thread-*")
之後啟動心跳定時任務:
首先如果原來有心跳定時任務,關閉原來的定時任務
之後啟動scheduled中的定時執行緒,從啟動該執行緒開始,每隔heartbeat執行一次HeartBeatTask任務(第一次執行是在啟動執行緒後heartbeat時)
來看一下HeartBeatTask的原始碼:
1 final class HeartBeatTask implements Runnable { 2 // channel獲取器:用於獲取所有需要進行心跳檢測的channel 3 private ChannelProvider channelProvider; 4 private int heartbeat; 5 private int heartbeatTimeout; 6 7 HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { 8 this.channelProvider = provider; 9 this.heartbeat = heartbeat; 10 this.heartbeatTimeout = heartbeatTimeout; 11 } 12 13 public void run() { 14 try { 15 long now = System.currentTimeMillis(); 16 for (Channel channel : channelProvider.getChannels()) { 17 if (channel.isClosed()) { 18 continue; 19 } 20 try { 21 // 獲取最後一次讀操作的時間 22 Long lastRead = (Long) channel.getAttribute( 23 HeaderExchangeHandler.KEY_READ_TIMESTAMP); 24 // 獲取最後一次寫操作的時間 25 Long lastWrite = (Long) channel.getAttribute( 26 HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);27 // 如果在heartbeat內沒有進行讀操作或者寫操作,則傳送心跳請求 28 if ((lastRead != null && now - lastRead > heartbeat) 29 || (lastWrite != null && now - lastWrite > heartbeat)) { 30 Request req = new Request(); 31 req.setVersion("2.0.0"); 32 req.setTwoWay(true); 33 req.setEvent(Request.HEARTBEAT_EVENT); 34 channel.send(req); 35 if (logger.isDebugEnabled()) { 36 logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() 37 + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); 38 } 39 } 40 //正常訊息和心跳在heartbeatTimeout都沒接收到 41 if (lastRead != null && now - lastRead > heartbeatTimeout) { 42 logger.warn("Close channel " + channel 43 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); 44 // consumer端進行重連 45 if (channel instanceof Client) { 46 try { 47 ((Client) channel).reconnect(); 48 } catch (Exception e) { 49 //do nothing 50 } 51 } else {// provider端關閉連線 52 channel.close(); 53 } 54 } 55 } catch (Throwable t) { 56 logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); 57 } 58 } 59 } catch (Throwable t) { 60 logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); 61 } 62 } 63 64 interface ChannelProvider { 65 Collection<Channel> getChannels(); 66 } 67 }
HeartBeatTask首先獲取所有的channelProvider#getChannels獲取所有需要心跳檢測的channel,channelProvider例項是HeaderExchangeServer中在啟動執行緒定時執行器的時候建立的內部類。
1 new HeartBeatTask.ChannelProvider() { 2 public Collection<Channel> getChannels() { 3 return Collections.unmodifiableCollection( 4 HeaderExchangeServer.this.getChannels()); 5 } 6 }
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 資料庫路由中介軟體MyCat - 使用篇(6)
【推薦】 HBase最佳實踐 - 叢集規劃
【推薦】 網易物件儲存NOS圖床神器