1. 程式人生 > >Spring Cloud Hystrix(服務容錯保護)(2)

Spring Cloud Hystrix(服務容錯保護)(2)

1.建立請求命令

Hystrix命令(HystrixCommand)它用來封裝具體的依賴服務呼叫邏輯。

我們可以用繼承的方式實現:

public class HelloCommand extends HystrixCommand<String> {

    private RestTemplate restTemplate;

    private Long id;

    public HelloCommand(Setter setter,RestTemplate restTemplate,long id){
        super(setter);
        this.restTemplate=restTemplate;
        this.id=id;
    }

    @Override
    protected String run() throws Exception {
      return restTemplate.getForEntity("http://hello-service/hello", String.class).getBody();
    }
}

通過上面實現的HelloCommand我們既可以同步執行也可以非同步執行。

同步執行:String s= new HelloCommand(restTemplate,1L).execute();

非同步執行:Future<String> future=new HelloCommand(restTemplate,1L).queue();非同步執行時,可以通過對返回的future呼叫get方法來獲取結果。

另外可以用@HystrixCommand註解阿里實現Hystrix命令的定義。

@Service
public class HelloService {

    private final Logger logger=Logger.getLogger(getClass());

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService() {
        long start=System.currentTimeMillis();
        ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);
        String body = responseEntity.getBody();
        long end=System.currentTimeMillis();
        logger.info("消耗時間:"+(start-end));
        return body;
    }

    @HystrixCommand(fallbackMethod = "helloFallback")
    public Future<String> helloServiceFuture() {
       return new AsyncResult<String>(){
           public String invoke(){
               System.out.println("future當前時間"+new Date(System.currentTimeMillis()));
               return restTemplate.getForObject("http://hello-service/hello",String.class);
           }
       };
    }

    public String helloFallback() {
        return "error";
    }
}

這裡分別用上了同步和非同步兩種方式,除了傳統的同步和非同步執行外,還可以將HystrixCommand通過Observable來實現響應式執行方式。通過呼叫observe()和toObservable()方法可以返回Observable物件:

Observable<String> ho=new HelloCommand(restTemplate,1L).observe();
Observable<String> co=new HelloCommand(restTemplate,1L).toObservable();

前者返回的是Hot Observable該命令會在observe()呼叫的時候立即執行,當Observable每次訂閱的時候會重放它的行為,後者返回的是一個Cold Observable,toObservable()執行後命令不會立即執行,只有當所有訂閱者都訂閱它才會執行。

而使用@HystrixCommand來實現的方式是:

 @HystrixCommand(fallbackMethod = "helloFallback")
    public Observable<String> helloServiceObservable() {
        return  Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()){
                        String s=restTemplate.getForObject("http://hello-service/hello",String.class);
                        subscriber.onNext(s);
                        subscriber.onCompleted();
                    }
                }catch (Exception e){
                    subscriber.onError(e);
                }
            }
        });
    }

在使用@HystrixCommand註解實現響應式命令時,可以通過observableExecutionMode引數來控制是使用observe()還是toObservable()的執行方式:

@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER)
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY)

EAGER表示使用observe()執行方式。

LAZY表示使用toObservable()執行方式。

2.定義服務降級

fallback是Hystrix命令執行失敗時使用的後備方法,用來實現服務的降級處理邏輯。而在HystrixCommand中可以通過過載getFallback()方法來實現服務的降級處理,Hystrix會在run()執行中出現錯誤,超時,執行緒池拒絕,斷路器熔斷等情況,執行getFallback()方法內的邏輯可以實現服務降級邏輯。

在HystrixObservableCommand實現的Hystrix命令中,我們可以通過過載resumeWithFallback方法來實現服務降級邏輯。該方法會返回一個Observable物件,當命令執行失敗時,Hystrix會將Observable中的結果通知給所有訂閱者。

當通過註解的形式進行服務降級只需要使用@HystrixCommand中的fallbackMethod引數來指定服務降級方法:

 public String helloFallback() {
        return "error";
    }

   @HystrixCommand(fallbackMethod = "helloFallback")
    public Future<String> helloServiceFuture() {
       ……
    }

當使用註解來定義服務降級時,我們需要將具體的Hystrix命令與fallback實現函式定義在同一個類中,並且fallback方法的名字相同。由於必須定義在一個類中,所以對於fallback的訪問修飾符沒有特定要求。

在實際使用時,我們需要為大多數執行過程中可能會失敗的Hystrix命令進行服務降級邏輯,但是也有些情況不需要去實現降級邏輯。

1.執行寫操作的命令:當Hystrix命令是用來執行寫操作的時候,實現服務降級的意義並不是很大。當寫入失敗時,這時候通常只需要通知呼叫者即可。

