1. 程式人生 > >Spring Cloud Hystrix 服務容錯保護:(轉)

Spring Cloud Hystrix 服務容錯保護:(轉)

 服務容錯保護:Spring Cloud Hystrix

  在微服務架構中,我們將系統拆分為很多個服務,各個服務之間通過註冊與訂閱的方式相互依賴,由於各個服務都是在各自的程序中執行,就有可能由於網路原因或者服務自身的問題導致呼叫故障或延遲,隨著服務的積壓,可能會導致服務崩潰。為了解決這一系列的問題,斷路器等一系列服務保護機制出現了。

  斷路器本身是一種開關保護機制,用於在電路上保護線路過載,當線路中有電器發生短路時,斷路器能夠及時切斷故障電路,防止發生過載、發熱甚至起火等嚴重後果。

  在分散式架構中,斷路器模式的作用也是類似的。

  針對上述問題,Spring Cloud Hystrix 實現了斷路器、線路隔離等一系列服務保護功能。它也是基於 Netflix 的開源框架 Hystrix 實現的,該框架的目標在於通過控制那些訪問遠端系統、服務和第三方庫的節點,從而對延遲和故障提供更強大的容錯能力。Hystrix 具備服務降級、服務熔斷、執行緒和訊號隔離、請求快取、請求合併以及服務監控等強大功能。

快速入門

  在開始實現斷路器之前,先用之前實現的一些內容作為基礎,構建一個如下圖所示的服務呼叫關係。

  需要啟動的工程有如下一些:

  • eureka-server 工程:服務註冊中心,埠為8082。
  • hello-service 工程:HELLO-SERVICE 的服務單元,兩個例項啟動埠分別為 2221 和 2222.
  • ribbon-consumer 工程:使用 Ribbon 實現的服務消費者,埠為 3333

   在未加入斷路器之前,關閉8081的例項,傳送 GET 請求到 http://localhost:3333/ribbon-consumer ,可以獲取下面的輸入。

  下面引入 Spring Cloud Hystrix。

  • 在 ribbon-consumer 工程的 pom.xml 的 dependency 節點中引入 spring-cloud-starter-hystrix 依賴:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
  • 在 ribbon-consumer 工程的主類上使用 @EnableCircuitBreaker 註解開啟斷路器功能:

複製程式碼

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class DemoApplication {

    @Bean
    @LoadBalanced
    RestTemplate restTemplate(){
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

複製程式碼

  注:此處還可以使用 Spring Cloud 應用中的 @SpringCloudApplication 註解來修飾主類,該註解的具體定義如下。可以看到,該註解中包含了上述所引用的三個註解,這意味著一個 Spring Cloud 標準應用應包含服務發現以及斷路器。

複製程式碼

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.cloud.client;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker
public @interface SpringCloudApplication {
}

複製程式碼

  • 改造服務消費方式,新增 HelloService 類,注入 RestTemplate 例項。然後,將在 ConsumerController 中對 RestTemplate 的使用遷移到 helloService 函式中,最後,在 helloService 函式上增加 @HystrixCommand 註解來指定回撥方法。

複製程式碼

package com.example.demo.web;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.client.RestTemplate;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService(){
        return restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
    }

    public String helloFallback(){
        return "error";
    }
}

複製程式碼

  • 修改 ConsumerController 類, 注入上面實現的 HelloService 例項,並在 helloConsumer 中進行呼叫:

複製程式碼

package com.example.demo.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-9
 */

@RestController
public class ConsumerController {

    @Autowired
    HelloService helloService;

    @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET)
    public String helloConsumer(){
        return helloService.helloService();
    }
}

複製程式碼

  下面,對斷路器實現的服務回撥邏輯進行驗證,重新啟動之前關閉的 2221 埠的 hello-service,確保此時服務註冊中心、兩個 hello-service 和 ribbon-consumer 均已啟動,再次訪問 http://localhost:3333/ribbon-consumer 可以輪詢兩個 hello-serive 並返回一些文字資訊。此時斷開其中任意一個埠的 hello-service,再次訪問,當輪詢到關閉的埠服務時,輸出內容為 error ,不再是之前的提示資訊。

  除了通過斷開具體的服務例項來模擬某個節點無法訪問的情況之外,還可以模擬一下服務阻塞(長時間未響應)的情況。下面對hello-serive 的 /index 介面做一些修改,具體如下:

 

