1. 程式人生 > >Spring Cloud Netflix Hystrix介紹和使用

Spring Cloud Netflix Hystrix介紹和使用

前面我們搭建了具有服務降級功能的Hystrix客戶端,現在我們來詳細瞭解下Hystrix的一些功能。

Hystrix的意思是豪豬,大家都知道,就是長滿刺的豬。。。實際上,它表明了該框架的主要功能:自我保護功能。Hystrix具有服務降級,熔斷,執行緒池隔離,訊號量隔離,快取等功能,基本上能覆蓋到微服務中呼叫依賴服務會遇到的問題。下面我們介紹下,如何理解和使用這些功能。

1、最常用的的服務降級功能

  當執行呼叫服務方法時,若呼叫方法出現問題,如:請求超時,丟擲異常,執行緒池拒絕,熔斷這些情況下,為該方法定義降級方法,以便在出現問題時執行,實現備用返回。之前我們已經實現了服務降級功能,主要就是通過@HystrixCommand(fallbackMethod = "defaultMethod")註釋到需要在出現問題時降級的方法。fallbackMethod指定降級後執行的方法。方法定義在該類中,public,private,protected都可以。在註釋的方法出問題後,如超時未返回(execution.isolation.thread.timeoutinMilliseconds來配置),就會執行備用方法,返回備用方法的返回值。當然,降級的方法也可以定義再下一級的降級方法,實現和上面一樣。

  上面說到方法丟擲異常也會觸發服務降級,但是如果我們自定義了異常,並需要將異常丟擲給上層做操作,不希望Hystrix捕捉到自定義異常執行服務降級時,可以使用@HystrixCommand(ignoreExceptions = {MyException.class})來定義忽略捕捉的異常。多個異常用逗號隔開。也可以將丟擲的異常通過入參傳到降級的方法,來實現不同型別異常的不同處理,需要將降級方法定義如下。

@HystrixCommand(fallbackMethod = "back")
    public String getHello(String id)
    {
        return
template.getForObject("http://helloclient/hello", String.class); } public String back(String id , Throwable e) { if (e instanceof NullPointerException) { return "client 2 has some error! NullPointerException"; } else { return
"client 2 has some error! Exception"; } }

 2、熔斷器

  熔斷器,和物理概念中的斷路器類似,斷路器在高壓高溫的過載情況下,會自動斷開,實現對電路的保護。熔斷器也是一樣,下面我們看下主要的介面類:HystrixCircuitBreaker.java,它定義了以下幾個方法,並有兩個內部實現類HystrixCircuitBreakerImpl,NoOpCircuitBreaker,斷路器主要用到HystrixCircuitBreakerImpl。NoOpCircuitBreaker這個類表明不做任何操作,預設熔斷器不開啟,表明不起用熔斷功能。以下的實現方法,都是指HystrixCircuitBreakerImpl的實現。熔斷器有三個狀態,OPEN,HALF_OPEN,CLOSED,如果要自定義引數配置,下面程式碼註釋中可以找到。

/**
     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does
     * not modify any internal state, and takes into account the half-open logic which allows some requests through
     * after the circuit has been opened
     * 
     * @return boolean whether a request should be permitted
     */
    boolean allowRequest();

    /**
     * Whether the circuit is currently open (tripped).
     * 
     * @return boolean state of circuit breaker
     */
    boolean isOpen();

    /**
     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markSuccess();

    /**
     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markNonSuccess();

    /**
     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal
     * state.
     */
    boolean attemptExecution();

(1) isOpen()方法用於判斷熔斷器是否開啟。實現方法如下:

 @Override
        public boolean isOpen() {
           //判斷熔斷器是否被強制開啟,如果強制開啟,返回true,表示熔斷器已開啟。circuitBreaker.forceOpen這個配置指定
            if (properties.circuitBreakerForceOpen().get()) {
                return true;
            }
          //判斷熔斷器是否被強制關閉。circuitBreaker.forceClosed
            if (properties.circuitBreakerForceClosed().get()) {
                return false;
            }
           //判斷上一次斷路器開啟的時間是否大於零,訪問成功,該值為-1,訪問失敗,該值為訪問失敗時的系統時間。根據是否大於零,判斷熔斷器是否開啟。
            return circuitOpened.get() >= 0;
        }

(2) attemptExecution(),該方法會在熔斷器開啟的時候,有訪問時,熔斷器第一個執行的方法。如果返回false,則直接執行fallback降級方法。

