1. 程式人生 > >【一起學原始碼-微服務】Hystrix 原始碼二:Hystrix核心流程:Hystix非降級邏輯流程梳理

【一起學原始碼-微服務】Hystrix 原始碼二:Hystrix核心流程:Hystix非降級邏輯流程梳理

說明

原創不易,如若轉載 請標明來源!

歡迎關注本人微信公眾號:壹枝花算不算浪漫
更多內容也可檢視本人部落格:一枝花算不算浪漫

前言

前情回顧

上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的HystrixInvocationHandler 生成對應的動態代理。

本講目錄

這一講開始講解Hystrix相關程式碼,當然還是基於上一個元件Feign的基礎上開始講解的,這裡預設你已經對Feign有過大致瞭解。

目錄如下:

  1. 執行緒池初始化過程
  2. HystrixCommand通過執行緒池執行原理

由於這裡面程式碼比較多,所以我都是將一些主要核心程式碼發出來,這裡後面會彙總一個流程圖,可以參考流程圖 自己一點點除錯。

這裡建議在回撥的地方都加上斷點,而且修改feign和hystrix超時時間,瀏覽器傳送請求後,一步步debug程式碼。

原始碼分析

執行緒池初始化過程

上一講已經講過啟用Hystrix後,構造的InvocationHandler為HystrixInvocationHandler,所以當呼叫FeignClient服務例項的時候,會先執行HystrixInvocationHandler.invoke()方法,這裡我們先跟進這個方法:

final class HystrixInvocationHandler implements InvocationHandler {

    @Override
    public Object invoke(final Object proxy, final Method method, final Object[] args)
            throws Throwable {

        // 構建一個HystrixCommand
        // HystrixCommand構造引數需要Setter物件
        HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) {
            @Override
            protected Object run() throws Exception {
                try {
                    // 執行SynchronousMethodHandler.invoke方法
                    return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
                } catch (Exception e) {
                    throw e;
                } catch (Throwable t) {
                    throw (Error) t;
                }
            }
        }

        // 省略部分程式碼...

        return hystrixCommand.execute();
    }
}

這裡主要是構造HystrixCommand,我們先看看它的建構函式以及執行緒池池初始化的程式碼:

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {

    protected HystrixCommand(HystrixCommandGroupKey group) {
        super(group, null, null, null, null, null, null, null, null, null, null, null);
    }
}

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        // 初始化執行緒池
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

      // 省略部分程式碼...
    }

    private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixThreadPool
            return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
        } else {
            return fromConstructor;
        }
    }
}

public interface HystrixThreadPool {
    final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        // 這個執行緒池的key就是我們feignClient定義的value名稱,其他服務的projectName
        // 在我們的demo中:key = serviceA
        String key = threadPoolKey.name();

        // threadPools是一個map,key就是serviceA
        HystrixThreadPool previouslyCached = threadPools.get(key);
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // 初始化執行緒池
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(key)) {
                threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        return threadPools.get(key);
    }
}


public abstract class HystrixThreadPoolProperties {
    /* defaults */
    static int default_coreSize = 10;
    static int default_maximumSize = 10;
    static int default_keepAliveTimeMinutes = 1;
    static int default_maxQueueSize = -1;            
    static boolean default_allow_maximum_size_to_diverge_from_core_size = false;
    static int default_queueSizeRejectionThreshold = 5;
    static int default_threadPoolRollingNumberStatisticalWindow = 10000;
    static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10;

    // 省略部分程式碼...
}

這裡主要是初始化執行緒池的邏輯,從HystrixCommand一直到HystrixThreadPoolProperties。這裡的threadPools 是一個Map,一個serviceName會對應一個執行緒池。

執行緒池的預設配置都在HystrixThreadPoolProperties中。執行緒池的核心執行緒和最大執行緒數都是10,佇列的大小為-1,這裡意思是不使用佇列。

HystrixCommand建構函式需要接收一個Setter物件,Setter中包含兩個很重要的屬性,groupKeycommandKey, 這裡看下Setter是如何構造的:

final class HystrixInvocationHandler implements InvocationHandler {

    HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
                           SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
        this.target = checkNotNull(target, "target");
        this.dispatch = checkNotNull(dispatch, "dispatch");
        this.fallbackFactory = fallbackFactory;
        this.fallbackMethodMap = toFallbackMethod(dispatch);
        this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
    }

    static Map<Method, Setter> toSetters(SetterFactory setterFactory, Target<?> target,
                                       Set<Method> methods) {
        Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
        for (Method method : methods) {
            method.setAccessible(true);
            result.put(method, setterFactory.create(target, method));
        }
        return result;
    }
}

public interface SetterFactory {
    HystrixCommand.Setter create(Target<?> target, Method method);
    final class Default implements SetterFactory {
        @Override
        public HystrixCommand.Setter create(Target<?> target, Method method) {
            // groupKey既是呼叫的服務服務名稱:serviceA
            String groupKey = target.name();
            // commandKey即是方法的名稱+入參定義等,一個commandKey能夠確定這個類中唯一的一個方法
            String commandKey = Feign.configKey(target.type(), method);
            return HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
            }
        }
    }
}