複製程式碼

package com.example.demo.web;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-9
 */

@RestController
public class HelloController {

    private final Logger logger = Logger.getLogger(getClass());

    @Autowired
    private DiscoveryClient client;

    @RequestMapping(value = "/index")
    public String index(){
        ServiceInstance instance = client.getLocalServiceInstance();
        // 讓處理執行緒等待幾秒鐘
        int sleepTime = new Random().nextInt(3000);
        logger.info("sleepTime:"+sleepTime);

        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("/hello:host:"+instance.getHost()+" port:"+instance.getPort()
                +" service_id:"+instance.getServiceId());
        return "hello world!";
    }
}

複製程式碼

  通過Thread.sleep 函式可讓 /index 介面的處理執行緒不是馬上返回內容,而是在阻塞幾秒後才返回內容。由於 Hystrix 預設超時時間為 2000 毫秒,所以這裡採用了 0 至 3000 的隨機數以讓處理過程有一定概率發生超時來觸發斷路器。為了更精確的觀察斷路器的觸發,在消費者呼叫函式中做一些時間記錄,具體如下:

複製程式碼

package com.example.demo.web;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.client.RestTemplate;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService(){
        long beginTime = System.currentTimeMillis();
        String body = restTemplate.getForEntity("http://hello-service/index",
                String.class).getBody();
        long endTime = System.currentTimeMillis();
        System.out.println("Spend Time : "+ (endTime - beginTime));
        return body;
    }

    public String helloFallback(){
        return "error";
    }
}

複製程式碼

 

原理分析

工作流程

  1. 建立 HystrixCommand 或 HystrixObservableCommand 物件

  首先,建立一個 HystrixCommand 或 HystrixObservableCommand 物件,用來表示對依賴服務的操作請求,同時傳遞所有需要的引數。從其命名中我們就能知道它採用了“命令模式” 來實現服務呼叫操作的封裝。而這兩個 Command 物件分別針對不同的應用場景。

  • HystrixCommand :用在依賴的服務返回單個操作結果的時候。
  • HystrixObservableCommand :用在依賴的服務返回多個操作結果的時候。

  命令模式,將來自客戶端的請求封裝成一個物件,從而讓你可以使用不同的請求對客戶端進行引數化。它可以被用於實現“行為請求者” 與 “行為實現者” 的解耦,以便使兩者可以適應變化。下面的示例是對命令模式的簡單實現:

複製程式碼

package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

// 接收者
public class Receiver {
    public void active(){
        //真正的業務邏輯
        System.out.println("測試命令模式");
    }
}

複製程式碼

複製程式碼

package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */
//抽象命令
public interface Command {
    void excute();
}

複製程式碼

複製程式碼

package com.example.demo.command;

import org.springframework.beans.factory.annotation.Autowired;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

//具體命令實現
public class CommandImpl implements Command {
    private Receiver receiver;

    public CommandImpl(Receiver receiver) {
        this.receiver = receiver;
    }

    @Override
    public void excute() {
        this.receiver.active();
    }
}

複製程式碼

複製程式碼

package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

//客戶端呼叫
public class Invoker {

    private Command command;

    public void setCommand(Command command) {
        this.command = command;
    }

    public void  active (){
        command.excute();
    }
}

複製程式碼

複製程式碼

package com.example.demo.command;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

public class Client {
    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        Command command = new CommandImpl(receiver);
        Invoker invoker = new Invoker();
        invoker.setCommand(command);
        invoker.active(); //客戶端通過呼叫者來執行命令
    }
}