@Override
        public boolean attemptExecution() {
           //判斷熔斷器是否被強制開啟,如果強制開啟,返回false後,直接執行fallback
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
          //判斷熔斷器是否被強制關閉
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
          //如果circuitOpened為-1,返回true,正常執行
            if (circuitOpened.get() == -1) {
                return true;
            } else {
              //如果circuitOpened不為-1,則表示斷路器打開了,此時,服務會從circuitOpened起,休眠5秒(circuitBreaker.sleepWindowInMilliseconds配置,              //預設5000),直接返回false,執行fallback。若休眠時間超過5秒,並且當前熔斷狀態為開啟狀態,則會將熔斷狀態置為半開狀態。如它的註釋,只有第一個              //請求滿足第一次為開啟,之後的請求都為半開狀態,返回false。
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }

(3)markSuccess(),在執行完attemptExecution()返回true正常執行成功後(未fallback),才會執行該方法,標註成功,若之前斷路器為關閉狀態,則不做處理,若為半開狀態,則重置熔斷器。

 @Override
        public void markSuccess() {
           //如果當前狀態為半開,則將state設定成closed,關閉熔斷器。如果之前由於斷路器開啟時,之後的請求,Hystrix會放開一個請求去嘗試是否服務正常,並將斷路器置為半開,           //如果正常,則將斷路器關閉,並重置斷路器。
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L);
            }
        }

(4) markNonSuccess(),用來在正常請求下,請求失敗後呼叫。

 @Override
        public void markNonSuccess() {
           //如果當前為半開狀態,且請求失敗,則重新開啟斷路器,將最近一次訪問失敗的時間置為當前時間。
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }

(5) 熔斷器的開啟。上面的方法都不會去開啟熔斷器,熔斷器開啟是由另一個方法去判斷的。這個觀察者的方法應該是週期執行的。

 private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                           //檢查時間窗內的請求總數小於配置檔案中的數量(採用的是buckets,感興趣的自己研究下)。預設時間窗為10S(metrics.rollingStats.timeInMilliseconds,metrics.rollingStats.numBuckets),預設請求總數為20(circuitBreaker.requestVolumeThreshold)。
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                //時間窗內,統計的錯誤(失敗)請求比例是否小於配置比例,預設配置是50%,通過circuitBreaker.errorThresholdPercentage=50指定。
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                   //如果時間窗內請求數大於定義數,且失敗比例大於定義比例,並且當前熔斷器關閉的情況下,將熔斷器置為開啟,並將circuitOpened置為當前時間。
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }

(6) 過程:先文字敲吧,沒畫圖工具。

  正常情況:請求——>subscribeToStream未開啟熔斷器——>attemptExecution——>markSuccess

  異常情況:請求——>subscribeToStream開啟熔斷器——>attemptExecution最後一個return返回false——>markNonSuccess,這個時候斷路器開啟狀態,且在休眠時間窗內。

       請求——>subscribeToStream未處理——>attemptExecution在超過休眠時間窗後,放開一個請求,並把熔斷器設定成半開——>請求成功,執行markSuccess,將熔斷器從半開置為關閉,並重置熔斷器;請求失敗,則將半開狀態置為開啟狀態,失敗時間起點重置成當前時間,再次迴圈。

3、快取

  之前我以為是每次相同請求,會使用快取直接返回。其實理解錯了,Hystrix的快取是在當次請求的快取,當次請求中,多次使用同一方法時,會使用快取。其他請求不能用到。而且還需初始化HystrixRequestContext,不然直接使用會報錯,我們採用定義filter來初始化。不多說了,貼程式碼大家看下,程式碼中註釋很清楚,啟動註冊中心和服務例項後(環境搭建見之前章節),就可以測試。

(1)pom.xml,application.yml配置,大家參見之前的章節。

(2)啟動類,注意註解上@ServletComponentScan。

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableCircuitBreaker
@SpringBootApplication
@EnableEurekaClient
@ServletComponentScan
public class ConsumerApplication {

    @Bean
    @LoadBalanced
    RestTemplate template()
    {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

(3)Filter類,用於初始化HystrixRequestContext。

package com.example.demo;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import java.io.IOException;

@WebFilter(filterName = "HystrixRequestContextServletFilter",urlPatterns = "/*",asyncSupported = true)
public class HystrixRequestContextServletFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try
        {
            chain.doFilter(request,response);
        }
        finally {
            context.shutdown();
        }
    }

    @Override
    public void destroy() {

    }
}

