1. 程式人生 > >ribbon源碼之客戶端

ribbon源碼之客戶端

ring nts ebo factory 初始 sync com isa ceo

  客戶端模塊的核心功能是提供統一的用戶請求操作接口

接口定義

  客戶端模塊的核心是IClient接口,定義了客戶端網絡請求的方法。

public interface IClient<S extends ClientRequest, T extends IResponse> {
    public T execute(S request, IClientConfig requestConfig) throws Exception; 
}

  ClientRequest為客戶端定義的請求體,存儲了請求uri、loadbalancer的key,是否重試、配置。

public
class ClientRequest implements Cloneable { protected URI uri; protected Object loadBalancerKey = null; protected Boolean isRetriable = null; protected IClientConfig overrideConfig; }

  IResponse為客戶端定義的響應內容的接口。

public interface IResponse extends Closeable
{
   public Object getPayload() throws
ClientException; public boolean hasPayload(); public boolean isSuccess(); public URI getRequestedURI(); public Map<String, ?> getHeaders(); }

  IClientConfigAware定義了需要使用IClientConfig初始化IClient的方法。

public interface IClientConfigAware {
    public abstract void initWithNiwsConfig(IClientConfig clientConfig);
}

類圖

技術分享圖片

客戶端工廠類

  客戶端模塊提供了一個客戶端工廠類(ClientFactory)用於通過配置文件來創建IClient實例和負載均衡器(ILoadBalancer)實例。

public static synchronized IClient getNamedClient(String name) {//根據配置獲取iclient實例,默認使用DefaultClientConfigImpl配置類return getNamedClient(name, DefaultClientConfigImpl.class);
    }
public static synchronized IClient getNamedClient(String name, Class<? extends IClientConfig> configClass) {
...
return createNamedClient(name, configClass);
...
}
public static synchronized IClient createNamedClient(String name, Class<? extends IClientConfig> configClass) throws ClientException {
IClientConfig config = getNamedConfig(name, configClass);//實例化配置類
return registerClientFromProperties(name, config);//通過配置文件創建iclient
}
public static synchronized ILoadBalancer getNamedLoadBalancer(String name) {
return getNamedLoadBalancer(name, DefaultClientConfigImpl.class);
}
public static synchronized ILoadBalancer getNamedLoadBalancer(String name, Class<? extends IClientConfig> configClass) {
...
lb = registerNamedLoadBalancerFromProperties(name, configClass);
...
}
public static synchronized IClient<?, ?> registerClientFromProperties(String restClientName, IClientConfig clientConfig) throws ClientException { 
        IClient<?, ?> client = null;
        ILoadBalancer loadBalancer = null;
        ...
            String clientClassName = (String) clientConfig.getProperty(CommonClientConfigKey.ClientClassName);//通過配置文件獲取client的實現類
            client = (IClient<?, ?>) instantiateInstanceWithClientConfig(clientClassName, clientConfig); //通過配置文件創建client實例
            boolean initializeNFLoadBalancer = Boolean.parseBoolean(clientConfig.getProperty(
                    CommonClientConfigKey.InitializeNFLoadBalancer, DefaultClientConfigImpl.DEFAULT_ENABLE_LOADBALANCER).toString());
            if (initializeNFLoadBalancer) {//如果需要初始化負載均衡器,則通過配置文件創建一個負載均衡器
                loadBalancer  = registerNamedLoadBalancerFromclientConfig(restClientName, clientConfig);
            }
            if (client instanceof AbstractLoadBalancerAwareClient) {//如果client實現AbstractLoadBalancerAwareClient,則註入負載均衡器
                ((AbstractLoadBalancerAwareClient) client).setLoadBalancer(loadBalancer);
            }
        ...return client;
    }
public static ILoadBalancer registerNamedLoadBalancerFromclientConfig(String name, IClientConfig clientConfig) throws ClientException {
...
ILoadBalancer lb = null;
...
String loadBalancerClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerClassName);//
lb = (ILoadBalancer) ClientFactory.instantiateInstanceWithClientConfig(loadBalancerClassName, clientConfig);
...
return lb;
...
}
//初始化指定的class類
public static Object instantiateInstanceWithClientConfig(String className, IClientConfig clientConfig)
          throws InstantiationException, IllegalAccessException, ClassNotFoundException {
Class clazz = Class.forName(className);
if (IClientConfigAware.class.isAssignableFrom(clazz)) {//如果指定的iclient實現了IClientConfigAware,ClientFactory在創建時會使用IClientConfig進行初始化
IClientConfigAware obj = (IClientConfigAware) clazz.newInstance();
obj.initWithNiwsConfig(clientConfig);
return obj;
} else {
try {
if (clazz.getConstructor(IClientConfig.class) != null) {
return clazz.getConstructor(IClientConfig.class).newInstance(clientConfig);
}
} catch (Throwable e) { // NOPMD
}
}
return clazz.newInstance();
}

  使用客戶端工廠類(ClientFactory)涉及的配置:

屬性 實現 默認值
clientname.ribbon.ClientClassName client使用的IClient實現類 com.netflix.niws.client.http.RestClient
clientname.ribbon.InitializeNFLoadBalancer 是否初始化負載均衡器 true
clientname.ribbon.NFLoadBalancerClassName 負載均衡器的實現類 com.netflix.loadbalancer.ZoneAwareLoadBalancer

類圖

技術分享圖片

客戶端實現類

  AbstractLoadBalancerAwareClient實現了通過負載均衡器進行請求調用。LoadBalancerCommand對負載均衡器操作進行了模版,對請求調用提供了回調函數。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
     ...return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);//設置最終的調用uritry {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } //調用execute方法執行請求調用catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        ...
        
    }

  LoadBalancerCommand調用了負載均衡器獲得了一個server,然後調用回調函數執行請求。此外還提供了各個關鍵節點的監聽器和異常重試機制。

public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        if (listenerInvoker != null) {//執行回調接口
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))//調用負載均衡器獲得目標server
                .concatMap(new Func1<Server, Observable<T>>() {
                    ...return operation.call(server).doOnEach(new Observer<T>() {
                                            ....
                                        });
                    ...
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
            
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }

private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

  子類 HttpRequest用於http請求,內部定義了http請求的各個內容,並且使用了builder模式。

public class HttpRequest extends ClientRequest {protected CaseInsensitiveMultiMap httpHeaders = new CaseInsensitiveMultiMap();//head參數
    protected Multimap<String, String> queryParams = ArrayListMultimap.create();//query參數
    private Object entity;//消息體
    protected Verb verb;//http請求的method:post get head delete等。默認get
}

  

類圖

ribbon源碼之客戶端