複製程式碼

  從程式碼中,可以看到這樣幾個物件。

  • Receiver:接收者,它知道如何處理具體的業務邏輯。
  • Command:抽象命令,它定義了一個命令物件應具備的一系列命令操作,比如 execute 等。當命令操作被呼叫的時候就會觸發接收者去做具體命令對應的業務邏輯。
  • CommandImpl:具體的命令實現,在這裡它綁定了命令操作與接收者之間的關係,execute 命令的實現委託給了 Receiver 的 action 函式。
  • Invoker:呼叫者,它持有一個命令物件,並且可以在需要的時候通過命令物件完成具體的業務邏輯。

  從上面的示例中,我們可以看到,呼叫者 Invoker 與操作者 Receiver 通過 Command 命令介面實現瞭解耦。對於呼叫者來說,我們可以為其注入多個命令操作,呼叫者只需在需要的時候直接呼叫即可,而不需要知道這些操作命令實際是如何實現的。而在這裡所提到的 HystrixCommand 和 HystrixObservableCommand 則是在 Hystrix 中對 Command 的進一步抽象定義。

  2. 命令執行

  命令執行方式一共有4種,而 Hystrix 在執行時會根據建立的Command物件以及具體的情況來選擇一種執行。其中 HystrixCommand 實現了下面兩個執行方式。

  • execute():同步執行,從依賴的服務返回一個單一的結果物件,或是在發生錯誤的時候丟擲異常。
  • queue():非同步執行,直接返回一個 Future 物件,其中包含了服務執行結束時要返回的單一結果物件。
R execute();

Future<R> queue();

  而 HystrixObservableCommand 實現了另外兩種執行方式。

  • observe():返回 Observable 物件,它代表了操作的多個結果,它是一個 HotObservable。
  • toObservable():同樣返回 Observable 物件,也代表了操作的多個結果,但它返回的是一個 Cold Observable。
Observable<R> observe();

Observable<R> toObservable();

  在 Hystrix 的底層實現中大量使用了 RxJava ,為了更容易的理解後續內容,在這裡對 RxJava 的觀察者-訂閱者模式做一個簡單的入門介紹。

  上面提到的 Observable 物件就是 RxJava 中的核心內容之一,可以理解為 “事件源” 或者 “被觀察者”,與其對應的 Subscriber 物件,可以理解為 “訂閱者” 或者 “觀察者”。這兩個物件是 RxJava 響應式程式設計的重要組成部分。

  • Observable 用來向訂閱者 Subscriber 物件釋出事件,Subscriber 物件則在接收到事件後對其進行處理,而在這裡所指的事件通常就是對依賴服務的呼叫。
  • 一個 Observable 可以發出多個事件,知道結束或者發生異常。
  • Observable 物件每發出一個事件,就會呼叫對應觀察者 Subscriber 物件的 onNext() 方法。
  • 每一個 Observable 的執行,最後一定會通過呼叫 Subscriber.onCompleted() 或者 Subscriber.onError() 來結束該事件的操作流。

  下面通過一個簡單的例子來直觀理解一下 Observable 與 Subscribers:

複製程式碼

package com.example.demo.Observable_Subsciber;

import rx.Observable;
import rx.Subscriber;

/**
 * @author lxx
 * @version V1.0.0
 * @date 2017-8-16
 */

public class Obs_Subs {

    public static void main(String[] args) {

        //建立事件源
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello RxJava ");
                subscriber.onNext("I'm XX");
                subscriber.onCompleted();
            }
        });

        //建立訂閱者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String s) {

            }
        };

        observable.subscribe(subscriber);

    }
}

