1. 程式人生 > >dubbo原始碼分析8 -- DubboProtocol 之提供端釋出服務export

dubbo原始碼分析8 -- DubboProtocol 之提供端釋出服務export

在前面提到,在RegistryProtocol釋出服務時,首先會dubbo對外提供介面

根據url的地址,協議是dubbo,呼叫protocol.export(…), 但是根據ExtensionLoader.getExtensionLoader獲取的到的protocol, 這個protocol是個裝飾者(一個是啟動lisenter,一個是建立單性列表filter).最後是原生的dubboProtocol執行單性列表filter,最後一個Invoker是Wrapper的invokeMethod..

以下是DubboProtocol部分原始碼

  • 根據url獲取到ip:port作為key,如果已經建立過了,就直接reset
  • 建立服務 ,在服務外面包裝了一個HeaderExchangeServer,主要是提供心跳機制。定時往dubbo伺服器傳送資料,超時:如果是客戶端的話就重連否則關閉服務.
  • 服務是NettyServer,
  • -
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   //dubbo://..
     URL url = invoker.getUrl();
     // com.test.ITestService:1.0.0:20890
     String key = serviceKey(url);
     DubboExporter<T> exporter = new
DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //開啟服務 openServer(url); // modified by lishen optimizeSerialization(url); return exporter; } /**開啟服務 當10.118.14.204:20890已經建立過服務,那麼就reset,否則建立服務 **/ private void openServer(URL url) { // 10.118.14.204:20890
String key = url.getAddress(); //client 也可以暴露一個只有server可以呼叫的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); //沒有的話就是建立服務 if (server == null) { serverMap.put(key, createServer(url)); } else { //server支援reset,配合override功能使用 server.reset(url); } } } /**建立服務 開啟心跳檢測,預設使用netty。組裝url **/ private ExchangeServer createServer(URL url) { //預設開啟server關閉時傳送readonly事件 url = url.addParameterIfAbsent("channel.readonly.sent", Boolean.TRUE.toString()); //預設開啟heartbeat url = url.addParameterIfAbsent("heartbeat", String.valueOf(60 * 1000)); //netty String str = url.getParameter("server", "netty"); url = url.addParameter("codec", Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server = Exchangers.bind(url, requestHandler); //.. return server; }

ExchangeServer
dubbo://10.118.14.204:20890/com.test.ITestService?anyhost=true&application=testService..side=provider

public static ExchangeServer bind(URL url, 
 ExchangeHandler handler) throws RemotingException {
     url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
     return getExchanger(url).bind(url, handler);
 }
 public static Exchanger getExchanger(URL url) {
     String type = url.getParameter("exchanger", "header");
     return ExtensionLoader.getExtensionLoader(Exchanger.class).
        getExtension(type);
 }

HeaderExchanger

因為已經指定header,提供服務

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";
    //服務端繫結
    public ExchangeServer bind(URL url, ExchangeHandler handler) 
    throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(
         url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

HeaderExchangeServer服務端

主要就是提供了心跳機制.

  • 啟動心跳機制
public class HeaderExchangeServer implements ExchangeServer {
  private final ScheduledExecutorService scheduled = Executors.
   newScheduledThreadPool(1,new NamedThreadFactory(
   "dubbo-remoting-server-heartbeat", true));
  // 心跳定時器
  private ScheduledFuture<?> heatbeatTimer;
  // 心跳超時,毫秒。預設0,不會執行心跳。
  private int heartbeat;
  private int heartbeatTimeout;
  private final Server server;
  private volatile boolean closed = false;

  public HeaderExchangeServer(Server server) {
    //..屬性賦值
    //心跳
    startHeatbeatTimer();
  }

    private void startHeatbeatTimer() {
      //關閉心跳定時
      stopHeartbeatTimer();
      if (heartbeat > 0) {
         //每隔heartbeat時間執行一次
          heatbeatTimer = scheduled.scheduleWithFixedDelay(
                  new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                      //獲取channels
                      public Collection<Channel> getChannels() {
                          return Collections.unmodifiableCollection(
                                  HeaderExchangeServer.this.getChannels() );
                      }
                  }, heartbeat, heartbeatTimeout),
                  heartbeat, heartbeat,TimeUnit.MILLISECONDS);
      }
      }
      //關閉心跳定時
      private void stopHeartbeatTimer() {
          try {
              ScheduledFuture<?> timer = heatbeatTimer;
              if (timer != null && ! timer.isCancelled()) {
                  timer.cancel(true);
              }
          } catch (Throwable t) {
              logger.warn(t.getMessage(), t);
          } finally {
              heatbeatTimer =null;
          }
      }

心跳執行緒HeartBeatTask

  • 在超時時間之內,傳送資料
  • 在超時時間在外,是客戶端的話,重連;是服務端,那麼關閉
final class HeartBeatTask implements Runnable {
    public void run() {
      long now = System.currentTimeMillis();
      for ( Channel channel : channelProvider.getChannels() ) {
       //如果通道已經關閉了,跳過
          if (channel.isClosed()) {
              continue;
          }
         //獲取最後的讀取時間
         Long lastRead = ( Long ) channel.getAttribute(
                 HeaderExchangeHandler.KEY_READ_TIMESTAMP );
         //獲取最後的寫時間
         Long lastWrite = ( Long ) channel.getAttribute(
                 HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
         //判斷時機
         if ( ( lastRead != null && now - lastRead > heartbeat )  ||
           ( lastWrite != null && now - lastWrite > heartbeat ) ) {
             Request req = new Request();
             req.setVersion( "2.0.0" );
             req.setTwoWay( true );
             req.setEvent( Request.HEARTBEAT_EVENT );
             //向服務端傳送資料
             channel.send( req );
         }
         //超時了.
         if ( lastRead != null && now - lastRead > heartbeatTimeout )
             if (channel instanceof Client) {
              //是客戶端的話,就重連
                ((Client)channel).reconnect();
             } else {
             //關閉
                channel.close();
             }
         }
       }     
     }
}