構建一個HystrixCommand時必須要傳入這兩個引數。

  1. groupKey: 就是呼叫的服務名稱,例如我們demo中的ServiceA,groupKey對應著一個執行緒池。
  2. commandKey: 一個FeignClient介面中的一個方法就是一個commandKey, 其組成為方法名和入參等資訊。

groupkeycommandKey是一對多的關係,例如ServiceA中的2個方法,那麼groupKey就對應著這個ServiceA中的2個commandKey。

groupKey -> target.name() -> ServiceA -> @FeignClient註解裡設定的服務名稱

commanKey -> ServiceAFeignClient#sayHello(String)

這裡回撥函式執行HystrixInvocationHandler.this.dispatch.get(method).invoke(args) 其實就是執行SynchronousMethodHandler.invoke() 方法了。但是什麼時候才會回調回來呢?後面接著看吧。

HystrixCommand通過執行緒池執行原理

上面已經看了執行緒池的初始化過程,當一個服務第一次被呼叫的時候,會判斷threadPools (資料結構為ConcurrentHashMap) 中是否存在這個serviceName對應的執行緒池,如果沒有的話則會初始化一個對應的執行緒池。執行緒池預設配置屬性在HystrixThreadPoolProperties中可以看到。

Hystrix執行緒池預設是不使用佇列進行執行緒排隊的,核心執行緒數為10。接下來我們看看建立HystrixCommand後,執行緒池是如何將HystrixCommand 命令提交的:

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

    public Future<R> queue() {
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
            
        };

        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }
}

這裡又是一堆的回撥函式,我們可以在每個回撥函式中打上斷點,然後一點點除錯。
這裡主要是通過toObservable()方法構造了一個Future<R>, 然後包裝此Future,添加了中斷等邏輯,後面使用f.get() 阻塞獲取執行緒執行結果,最後返回Future物件。

這裡我們的重點在於尋找哪裡將HystrixCommand丟入執行緒池,然後返回一個Future的。
接著往後跟進程式碼:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        // _cmd就是HystrixInvocationHandler物件
        // 裡面包含要請求的method資訊,threadPool資訊,groupKey,commandKey等資訊
        final AbstractCommand<R> _cmd = this;
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

        // 省略部分回撥函式程式碼...

        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // 是否使用請求快取,預設為false
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                // 請求快取相關
                final String cacheKey = getCacheKey();

                // 省略部分程式碼...

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // 省略部分程式碼...
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
}

toObservable()是比較核心的程式碼,這裡也是定義了很多回調函式,上面程式碼做了精簡,留下一些核心邏輯,在defer()中構造返回了一個Observable物件,這個Observable是包含上面的一些回撥函式的。

通過debug程式碼,這裡會直接執行到applyHystrixSemantics這個建構函式Func0中的call()方法中,通過語意 我們可以大致猜到這個函式的意思:應用Hystrix語義
接著往下跟進程式碼:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);
        // 判斷是否短路
        if (circuitBreaker.attemptExecution()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            // 如果不使用Semaphore配置,那麼tryAcquire使用的是TryableSemaphoreNoOp中的方法,返回true
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

這裡面我們預設使用的執行緒池的隔離配置,所以executionSemaphore.tryAcquire()都會返回true,這裡有個重要的方法:executeCommandAndObserve(_cmd), 我們繼續往後跟進這個方法:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        // 省略部分回撥函式...

        Observable<R> execution;
        // 預設配置timeOutEnabled為true
        if (properties.executionTimeoutEnabled().get()) {
            // 執行指定的隔離執行命令
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
}

對於Hystrix來說,預設是開啟超時機制的,這裡會執行executeCommandWithSpecifiedIsolation(), 返回一個執行的Observable.還是通過方法名我們可以猜測這個方法是:使用指定的隔離執行命令
繼續往裡面跟進:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        }
    }
}

這裡就是我們千辛萬苦需要找的核心方法了,裡面仍然是一個回撥函式,通過斷點除錯,這裡會先執行:subscribeOn回撥函式,執行threadPool.getScheduler方法,我們進一步往後跟進:

public interface HystrixThreadPool {
    @Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig();
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

    private void touchConfig() {
        final int dynamicCoreSize = properties.coreSize().get();
        final int configuredMaximumSize = properties.maximumSize().get();
        int dynamicMaximumSize = properties.actualMaximumSize();
        final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        boolean maxTooLow = false;

        // 動態調整最大執行緒池的數量
        if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
            //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
            dynamicMaximumSize = dynamicCoreSize;
            maxTooLow = true;
        }

        // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
        if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
            if (maxTooLow) {
                logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                        dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            }
            threadPool.setCorePoolSize(dynamicCoreSize);
            threadPool.setMaximumPoolSize(dynamicMaximumSize);
        }

        threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
    }
}

