一起寫RPC框架(十六)RPC註冊中心三--註冊中心服務提供者端的處理
上兩節我們簡單地說明了註冊中心的功能和基本結構,本節重點講述註冊中心如何處理服務提供者傳送過來的資訊
因為我們之前講過Netty網路端的定義說明的,也講了服務提供者釋出服務的知識了,我們看看預設的註冊中心的處理器是如何處理髮布的資訊的
我們看看註冊中心處理器DefaultRegistryProcessor.java需要處理哪些資訊:private void registerProcessor() { this.remotingServer.registerDefaultProcessor(new DefaultRegistryProcessor(this), this.remotingExecutor); this.remotingServer.registerChannelInactiveProcessor(new DefaultRegistryChannelInactiveProcessor(this), remotingChannelInactiveExecutor); }
服務提供者傳送過來的資訊標識是PUBLISH_SERVICE和PUBLISH_CANCEL_SERVICE這兩種標識package org.laopopo.base.registry; import static org.laopopo.common.protocal.LaopopoProtocol.MANAGER_SERVICE; import static org.laopopo.common.protocal.LaopopoProtocol.PUBLISH_CANCEL_SERVICE; import static org.laopopo.common.protocal.LaopopoProtocol.PUBLISH_SERVICE; import static org.laopopo.common.protocal.LaopopoProtocol.SUBSCRIBE_SERVICE; import io.netty.channel.ChannelHandlerContext; import org.laopopo.remoting.ConnectionUtils; import org.laopopo.remoting.model.NettyRequestProcessor; import org.laopopo.remoting.model.RemotingTransporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author BazingaLyn * @description 註冊中心的處理轉換器 * @time 2016年8月13日 * @modifytime 2016年8月31日 */ public class DefaultRegistryProcessor implements NettyRequestProcessor { private static final Logger logger = LoggerFactory.getLogger(DefaultRegistryProcessor.class); private DefaultRegistryServer defaultRegistryServer; public DefaultRegistryProcessor(DefaultRegistryServer defaultRegistryServer) { this.defaultRegistryServer = defaultRegistryServer; } @Override public RemotingTransporter processRequest(ChannelHandlerContext ctx, RemotingTransporter request) throws Exception { if (logger.isDebugEnabled()) { logger.debug("receive request, {} {} {}",// request.getCode(), // ConnectionUtils.parseChannelRemoteAddr(ctx.channel()), // request); } switch (request.getCode()) { case PUBLISH_SERVICE: // 處理服務提供者provider推送的服務資訊 return this.defaultRegistryServer.getProviderManager().handlerRegister(request, ctx.channel()); // 要保持冪等性,同一個例項重複釋出同一個服務的時候對於註冊中心來說是無影響的 case PUBLISH_CANCEL_SERVICE: // 處理服務提供者provider推送的服務取消的資訊 return this.defaultRegistryServer.getProviderManager().handlerRegisterCancel(request, ctx.channel()); case SUBSCRIBE_SERVICE: // 處理服務消費者consumer訂閱服務的請求 return this.defaultRegistryServer.getProviderManager().handleSubscribe(request, ctx.channel()); case MANAGER_SERVICE: // 處理管理者傳送過來的服務管理服務 return this.defaultRegistryServer.getProviderManager().handleManager(request, ctx.channel()); } return null; } }
首先我們先看PUBLISH_SERVICE這個邏輯的處理,程式碼註釋中寫的很清楚,要保持冪等性,否則會造成很多莫名其妙的問題,這個當然是用ProviderManager,各司其職嘛,我們看看handlerRegister的處理,看程式碼之前我們先明確一下這段程式碼要做的事情:
1)將服務提供者釋出的資訊記錄到註冊中心的本地記憶體中去,相當於把釋出的資訊記錄下來,否則服務消費者來訂閱服務的時候,去哪裡查詢這些記錄
2)從歷史記錄中找到其歷史的稽核記錄,如果歷史記錄上,也就是說硬碟持久化中記錄到的稽核記錄,如果以前顯示是稽核通過的情況下,就直接設定該服務稽核通過,無需再稽核,如果在歷史記錄中沒有找到該服務的資訊,說明該服務是第一次註冊釋出,我們需要給出預設的服務持久化資訊
3)如果該服務已經稽核通過,我們需要通知通知訂閱該服務的消費者新增了一個服務提供者
4)當上述三件事情都搞定了,我們需要傳送一個ACK資訊給服務提供者,告之它註冊中心已經成功接收到它釋出的資訊了,叫他安心~
因為註冊中心的所有資訊都是基於java記憶體實現的,所以我們使用全域性變數來儲存這些資訊,某些資訊,比如稽核或者權重的資訊需要持久化到硬碟的,我們做定時任務處理,定時刷盤,全域性變數如下
// 某個服務
private final ConcurrentMap<String, ConcurrentMap<Address, RegisterMeta>> globalRegisterInfoMap = new ConcurrentHashMap<String, ConcurrentMap<Address, RegisterMeta>>();
// 指定節點都註冊了哪些服務
private final ConcurrentMap<Address, ConcurrentSet<String>> globalServiceMetaMap = new ConcurrentHashMap<RegisterMeta.Address, ConcurrentSet<String>>();
// 某個服務 訂閱它的消費者的channel集合
private final ConcurrentMap<String, ConcurrentSet<Channel>> globalConsumerMetaMap = new ConcurrentHashMap<String, ConcurrentSet<Channel>>();
// 提供者某個地址對應的channel
private final ConcurrentMap<Address, Channel> globalProviderChannelMetaMap = new ConcurrentHashMap<RegisterMeta.Address, Channel>();
//每個服務的歷史記錄
private final ConcurrentMap<String, RegistryPersistRecord> historyRecords = new ConcurrentHashMap<String, RegistryPersistRecord>();
//每個服務對應的負載策略
private final ConcurrentMap<String, LoadBalanceStrategy> globalServiceLoadBalance = new ConcurrentHashMap<String, LoadBalanceStrategy>();
handlerRegister的具體邏輯:
@Override
public RemotingTransporter handlerRegister(RemotingTransporter remotingTransporter, Channel channel) throws RemotingSendRequestException,
RemotingTimeoutException, InterruptedException {
// 準備好ack資訊返回個provider,悲觀主義,預設返回失敗ack,要求provider重新發送請求
AckCustomBody ackCustomBody = new AckCustomBody(remotingTransporter.getOpaque(), false);
RemotingTransporter responseTransporter = RemotingTransporter.createResponseTransporter(LaopopoProtocol.ACK, ackCustomBody,
remotingTransporter.getOpaque());
// 接收到主體資訊
PublishServiceCustomBody publishServiceCustomBody = serializerImpl().readObject(remotingTransporter.bytes(), PublishServiceCustomBody.class);
RegisterMeta meta = RegisterMeta.createRegiserMeta(publishServiceCustomBody,channel);
if (logger.isDebugEnabled()) {
logger.info("Publish [{}] on channel[{}].", meta, channel);
}
// channel上打上該服務的標記 方便當channel inactive的時候,直接從channel上拿到標記的屬性,通知
attachPublishEventOnChannel(meta, channel);
// 一個服務的最小單元,也是確定一個服務的最小單位
final String serviceName = meta.getServiceName();
// 找出提供此服務的全部地址和該服務在該地址下的稽核情況
ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceName);
synchronized (globalRegisterInfoMap) {
//歷史記錄中的所有服務的持久化的資訊記錄
ConcurrentMap<String, RegistryPersistRecord> concurrentMap = historyRecords;
// 獲取到這個地址可能以前註冊過的註冊資訊
RegisterMeta existRegiserMeta = maps.get(meta.getAddress());
// 如果等於空,則說明以前沒有註冊過 這就需要從歷史記錄中將某些服務以前註冊稽核的資訊恢復一下記錄
if (null == existRegiserMeta) {
RegistryPersistRecord persistRecord = concurrentMap.get(serviceName);
//如果歷史記錄中沒有記錄該資訊,也就說持久化中沒有記錄到該資訊的時候,就需要構造預設的持久化資訊
if(null == persistRecord){
persistRecord = new RegistryPersistRecord();
persistRecord.setServiceName(serviceName); //持久化的服務名
persistRecord.setBalanceStrategy(LoadBalanceStrategy.WEIGHTINGRANDOM); //預設的負載均衡的策略
PersistProviderInfo providerInfo = new PersistProviderInfo();
providerInfo.setAddress(meta.getAddress()); //服務提供者的地址
providerInfo.setIsReviewed(ServiceReviewState.HAS_NOT_REVIEWED); //服務預設是未稽核
persistRecord.getProviderInfos().add(providerInfo);
concurrentMap.put(serviceName, persistRecord);
}
//迴圈該服務的所有服務提供者例項的資訊,獲取到當前例項的稽核狀態,設定好meta的稽核資訊
for(PersistProviderInfo providerInfo:persistRecord.getProviderInfos()){
if(providerInfo.getAddress().equals(meta.getAddress())){
meta.setIsReviewed(providerInfo.getIsReviewed());
}
}
existRegiserMeta = meta;
maps.put(meta.getAddress(), existRegiserMeta);
}
this.getServiceMeta(meta.getAddress()).add(serviceName);
//預設的負載均衡的策略
LoadBalanceStrategy defaultBalanceStrategy = LoadBalanceStrategy.WEIGHTINGRANDOM;
if(null != concurrentMap.get(serviceName)){
RegistryPersistRecord persistRecord = concurrentMap.get(serviceName);
if(null != persistRecord.getBalanceStrategy()){
defaultBalanceStrategy = persistRecord.getBalanceStrategy();
}
}
// 設定該服務預設的負載均衡的策略
globalServiceLoadBalance.put(serviceName, defaultBalanceStrategy);
// 判斷provider傳送的資訊已經被成功的儲存的情況下,則告之服務註冊成功
ackCustomBody.setSuccess(true);
// 如果稽核通過,則通知相關服務的訂閱者
if (meta.getIsReviewed() == ServiceReviewState.PASS_REVIEW) {
this.defaultRegistryServer.getConsumerManager().notifyMacthedSubscriber(meta, globalServiceLoadBalance.get(serviceName));
}
}
//將地址與該channel繫結好,方便其他地方使用
globalProviderChannelMetaMap.put(meta.getAddress(), channel);
return responseTransporter;
}
這段冗長的程式碼就是完成的四點需求了,收到服務提供者釋出的資訊之後就是接受到該資訊,從歷史記錄中恢復該資訊的稽核記錄,負載均衡的策略,然後如果此時這個服務的稽核狀態是通過的情況下,就請兄弟類ConsumerManager把該服務的資訊推送給對應的訂閱者,最後傳送ack資訊給服務提供者,告之它接收資訊成功,這段程式碼整體的流程就是如此
另一種場景就是服務提供者通知釋出服務取消的訊號的時候,也就是PUBLISH_CANCEL_SERVICE這個訊號的時候,要做的事情也是很簡單:
1)將記錄在java記憶體中的資訊移除掉,其實也就是將全域性變數的資訊移除掉
2)如果該服務已經是稽核通過的,則需要再發送資訊給訂閱該服務的服務消費者,該服務下線,使其不再呼叫該服務了
/***
* 服務下線的介面
*
* @param meta
* @param channel
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
*/
public void handlePublishCancel(RegisterMeta meta, Channel channel) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
if (logger.isDebugEnabled()) {
logger.info("Cancel publish {} on channel{}.", meta, channel);
}
//將其channel上打上的標記移除掉
attachPublishCancelEventOnChannel(meta, channel);
final String serviceMeta = meta.getServiceName();
ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceMeta);
if (maps.isEmpty()) {
return;
}
synchronized (globalRegisterInfoMap) {
Address address = meta.getAddress();
RegisterMeta data = maps.remove(address);
if (data != null) {
this.getServiceMeta(address).remove(serviceMeta);
if (data.getIsReviewed() == ServiceReviewState.PASS_REVIEW )
this.defaultRegistryServer.getConsumerManager().notifyMacthedSubscriberCancel(meta);
}
}
}
上述程式碼就是簡單的實現了上述的功能
當然還有一種場景就是當服務提供者的例項直接關閉的時候,與註冊中心之間保持的netty長連線也會斷掉,例項關閉自然就是服務下線,所以我們也要對其作出處理
因為某個服務提供者例項提供的遠端服務可能不止一個,所以我們要做的就是將這些服務全部做下線處理,因為我們之前已經將建立的channel上打上了tag,所以我們很容易知道某個服務例項上到底提供了多少個遠端服務
@Override
public void processChannelInactive(ChannelHandlerContext ctx) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
//獲取到當前的channel,此時的channel應該是打過記號的
Channel channel = ctx.channel();
// 取消之前釋出的所有服務
ConcurrentSet<RegisterMeta> registerMetaSet = channel.attr(S_PUBLISH_KEY).get();
//如果該channel打過的記號是空,或者是空集合的話,直接返回
if (registerMetaSet == null || registerMetaSet.isEmpty()) {
logger.debug("registerMetaSet is empty");
return;
}
//接下來需要做兩件事情
//1 修改當前註冊中心該channel所提供的所有服務取消
//2 傳送請求告之consumer該地址對應的所有服務下線
Address address = null;
for (RegisterMeta meta : registerMetaSet) {
if (address == null) {
address = meta.getAddress();
}
this.defaultRegistryServer.getProviderManager().handlePublishCancel(meta, channel);
}
}
總而言之,註冊中心處理服務提供者的之間的業務邏輯還是相對比較簡單的,總結而言就是記錄,通知服務消費者,反之就是消除記錄,通知服務消費者服務下線
詳細程式碼檢視
https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-registry-default/src/main/java/org/laopopo/base/registry