複製程式碼

  在該示例中,建立了一個簡單的事件源 observable,一個對事件傳遞內容輸出的訂閱者 subscriber ,通過 observable.subscribe(subscriber) 來觸發事件的釋出。

  在這裡我們對於事件源 observable 提到了兩個不同的概念:Hot Observable 和 Cold Observable ,分別對應了上面的 command.observe() 和 command.toObservable() 的返回物件。其中 HotObservable,不論 “事件源” 是否有 “訂閱者” ,都會在建立後對事件進行釋出,所以對於 Hot Observable 的每一個 “訂閱者” 都有可能是從 “事件源” 的中途開始的,並可能只是看到了整個操作的區域性過程。而 Cold Observable 在沒有 “訂閱者” 的時候並不會釋出事件,而是進行等待,直到有 “訂閱者” 之後才釋出事件,所以對於 Cold Observable 的訂閱者,它可以保證從一開始看到整個操作的全部過程。

  3. 結果是否被快取

  若當前命令的請求快取功能是被啟用的,並且該命令快取命中,那麼快取的結果會立即以 Observable 物件的形式返回。

  4. 斷路器是否開啟

  在命令結果沒有快取命中的時候,Hystrix 在執行命令前需要檢查斷路器是否為開啟狀態:

  • 開啟:Hystrix不執行命令,轉到 fallback 處理邏輯(對應下面第8步)。
  • 關閉:Hystrix 跳到第5步,檢查是否有可用資源來執行命令。

  5. 執行緒池 / 請求佇列 / 資訊量是否佔滿

  如果與命令相關的執行緒池 / 請求佇列 / 資訊量已經佔滿,那麼 Hystrix 不會執行命令,跳轉到 fallback 處理邏輯(對應下面第8步)。

  注意:此處的執行緒池並非容器的執行緒池,而是每個依賴服務的專有執行緒池。Hystrix 為了保證不會因為某個依賴服務的問題影響到其他依賴服務而採用了 “艙壁模式” 來隔離每個依賴的服務。

  6. HystrixObservableCommand.construct() 或 HystrixCommand.run() 

  Hystrix 會根據編寫的方法來決定採取什麼樣的方式去請求依賴服務。

  • HystrixCommand.run() :返回一個單一的結果,或者丟擲異常。
  • HystrixObservableCommand.construct():返回一個 Observable 物件來發射多個結果,或通過 onError 傳送錯誤通知。

  如果 run() 或 construct() 方法的執行時間超過了命令設定的超時閾值,當前處理執行緒會丟擲 TimeoutException。這種情況下,也會跳轉到 fallback 處理邏輯(第8步)。

   7. 計算斷路器的健康度

  Hystrix 會將 “成功”、“失敗”、“拒絕”、“超時” 等資訊報告給斷路器,而斷路器會維護一組計數器來統計這些資料。

  斷路器會使用這些統計資料來決定是否要將斷路器開啟,來對某個依賴服務的請求進行 “熔斷 / 短路”,直到恢復期結束。

  8. fallback 處理

  當命令執行失敗的時候,Hystrix 會進入 fallback 嘗試回退處理,我們通常也稱為 “服務降級”。下面就是能夠引發服務降級處理的幾種情況:

  • 第4步,當前命令處於 “熔斷 / 短路” 狀態,斷路器是開啟的時候。
  • 第5步,當前命令的執行緒池、請求佇列或者訊號量被佔滿的時候。
  • 第6步,HystrixObservableCommand.construct() 或者 HystrixCommand.run() 丟擲異常的時候。

  9、返回成功的響應

  

斷路器原理

  HystrixCircuitBreaker 的定義:

複製程式碼

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.netflix.hystrix;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscriber;
import rx.Subscription;

public interface HystrixCircuitBreaker {
    boolean allowRequest();

    boolean isOpen();

    void markSuccess();

    void markNonSuccess();

    boolean attemptExecution();

    public static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
        public NoOpCircuitBreaker() {
        }

        public boolean allowRequest() {
            return true;
        }

        public boolean isOpen() {
            return false;
        }

        public void markSuccess() {
        }

        public void markNonSuccess() {
        }