(4)controller類。

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerContorller {

    @Autowired
    HystrixServer server;

    //注意,在這個controller中呼叫具有快取功能的方法才會具備快取效果。
    @RequestMapping("/hello")
    public String sayHello()
    {
        System.out.println("請求了一次hello2");
        server.getHello2("1","ibethfy");
        System.out.println("請求了二次hello2,不會列印hello2 initinized");
        server.getHello2("1","ibethfy");
        System.out.println("請求了三次hello2,清空快取,會列印hello2 initinized");
        server.updateHello2("1","ibethfy");
        server.getHello2("1","ibethfy");
        System.out.println("請求了四次hello2,入參不同,會列印hello2 initinized");
        server.getHello2("1","ibethfy1");
        return server.getHello2("1","ibethfy1");
    }
}

(5)server類。

package com.example.demo;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheKey;
import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheRemove;
import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheResult;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class HystrixServer {

    @Autowired
    RestTemplate template;

    //通過指定生成快取key的方法生成key,commandKey指定一個HystrixCommand的key,表示註解@HystrixCommand的方法的key。groupKey表示一個型別分組的key。threadPoolKey指定執行緒池的key。
    //fallbackMethod指定降級方法,commandProperties指定該HystrixCommand方法的引數,是個陣列型別,裡面的值是@HystrixProperty,多個用逗號隔開。
    @CacheResult(cacheKeyMethod = "generateCacheKey")
    @HystrixCommand(commandKey = "getHello1",groupKey = "getHello",threadPoolKey = "getHelloThreadPool",fallbackMethod = "back",commandProperties = {
            @HystrixProperty(name="execution.isolation.thread.timeoutinMilliseconds", value = "5000")
    })
    public String getHello1()
    {
        System.out.println("hello1 initinized");
        return template.getForObject("http://helloclient/hello", String.class);
    }

    private String generateCacheKey()
    {
        return "myHelloKey";
    }


    //若不指定cache的key,預設使用方法的所有引數作為key
    @CacheResult
    @HystrixCommand(commandKey = "getHello2",groupKey = "getHello",threadPoolKey = "getHelloThreadPool")
    public String getHello2(String id,String name)
    {
        System.out.println("hello2 initinized");
        return template.getForObject("http://helloclient/hello", String.class);
    }

    //使用@CacheRemove在資料更新時,移除對應key的快取,需要指定commandKey,@HystrixCommand裡面的引數可以指定亦可以不用
    @CacheRemove(commandKey = "getHello2")
    @HystrixCommand(commandKey = "getHello2",groupKey = "getHello",threadPoolKey = "getHelloThreadPool")
    public void updateHello2(String id,String name)
    {
        System.out.println("hello2 id = "+ id + ", name = "+ name + " removed");
    }

    //使用@CacheKey指定引數作為key
    @CacheResult
    @HystrixCommand(commandKey = "getHello3",groupKey = "getHello",threadPoolKey = "getHelloThreadPool")
    public String getHello3(@CacheKey("id") String id, String name)
    {
        System.out.println("請求了一次hello3");
        return "hello3 " + id + name;
    }

    public String back(Throwable e)
    {
        if (e instanceof NullPointerException)
        {
            return "client 2 has some error! NullPointerException";
        }
        else
        {
            return "client 2 has some error! Exception";
        }
    }

}

4、執行緒隔離和訊號量隔離。

  Hystrix為了避免多個不同服務間的呼叫影響,使用了執行緒隔離模式,它為每個依賴服務單獨建立自己的執行緒池,就算某個服務延遲或問題阻塞,其餘的服務也能正常執行。總之,使得我們的服務更加健壯。當然,建立這麼多執行緒池,可能會對效能造成影響,但Hystrix測試後,獨立執行緒池帶來的好處,遠大於效能損耗的壞處。所以,大家可以放心使用。

  ExecutionIsolationStrategy列舉中定義了兩個THREAD, SEMAPHORE,一個是執行緒池,一個是訊號量,Hystix預設使用執行緒池。通過execution.isolation.strategy可以切換。

  Hystrix預設情況下,會讓配置了同組名的groupKey的command使用同一執行緒池,但也支援使用threadPoolKey配置執行緒池key。

  對於那些本來延遲就比較小的請求(例如訪問本地快取成功率很高的請求)來說,執行緒池帶來的開銷是非常高的,這時,可以考慮採用非阻塞訊號量(不支援超時),來實現依賴服務的隔離,使用訊號量的開銷很小。但絕大多數情況下,Netflix 更偏向於使用執行緒池來隔離依賴服務,因為其帶來的額外開銷可以接受,並且能支援包括超時在內的所有功能。

好了,Hystrix的主要功能基本介紹完了,碼字不容易呀,,,,