public class HystrixContextScheduler extends Scheduler {
    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }

    @Override
    public Worker createWorker() {
        // 構建一個預設的Worker
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

    private static class ThreadPoolScheduler extends Scheduler {

        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Worker createWorker() {
            // 預設的worker為:ThreadPoolWorker
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }

    }

    private class HystrixContextSchedulerWorker extends Worker {
        // 執行schedule方法
        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            // 預設的worker為:ThreadPoolWorker
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }
    }


    // 執行command的核心類
    private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public void unsubscribe() {
            subscription.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return subscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);

            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            throw new IllegalStateException("Hystrix does not support delayed scheduling");
        }
    }
}

touchConfig() 方法主要是重新設定最大執行緒池actualMaximumSize的,這裡預設的allowMaximumSizeToDivergeFromCoreSize是false。

HystrixContextScheduler類中有HystrixContextSchedulerWorkerThreadPoolSchedulerThreadPoolWorker 這幾個內部類。看看它們的作用:

  1. HystrixContextSchedulerWorker: 對外提供schedule()方法,這裡會判斷執行緒池佇列是否已經滿,如果滿了這會丟擲異常:Rejected command because thread-pool queueSize is at rejection threshold。 如果配置的佇列大小為-1 則預設返回true。

  2. ThreadPoolScheduler:執行createWorker()方法,預設使用ThreadPoolWorker()

  3. ThreadPoolWorker:執行command的核心邏輯

private static class ThreadPoolWorker extends Worker {

    private final HystrixThreadPool threadPool;
    private final CompositeSubscription subscription = new CompositeSubscription();
    private final Func0<Boolean> shouldInterruptThread;

    @Override
    public Subscription schedule(final Action0 action) {
        if (subscription.isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }

        ScheduledAction sa = new ScheduledAction(action);
        subscription.add(sa);
        sa.addParent(subscription);
        // 獲取執行緒池
        ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
        // 將包裝後的HystrixCommand submit到執行緒池,然後返回FutureTask
        FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
        sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

        return sa;
    }
}

原來一個command就是在這裡被提交到執行緒池的,再次回到AbstractCommand.executeCommandWithSpecifiedIsolation()方法中,這裡會回撥到這個回撥函式的call()方法中,這裡一路執行邏輯如下:

getUserExecutionObservable(_cmd)==>getExecutionObservable()==>hystrixCommand.run()==>SynchronousMethodHandler.invoke()

這裡最後執行到HystrixInvocationHandler中的invoke()方法中的回撥函式run()中,最後執行SynchronousMethodHandler.invoke()方法。

一個正常的feign請求,經過hystrix走一遍也就返回對應的response。

總結

上面一頓分析,不知道大家有沒有對hystrix 執行緒池及command執行是否有些理解了?

這個是一個正向流程,沒有涉及超時、熔斷、降級等程式碼。關於這些異常降級的原始碼會在後面一篇文章涉及。

還是之前的建議,大家可以在每個相關的回撥函式打上斷點,然後一點點除錯。

最後再總結一下簡單的流程:

  1. 瀏覽器傳送請求,執行HystrixTargter
  2. 建立HystrixCommand,根據serviceName構造執行緒池
  3. AbstractCommand中一堆回撥函式,最後將command交由執行緒池submit處理

畫一張流程圖加深理解:

高清大圖:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

相關推薦

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix降級邏輯流程梳理

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr

一起原始碼-服務Ribbon 原始碼Ribbon概念理解及Demo除錯

前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類

一起原始碼-服務Ribbon 原始碼通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析

前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的

一起原始碼-服務Ribbon 原始碼Ribbon與Eureka整合原理分析

前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下

一起原始碼-服務Ribbon 原始碼進一步探究Ribbon的IRule和IPing

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Ribbon原始碼Ribbon原始碼解讀彙總篇~

前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F

一起原始碼-服務Feign 原始碼原始碼初探,通過Demo Debug Feign原始碼

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Feign 原始碼Feign動態代理構造過程

前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr

一起原始碼-服務Feign 原始碼Feign結合Ribbon實現負載均衡的原理分析

前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d

一起原始碼-服務Hystrix 原始碼Hystrix基礎原理與Demo搭建

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix降級、熔斷等原理剖析

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講解了Hystrix在配合feign的過程中,一個正常的請求邏輯該怎樣處理,這裡涉及到執行緒池的建立、HystrixCommand的執行等邏輯。 如圖所示:

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取

前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之EurekaServer上下文EurekaClient建立

前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli

一起原始碼-服務Nexflix Eureka 原始碼在眼花繚亂的程式碼中,EurekaClient是如何註冊的?

前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka

一起原始碼-服務Nexflix Eureka 原始碼通過單元測試來Debug Eureka註冊過程

前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。

一起原始碼-服務Nexflix Eureka 原始碼EurekaClient登錄檔抓取 精妙設計分析!

前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI

一起原始碼-服務Nexflix Eureka 原始碼服務續約原始碼分析

前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著

一起原始碼-服務Nexflix Eureka 原始碼服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?

前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註

一起原始碼-服務Nexflix Eureka 原始碼十一EurekaServer自我保護機制竟然有這麼多Bug?

前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer叢集模式原始碼分析

前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收