        public boolean attemptExecution() {
            return true;
        }
    }

    public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;
        private final AtomicReference<HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status> status;
        private final AtomicLong circuitOpened;
        private final AtomicReference<Subscription> activeSubscription;

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.status = new AtomicReference(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED);
            this.circuitOpened = new AtomicLong(-1L);
            this.activeSubscription = new AtomicReference((Object)null);
            this.properties = properties;
            this.metrics = metrics;
            Subscription s = this.subscribeToStream();
            this.activeSubscription.set(s);
        }

        private Subscription subscribeToStream() {
            return this.metrics.getHealthCountsStream().observe().subscribe(new Subscriber() {
                public void onCompleted() {
                }

                public void onError(Throwable e) {
                }

                public void onNext(HealthCounts hc) {
                    if(hc.getTotalRequests() >= (long)((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerRequestVolumeThreshold().get()).intValue() && hc.getErrorPercentage() >= ((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerErrorThresholdPercentage().get()).intValue() && HystrixCircuitBreakerImpl.this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
                        HystrixCircuitBreakerImpl.this.circuitOpened.set(System.currentTimeMillis());
                    }

                }
            });
        }

        public void markSuccess() {
            if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED)) {
                this.metrics.resetStream();
                Subscription previousSubscription = (Subscription)this.activeSubscription.get();
                if(previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }

                Subscription newSubscription = this.subscribeToStream();
                this.activeSubscription.set(newSubscription);
                this.circuitOpened.set(-1L);
            }

        }

        public void markNonSuccess() {
            if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
                this.circuitOpened.set(System.currentTimeMillis());
            }

        }

        public boolean isOpen() {
            return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L);
        }

        public boolean allowRequest() {
            return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow())));
        }

        private boolean isAfterSleepWindow() {
            long circuitOpenTime = this.circuitOpened.get();
            long currentTime = System.currentTimeMillis();
            long sleepWindowTime = (long)((Integer)this.properties.circuitBreakerSleepWindowInMilliseconds().get()).intValue();
            return currentTime > circuitOpenTime + sleepWindowTime;
        }

        public boolean attemptExecution() {
            return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(this.isAfterSleepWindow()?this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN):false)));
        }

        static enum Status {
            CLOSED,
            OPEN,
            HALF_OPEN;

            private Status() {
            }
        }
    }

    public static class Factory {
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap();

        public Factory() {
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            HystrixCircuitBreaker previouslyCached = (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name());
            if(previouslyCached != null) {
                return previouslyCached;
            } else {
                HystrixCircuitBreaker cbForCommand = (HystrixCircuitBreaker)circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreaker.HystrixCircuitBreakerImpl(key, group, properties, metrics));
                return cbForCommand == null?(HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()):cbForCommand;
            }
        }

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name());
        }

        static void reset() {
            circuitBreakersByCommand.clear();
        }
    }
}

複製程式碼

  主要定義了三個斷路器的抽象方法。

  • allowRequest:Hystrix 命令的請求通過它判斷是否被執行。
  • isOpen:返回當前斷路器是否開啟。
  • markSuccess:用來閉合斷路器。

  另外還有三個靜態類。

  • 靜態類 Factory 中維護了一個 Hystrix 命令與 HystrixCircuitBreaker 的關係集合:ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand,其中 String 型別的key 通過 HystrixCommandKey 定義,每一個 Hystrix 命令需要有一個 key 來標識,同時一個 Hystrix 命令也會在該集合中找到它對應的斷路器 HystrixCircuitBreaker 例項。
  • 靜態類 NoOpCircuitBreaker 定義了一個什麼都不做的斷路器實現,它允許所有請求,並且斷路器狀態始終閉合。
  • 靜態類 HystrixCircuitBreakerImpl 是斷路器介面 HystrixCIrcuitBreaker 的實現類,在該類中定義斷路器的 4 個核心物件。  
    • HystrixCommandProperties properties :斷路器對應 HystrixCommand 例項的屬性物件。
    • HystrixCommandMetrics metrics :用來讓 HystrixCommand 記錄各類度量指標的物件。
    • AtomicLong circuitOpened :斷路器開啟或是上一次測試的事件戳。

  HystrixCircuitBreakerImpl 的各個實現方法如下:

  • isOpen:判斷斷路器的開啟 / 關閉狀態。
 public boolean isOpen() {
  return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L);
 }
  • allowRequest:判斷請求是否被允許:

複製程式碼

 public boolean allowRequest() {
  return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow())));
 }

複製程式碼

  • markSuccess:“半開路” 狀態時使用。若Hystrix命令呼叫成功,通過該方法將開啟的斷路器關閉,並重置度量指標物件。

複製程式碼

 public void markNonSuccess() {
  if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) {
    this.circuitOpened.set(System.currentTimeMillis());
  }
}

複製程式碼

 

依賴隔離

  Hystrix 使用 “艙壁模式” 實現執行緒池的隔離,它為每一個依賴服務建立一個獨立的執行緒池,就算某個依賴服務出現延遲過高的情況,也不會拖慢其他的依賴服務。

 

使用詳解

建立請求命令

  Hystrix 命令就是我們之前所說的 HystrixCommand,它用來封裝具體的依賴服務呼叫邏輯。

  可以通過繼承的方式來實現,比如: