dubbo原始碼分析8 -- DubboProtocol 之提供端釋出服務export
阿新 • • 發佈:2019-02-05
在前面提到,在RegistryProtocol釋出服務時,首先會dubbo對外提供介面
根據url的地址,協議是dubbo,呼叫protocol.export(…), 但是根據ExtensionLoader.getExtensionLoader獲取的到的protocol, 這個protocol是個裝飾者(一個是啟動lisenter,一個是建立單性列表filter).最後是原生的dubboProtocol執行單性列表filter,最後一個Invoker是Wrapper的invokeMethod..
以下是DubboProtocol部分原始碼
- 根據url獲取到ip:port作為key,如果已經建立過了,就直接reset
- 建立服務 ,在服務外面包裝了一個HeaderExchangeServer,主要是提供心跳機制。定時往dubbo伺服器傳送資料,超時:如果是客戶端的話就重連否則關閉服務.
- 服務是NettyServer, -
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//dubbo://..
URL url = invoker.getUrl();
// com.test.ITestService:1.0.0:20890
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker,
key, exporterMap);
exporterMap.put(key, exporter);
//開啟服務
openServer(url);
// modified by lishen
optimizeSerialization(url);
return exporter;
}
/**開啟服務
當10.118.14.204:20890已經建立過服務,那麼就reset,否則建立服務
**/
private void openServer(URL url) {
// 10.118.14.204:20890
String key = url.getAddress();
//client 也可以暴露一個只有server可以呼叫的服務。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
//沒有的話就是建立服務
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支援reset,配合override功能使用
server.reset(url);
}
}
}
/**建立服務
開啟心跳檢測,預設使用netty。組裝url
**/
private ExchangeServer createServer(URL url) {
//預設開啟server關閉時傳送readonly事件
url = url.addParameterIfAbsent("channel.readonly.sent",
Boolean.TRUE.toString());
//預設開啟heartbeat
url = url.addParameterIfAbsent("heartbeat", String.valueOf(60 * 1000));
//netty
String str = url.getParameter("server", "netty");
url = url.addParameter("codec",
Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME :
DubboCodec.NAME);
ExchangeServer server = Exchangers.bind(url, requestHandler);
//..
return server;
}
ExchangeServer
dubbo://10.118.14.204:20890/com.test.ITestService?anyhost=true&application=testService..side=provider
public static ExchangeServer bind(URL url,
ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter("exchanger", "header");
return ExtensionLoader.getExtensionLoader(Exchanger.class).
getExtension(type);
}
HeaderExchanger
因為已經指定header,提供服務
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
//服務端繫結
public ExchangeServer bind(URL url, ExchangeHandler handler)
throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(
url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
HeaderExchangeServer服務端
主要就是提供了心跳機制.
- 啟動心跳機制
public class HeaderExchangeServer implements ExchangeServer {
private final ScheduledExecutorService scheduled = Executors.
newScheduledThreadPool(1,new NamedThreadFactory(
"dubbo-remoting-server-heartbeat", true));
// 心跳定時器
private ScheduledFuture<?> heatbeatTimer;
// 心跳超時,毫秒。預設0,不會執行心跳。
private int heartbeat;
private int heartbeatTimeout;
private final Server server;
private volatile boolean closed = false;
public HeaderExchangeServer(Server server) {
//..屬性賦值
//心跳
startHeatbeatTimer();
}
private void startHeatbeatTimer() {
//關閉心跳定時
stopHeartbeatTimer();
if (heartbeat > 0) {
//每隔heartbeat時間執行一次
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
//獲取channels
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels() );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat,TimeUnit.MILLISECONDS);
}
}
//關閉心跳定時
private void stopHeartbeatTimer() {
try {
ScheduledFuture<?> timer = heatbeatTimer;
if (timer != null && ! timer.isCancelled()) {
timer.cancel(true);
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
} finally {
heatbeatTimer =null;
}
}
心跳執行緒HeartBeatTask
- 在超時時間之內,傳送資料
- 在超時時間在外,是客戶端的話,重連;是服務端,那麼關閉
final class HeartBeatTask implements Runnable {
public void run() {
long now = System.currentTimeMillis();
for ( Channel channel : channelProvider.getChannels() ) {
//如果通道已經關閉了,跳過
if (channel.isClosed()) {
continue;
}
//獲取最後的讀取時間
Long lastRead = ( Long ) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP );
//獲取最後的寫時間
Long lastWrite = ( Long ) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
//判斷時機
if ( ( lastRead != null && now - lastRead > heartbeat ) ||
( lastWrite != null && now - lastWrite > heartbeat ) ) {
Request req = new Request();
req.setVersion( "2.0.0" );
req.setTwoWay( true );
req.setEvent( Request.HEARTBEAT_EVENT );
//向服務端傳送資料
channel.send( req );
}
//超時了.
if ( lastRead != null && now - lastRead > heartbeatTimeout )
if (channel instanceof Client) {
//是客戶端的話,就重連
((Client)channel).reconnect();
} else {
//關閉
channel.close();
}
}
}
}
}