2.執行批處理或離線計算時:當Hystrix命令是用來執行批處理程式生成一份報告或是進行任何型別的離線計算時,那麼通常這些操作只需要將錯誤傳播給呼叫者,讓呼叫者稍後重試而不是進行降級處理響應。

3.異常處理

3.1異常傳播

在HystrixCommand實現的run()方法中跑出異常時,除了HystrixBadRequestException之外,其他異常均會被Hystrix認為命令執行失敗並觸發服務降級的處理邏輯。

而在註解配置實現Hystrix命令時,它還支援忽略指定異常型別的功能,只需要通過設定@HystrixCommand註解的ignoreException引數,比如:

@HystrixCommand(fallbackMethod = "helloFallback",ignoreExceptions = {NullPointerException.class})

這條註釋定義了當方法丟擲空指標異常時,Hystrix會將其包裝在HystrixBadRequestException中,這樣就不會觸發後續的fallback邏輯。

3.2異常獲取

當Hystrix命令因為異常(除了HystrixBadRequestException的異常)進入服務降級邏輯後需要對其不同的異常進行鍼對性處理。

在以繼承方式實現Hystrix命令中,我們可以用getFallback()方法通過Throwable getExecutionException()方法來獲取具體的異常,通過判斷來進入不同的處理邏輯。

而通過註解的方式來獲取異常可以使用該種方式:

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String exception() {
        System.out.println("exception");
        if(1==1)
            throw new RuntimeException("command failed!!!!!");
        return "hello";
    }

    public String helloFallback(Throwable e) {
        System.out.println(e.getMessage());
        assert "command failed".equals(e.getMessage());
        System.out.println(e.getMessage());
        return "error";
    }

(如果你用debug模式需要注意如果長時間未到降級服務方法,它會變成其他異常資訊。。。這樣就不能獲取RuntimeException異常。)

4.命令名稱、分組以及執行緒池劃分

當使用@HystrixCommand註解的時候設定命令名、分組以及執行緒劃分,只需要設定註解的commandKey、groupKey以及threadPoolKey屬性即可,如:

 

@HystrixCommand(commandKey = "idKey",groupKey = "Group",threadPoolKey = "Thread")

 

當使用繼承的方式來實現命令組名等資訊時,可以用如下的方式:這裡通過withGroupKey來設定命令組,然後呼叫andCommandKey來設定命令名。因為Setter定義中,只有withGroupKey靜態函式可以建立Setter的例項,所以GroupKey是每個Setter必須的引數,而CommandKey則是可選的引數。而HystrixThreadPoolKey來對執行緒池來進行設定,通過它我們可以實現更細粒度的執行緒池劃分。

    public HelloCommand() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupKey")).
                andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey")).andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
    }

 

5.請求快取

Hystrix請求快取的使用非常簡單,我們只需要在實現HystrixCommand或HystrixObservableCommand時,通過過載getCacheKey()方法開啟請求快取。

package com.example.eureka.ribbon.dao;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import org.springframework.web.client.RestTemplate;

public class HelloCommand extends HystrixCommand<String> {

    private RestTemplate restTemplate;

    private Long id;

    public HelloCommand(Setter setter, RestTemplate restTemplate, long id) {
        super(setter);
        this.restTemplate = restTemplate;
        this.id = id;
    }

    public HelloCommand( RestTemplate restTemplate, long id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group")));
        this.restTemplate = restTemplate;
        this.id = id;
    }

    public HelloCommand() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupKey")).
                andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey")).andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
    }


    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://hello-service/hello", String.class).getBody();
    }
    
    protected String getCacheKey(){
        return String.valueOf(id);
    }


}

這裡通過getCacheKey方法中的返回的請求快取key值,就能讓該請求命令具備快取功能。通過開啟請求快取可以讓我們實現的Hystrix命令具備下面的幾項好處:

1.減少重複請求數,降低依賴服務的併發度。

2.在同一使用者請求的上下文中,相同依賴服務的返回資料始終保持一致。

3.請求快取在run()和construct()執行之前生效,所以可以有效減少不必要的執行緒開銷。

清理失效快取功能

在Hystrix中,我們可以通過HystrixRequestCache.clear()方法來進行快取的清理:

public class HelloCommand extends HystrixCommand<String> {

    private static final HystrixCommandKey GETTER_KEY=HystrixCommandKey.Factory.asKey("CommandKey");
    private RestTemplate restTemplate;
    private Long id;

    public HelloCommand(Setter setter, RestTemplate restTemplate, long id) {
        super(setter);
        this.restTemplate = restTemplate;
        this.id = id;
    }

    public HelloCommand( RestTemplate restTemplate, long id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group")).andCommandKey(GETTER_KEY));
        this.restTemplate = restTemplate;
        this.id = id;
    }

    public HelloCommand() {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupKey")).
                andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey")).
                andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
    }


    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://hello-service/hello", String.class).getBody();
    }

    protected String getCacheKey(){
        return String.valueOf(id);
    }

    public static void flushCache(Long id){
        HystrixRequestCache.getInstance(GETTER_KEY,HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));
    }
}

使用註解實現請求快取

註解 描述 屬性
@CacheResult
該註解用來標記請求命令返回的結果應該被快取,它必須與@HystrixCommand結合使用 cacheKeyMethod
@CacheRemove
該註解用來讓請求命令的快取失效,失效的快取根據定義的key決定 commandKey,cacheKeyMethod
@CacheKey
該註解用來在請求命令的引數上標記,使其作為快取的key值,如果沒有 標註則會使用所有引數。如果同時使用了@CacheResult和@CacheRemove註解的cacheKeyMethod方法指定快取Key的生成,那麼該註解將不會起作用 value

定義快取Key:當使用註解來定義請求快取時,如果要為請求命令指定具體的快取Key生成規則,我們可以使用@CacheResult和@CacheRemove註解的cacheKeyMethod方法來指定具體的生成函式;也可以通過使用@CacheKey註解在方法引數中指定用於組裝快取Key的元素。

使用cacheKeyMethod方法示例如下:

  @CacheResult(cacheKeyMethod = "CacheKey")
    @HystrixCommand(fallbackMethod = "helloFallbacks")
    public String helloService(int id) {
        long start=System.currentTimeMillis();
        ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);
        String body = responseEntity.getBody();
        long end=System.currentTimeMillis();
        logger.info("消耗時間:"+(start-end));
        return body;
    }

    private String CacheKey(int id){
        System.out.println(id);
        return String.valueOf(id);
    }

需要注意的是要在控制器中加入

HystrixRequestContext.initializeContext();

否則他會報:java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?

通過@CacheKey註解實現方式如下:

    @CacheResult
    @HystrixCommand(fallbackMethod = "helloFallbacks")
    public String helloService(@CacheKey("id") int id) {
        long start=System.currentTimeMillis();
        ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);
        String body = responseEntity.getBody();
        long end=System.currentTimeMillis();
        logger.info("消耗時間:"+(start-end));
        return body;
    }

@CacheKey("id"),會報java.beans.IntrospectionException: Method not found: isId,目前還不知道什麼原因。

快取清理:這裡通過@CacheRemove註解來進行失效快取的清理,比如下面的例子:

@CacheResult
@HystrixCommand(fallbackMethod = "helloFallbacks")
public String helloService(int id) {
    long start=System.currentTimeMillis();
    ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);
    String body = responseEntity.getBody();
    long end=System.currentTimeMillis();
    logger.info("消耗時間:"+(start-end));
    return body;
}

    @CacheRemove(commandKey = "helloService")
    @HystrixCommand(fallbackMethod = "helloFallbacks")
    public String updateCache(int id) {
        ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);
        String body = responseEntity.getBody();
        return body;
    }
    @RequestMapping("/ribbon-consumer")
    public String helloConsumer(int id) {
        System.out.println("同步:當前時間:" + new Date(System.currentTimeMillis()));
        String s = helloService.helloService(id);
        System.out.println("同步:當前時間:" + new Date(System.currentTimeMillis()) + "值:" + s);
        String ss = helloService.helloService(id);
        String sss= helloService.updateCache(id);
        System.out.println(s+"====\n"+ss+"=====\n"+sss);
        return s;
    }

需要注意的是@CacheRemove註解的commandKey屬性必須指定,它用來指明需要使用請求快取的請求命令,因為只有通過該屬性的配置Hystrix才能找到正確的請求命令快取的位置。

這裡我新加了一個過濾器程式碼:

@WebFilter(filterName = "hystrixContextServletFilter",urlPatterns = "/*",asyncSupported = true)
public class HystrixContextServletFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        System.out.println("+++++++++++++++++++ filter +++++++++++++++++++++++");
        HystrixRequestContext context=HystrixRequestContext.initializeContext();
        try {
            chain.doFilter(request,response);
        }finally {
            context.shutdown();
        }

    }

    @Override
    public void destroy() {

    }
}

並且在主類處開啟@ServletComponentScan註解:

@SpringCloudApplication
@ServletComponentScan
public class EurekaRibbonApplication {
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(EurekaRibbonApplication.class, args);
    }
}

 

參考《Spring Cloud微服務實戰》