1. 程式人生 > >十、Spring cloud服務短路(Hystrix)之原始碼解析

十、Spring cloud服務短路(Hystrix)之原始碼解析

  • Spring Cloud Hystrix 原始碼解讀
  • Nertflix Hystrix 原始碼解讀
  • RxJava 基礎

一、Spring Cloud Hystrix 原始碼解讀

1、@EnableCircuitBreaker

(1)職責:
啟用 Circuit Breaker

(2)呼叫鏈路:

    @EnableCircuitBreaker 
    <!-- 通過 EnableCircuitBreaker 註解上的註解 @Import(EnableCircuitBreakerImportSelector.class) 可知 -->
    -> EnableCircuitBreakerImportSelector 
    <!-- 
        EnableCircuitBreakerImportSelector 繼承 SpringFactoryImportSelector<EnableCircuitBreaker> ,通過探尋
    發現,SpringFactoryImportSelector 類下的 selectImports() 方法中的 
    List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
    .loadFactoryNames(this.annotationClass, this.beanClassLoader)));
    可知,這裡是以 EnableCircuitBreaker 全限定名為key,找到對應的預設實現。通過
    搜尋 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 找到 spring cloud 框架下的 
    spring.factories 檔案中,找到了其預設實現:
    org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
    -->
-> HystrixCircuitBreakerConfiguration

2、HystrixCircuitBreakerConfiguration

(1)初始化元件

  • HystrixCommandAspect
  • HystrixShutdownHook
  • HasFeatures

二、Nertflix Hystrix 原始碼解讀

1、HystrixCommandAspect

(1)依賴元件

  • MetaHolderFactory:生成攔截方法元資訊
  • HystrixCommandFactory:生成 HystrixInvokable
  • HystrixInvokable
    • CommandCollapser
    • GenericObservableCommand
    • GenericCommand

2、Future 來實現超時熔斷

/**
 * 通過 {@link Future} 實現 服務熔斷
 * @author 鹹魚
 * @date 2018/11/14 20:12
 */
public class FutureCircuitBreakerDeo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //初始化執行緒池
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        RandomCommand randomCommand = new RandomCommand();

        Future<String> future = executorService.submit(() -> {
            //獲取 run 方法計算結果
            return randomCommand.run();
        });

        String result = null;
        // 100 ms 超時時間
        try {
            result = future.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // fallback 方法呼叫
            result = randomCommand.fallback();
        }

        System.out.println(result);

        executorService.shutdown();
    }

    /**
     * 隨即物件
     */
    private static final Random RANDOM = new Random();

    /**
     * 隨機事件執行命令
     */
    static class RandomCommand implements Command<String> {
        @Override
        public String run() throws InterruptedException {
            long executeTime = RANDOM.nextInt(200);

            System.out.println("execute time : " + executeTime + "ms");

            Thread.sleep(executeTime);

            return "hello";
        }

        @Override
        public String fallback() {
            return "fallback";
        }
    }


    public static interface Command<T> {
        /**
         * 正常執行,並且返回結果
         * @return T
         */
        T run() throws InterruptedException;

        /**
         * 錯誤時,返回容錯結果
         * @return T
         */
        T fallback();
    }
}

三、RxJava 基礎

1、單資料:Single API

//僅能釋出單個數據
        Single.just("Hello,World!")
                //在I/O執行緒執行
                .subscribeOn(Schedulers.io())
                //訂閱並且消費資料
                .subscribe(RxJavaDemo::println);
                Thread.sleep(100);

2、多資料:Observable API

List<Integer> values = Arrays.asList(1,2,3,4,5,6,7,8);
        //釋出多個數據
        Observable.from(values)
                .subscribeOn(Schedulers.computation())
                //訂閱並且消費資料
                .subscribe(RxJavaDemo::println);
        Thread.sleep(100);

3、使用標準 Reactive 模式:

public static void demoStandardReactive() throws InterruptedException {
        List<Integer> values = Arrays.asList(1,2,3);
        //釋出多個數據
        Observable.from(values)
                .subscribeOn(Schedulers.newThread())
                //subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
                //介面 Action1:需實現call(T t)
                //介面 Action0:需實現call()
                .subscribe(
                        //引數一:消費資料
                        value -> {
                            if (value > 2) {
                                throw new IllegalStateException("資料不容許大於2");
                            }
                            println("消費資料:" + value);
                        },
                        //引數二:當發生異常時,中斷執行
                        e -> println("發生異常:" + e.getMessage()),
                        //引數三:當邏輯執行完畢時
                        () -> println("邏輯執行完畢"))
        ;
        //上面是非同步執行,需要休眠等待其執行完畢
        Thread.sleep(100);
    }