1. 程式人生 > >一起寫RPC框架(二十)RPC服務消費者二--服務消費者訂閱服務

一起寫RPC框架(二十)RPC服務消費者二--服務消費者訂閱服務

訂閱服務看起來是比較簡單容易實現的功能,乍一看,就是傳送訂閱服務的名稱給註冊中心,然後註冊中心反饋給服務消費者,感覺萬事大吉,其實並不是這樣的,這塊是比較容易實現的,使用Netty很容易就能實現了,但是寫的時候就會發現各種問題

比如一個現在系統第一次呼叫callRemotingService()的方法的時候,這時候在沒有任何條件的支援下,服務消費者首先需要去註冊中心去獲取資訊,然後根據拿到的資訊去與服務提供者建立Netty的長連線,獲取到長連線的channel之後就可以進行呼叫了

感覺就是很簡單,跟以前我們處理訂閱,或者釋出服務的處理都跟類似,不過這邊卻多了一個建立長連線的過程,這邊處理就相對比較複雜一點,因為我們知道Netty建立連線是非同步的,這個就像雙刃劍一樣,你拿著IP和埠號,使用Netty去建立連線的時候,因為非同步,程式碼不會阻塞到連線成功建立後返回,立即返回,然後你就會拿著還沒有建立好連線的channel去呼叫,這樣就會導致呼叫錯誤

雖然這種場景不多見,一般情況下,專案都是使用spring管理的,spring一般會預載入的,如果提前建立好所有的服務的長連線,就可以很好的規避這個問題,不過這個問題卻是客觀存在的,所以我們需要解決這個問題

首先,我們還看看一個基本的服務消費者定義的基本介面吧(程式碼大部分來自於Jupiter):

package org.laopopo.client.consumer;

import io.netty.channel.Channel;

import org.laopopo.common.exception.remoting.RemotingSendRequestException;
import org.laopopo.common.exception.remoting.RemotingTimeoutException;
import org.laopopo.common.loadbalance.LoadBalanceStrategy;
import org.laopopo.common.utils.ChannelGroup;
import org.laopopo.common.utils.UnresolvedAddress;
import org.laopopo.remoting.model.RemotingTransporter;



/**
 * 
 * @author BazingaLyn
 * @description 消費端的介面
 * @time 2016年8月15日
 * @modifytime
 */
public interface Consumer {
	
	/**
	 * 遠端呼叫方法
	 * @param serviceName 遠端呼叫的服務名
	 * @param args 引數
	 * @return 
	 * @throws Throwable
	 */
	Object call(String serviceName,Object... args) throws Throwable;
	
	/**
	 * 
	 * @param serviceName
	 * @param timeout 呼叫超時時間
	 * @param args
	 * @return
	 * @throws Throwable
	 */
	Object call(String serviceName,long timeout,Object... args) throws Throwable;
	
	/**
	 * 在當服務端向註冊中心訂閱服務的時候進行管理
	 * @param serviceName
	 * @return
	 */
	SubscribeManager subscribeService(String serviceName);
	
	
	/**
	 * 去連線註冊中心,獲取到與註冊中心的唯一一個channel
	 */
	void getOrUpdateHealthyChannel();
	
	/**
	 * 去註冊中心訂閱服務,註冊中心返回結果之後,回撥NotifyListener的方法
	 * @param subcribeServices
	 * @param listener
	 */
	void subcribeService(String subcribeServices,NotifyListener listener);
	
	/**
	 * 
	 * @param address 該地址是提供者的地址
	 * 一個服務提供者的地址可以維護一組channel,因為一個消費者例項與一個提供者之間的長連結數可以不止一個,不過當然一般情況下,一個就可以了
	 * @return
	 */
	ChannelGroup group(UnresolvedAddress address);
	
	/**
	 * 根據一個服務名,匹配使用者給這個服務設定的負載均衡策略,根據這個負載均衡演算法去找到這個服務對應的與提供者的Channel
	 * @param serviceName
	 * @return
	 */
	ChannelGroup loadBalance(String serviceName);
	
	/**
	 * 當註冊中心告之某個服務多了一個提供者之後,我們需要將其更新
	 * @param serviceName
	 * @param group
	 * @return
	 */
	boolean addChannelGroup(String serviceName, ChannelGroup group);
	
	/**
	 * 當註冊中心告之某個服務的提供者下線的時候,我們也需要服務路由表
	 * @param serviceName
	 * @param group
	 * @return
	 */
	boolean removeChannelGroup(String serviceName, ChannelGroup group);
	
	/**
	 * 核心方法,遠端呼叫
	 * @param channel 消費者與服務提供者的之間建立的長連線的channel
	 * @param request 請求體 包含請求的引數,請求的方法名
	 * @param timeout 請求超時時間
	 * @return
	 * @throws RemotingTimeoutException
	 * @throws RemotingSendRequestException
	 * @throws InterruptedException
	 */
	RemotingTransporter sendRpcRequestToProvider(Channel channel, RemotingTransporter request,long timeout) throws RemotingTimeoutException, RemotingSendRequestException, InterruptedException;
	
	/**
	 * 當註冊中心推送某個服務的負載均衡策略傳送變化之後,需要變更的資訊
	 * @param serviceName
	 * @param loadBalanceStrategy
	 */
	void setServiceLoadBalanceStrategy(String serviceName,LoadBalanceStrategy loadBalanceStrategy);
	
	
	/**
	 * 如果直連的情況下,根據address直接獲取連線
	 * @param address
	 * @return
	 * @throws InterruptedException 
	 */
	Channel directGetProviderByChannel(UnresolvedAddress address) throws InterruptedException;
	
	/**
	 * 啟動consumer端例項
	 */
	void start();
	
	
	interface SubscribeManager {

		/**
		 * 啟動對訂閱服務管理的服務
		 */
        void start();

        /**
         * 當某個服務去註冊中心註冊之後,註冊中心返回訂閱結果,consumer例項
         * 拿著訂閱結果,去向服務提供者建立長連線,因為建立長連線的過程是非同步的,簡而言之獲取一個active的channel的是非同步的,所以當一切貌似看起來ok的時候
         * 其實未必,所以必須進行回撥管理,否則遠端呼叫的時候,可能channel還沒有準備就緒,會報錯
         * 
         * 詳細見 {@link NotifyListener} 類的頭部註釋
         * @param timeoutMillis
         * @return
         */
        boolean waitForAvailable(long timeoutMillis);
    }
	

}

因為本小節主要說明訂閱這個操作,所以我們著重看SubscribeManager subscribeService(String serviceName)這個方法:
@Override
	public SubscribeManager subscribeService(final String service) {

		SubscribeManager manager = new SubscribeManager() {

			private final ReentrantLock lock = new ReentrantLock();
			private final Condition notifyCondition = lock.newCondition();
			private final AtomicBoolean signalNeeded = new AtomicBoolean(false);

			@Override
			public void start() {
				subcribeService(service, new NotifyListener() {
					
					@Override
					public void notify(RegisterMeta registerMeta, NotifyEvent event) {

						// host
						String remoteHost = registerMeta.getAddress().getHost();
						// port vip服務 port埠號-2
						int remotePort = registerMeta.isVIPService() ? (registerMeta.getAddress().getPort() - 2) : registerMeta.getAddress().getPort();

						final ChannelGroup group = group(new UnresolvedAddress(remoteHost, remotePort));
						if (event == NotifyEvent.CHILD_ADDED) {
							// 鏈路複用,如果此host和port對應的連結的channelGroup是已經存在的,則無需建立新的連結,只需要將此group與service建立關係即可
							if (!group.isAvailable()) {

								int connCount = registerMeta.getConnCount() < 0 ? 1 : registerMeta.getConnCount();

								group.setWeight(registerMeta.getWeight());

								for (int i = 0; i < connCount; i++) {

									try {
										// 所有的consumer與provider之間的連結不進行短線重連操作
										DefaultConsumer.this.getProviderNettyRemotingClient().setreconnect(false);
										DefaultConsumer.this.getProviderNettyRemotingClient().getBootstrap()
												.connect(ConnectionUtils.string2SocketAddress(remoteHost + ":" + remotePort)).addListener(new ChannelFutureListener() {

													@Override
													public void operationComplete(ChannelFuture future) throws Exception {
														group.add(future.channel());
														onSucceed(signalNeeded.getAndSet(false));
													}
													
												});
									} catch (Exception e) {
										logger.error("connection provider host [{}] and port [{}] occor exception [{}]", remoteHost, remotePort, e.getMessage());
									}
								}
							}else{
								onSucceed(signalNeeded.getAndSet(false));
							}
							addChannelGroup(service,group);
						}else if(event == NotifyEvent.CHILD_REMOVED){
							removedIfAbsent(service, group);
						}
					}
				});
			}

			@Override
			public boolean waitForAvailable(long timeoutMillis) {
				if (isServiceAvailable(service)) {
                    return true;
                }
				boolean available = false;
                long start = System.nanoTime();
                final ReentrantLock _look = lock;
                _look.lock();
                try {
                    while (!isServiceAvailable(service)) {
                        signalNeeded.set(true);
                        notifyCondition.await(timeoutMillis, MILLISECONDS);

                        available = isServiceAvailable(service);
                        if (available || (System.nanoTime() - start) > MILLISECONDS.toNanos(timeoutMillis)) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    JUnsafe.throwException(e);
                } finally {
                    _look.unlock();
                }
                return available;
			}

			private boolean isServiceAvailable(String service) {
				CopyOnWriteArrayList<ChannelGroup> list = DefaultConsumer.super.getChannelGroupByServiceName(service);
				if(list == null){
					return false;
				}else{
					for(ChannelGroup channelGroup : list){
						if(channelGroup.isAvailable()){
							return true;
						}
					}
				}
				return false;
			}

			private void onSucceed(boolean doSignal) {
				if (doSignal) {
                    final ReentrantLock _look = lock;
                    _look.lock();
                    try {
                        notifyCondition.signalAll();
                    } finally {
                        _look.unlock();
                    }
                }
			}

		};
		manager.start();
		return manager;
	}

這段程式碼主要的核心意思就是獲取的資訊之後,建立連線,因為連線是非同步的,所以加了通知機制,建立成功之後,將健康的channel放入到group中去,然後就維護了所有的channel,這邊的程式碼邏輯還是相對比較複雜的,詳細看原始碼

說也不是說的很清楚,還是看程式碼來的實在~還請見諒