在微服務架構中,我們將系統拆分成很多個服務單元,各單位的應用間通過服務註冊與訂閱的方式相互依賴。由於每個單元都在不同的程序中執行,依賴通過遠端呼叫的方式執行,這樣就有可能因為網路原因或是依賴服務自身問題出現呼叫故障或延遲,而這些問題會直接導致呼叫方的對外服務也出現延遲,若此時呼叫方的請求不斷增加,最後就會因等待出現故障的依賴方響應形成任務積壓,最終導致自身服務的不可用。這樣的架構相對於傳統架構更加的不穩定,為了解決這樣的問題,就產生了斷路器等一系列的服務保護機制。而Spring Cloud Hystrix就是這樣的實現了服務降級、服務熔斷、執行緒和訊號量隔離、請求快取、請求合併以及服務監控等一系列服務保護功能的元件。
本篇文章還是在前兩篇文章的基礎上所作的:
SpringCloud專題之一:Eureka
Spring Cloud專題之二:OpenFeign
歡迎大家檢視!!!
先啟動需要的服務工程:
- EUREKA-SERVER:註冊中心,埠為9001
- HELLO-SERVER:提供服務的客戶端,埠為9002和9003
- EUREKA-CUSTOMER:消費服務的消費者端,埠為9004
在未加入Hystrix(斷路器)之前,如果我關閉掉一個客戶端,那麼使用消費者訪問的時候可以獲得如下的輸出:
因為feign的預設連線時間是1s,所以超過1s後就會報連線不上的錯。
Hystrix程式碼
由於 openfeign 包 預設集成了 hystrix,所以只需要開啟開關即可
#開啟Hystrix降級處理
feign.hystrix.enabled=true
引入jar包
<!--hystrix-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--hystrix-javanica-->
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
</dependency>
1.服務降級
降級是指當請求超時,資源不足等情況發生時,進行的服務降級處理,不呼叫真實服務邏輯,而是使用快速失敗的方式直接返回一個託底資料,保證服務鏈路的完整,避免服務雪崩。
降級的實現是指在呼叫遠端服務的方法上增加@HystrixCommand的註解,通過指定fallbackMethod的值設定失敗的回撥方法,也可以使用@FeignClient的方式指定fallback類。
1.在Customer1Feign類的@FeignClient註解上新增fallback的類
/**
* @className: Customer1Feign
* @description: 測試多個feign使用相同的name的問題
* @author: charon
* @create: 2021-06-06 09:42
*/
@FeignClient(value = "HELLO-SERVER",fallback = EurekaClientFallBack.class)
public interface Customer1Feign {
/**
* 要求:
* 必須要指定RequestParam屬性的value值,同時RequestMethod的method也需要指定
* 方法上新增SpringMVC的註解
* @return
*/
@RequestMapping(value = "/sayHello1",method = RequestMethod.GET)
String sayHello1(@RequestParam("name") String name);
}
3.編寫fallback使用的類:EurekaClientFallBack
/**
* @className: EurekaClientFallBack
* @description: 客戶端的降級實現類
* @author: charon
* @create: 2021-06-20 22:06
*/
@Component
public class EurekaClientFallBack implements Customer1Feign {
/**
* 日誌記錄類
*/
private final Logger logger = LoggerFactory.getLogger(getClass());
/**
* sayHello1介面的服務降級類
* @param name 引數
* @return
*/
@Override
public String sayHello1(String name) {
logger.error("您訪問了EurekaClientFallBack#sayHello1(),傳入的引數為:{}" , name);
return "您訪問了EurekaClientFallBack#sayHello1(),傳入的引數為:" + name;
}
}
然後消費者端再次呼叫介面,會發現頁面展示為如下圖,而不是之前的Whitelabel Error Page了。
到這裡,我們就實現了一個最簡單的斷路器功能了。
4.模擬實現提供服務的客戶端程式碼執行超時的情況:
@RestController
public class Hello1Controller {
/**
* 日誌記錄類
*/
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${server.port}")
private String host;
@Value("${spring.application.name}")
private String instanceName;
@RequestMapping("/sayHello1")
public String sayHello1(@RequestParam("name") String name){
try {
int sleepTime = new Random().nextInt(3000);
logger.error("讓執行緒阻塞 {} 毫秒",sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("你好,服務名:{},埠為:{},接收到的引數為:{}",instanceName,host,name);
return "你好,服務名:"+instanceName+",埠為:"+host+",接收到的引數為:"+name;
}
}
在HELLO-SERVER的工程中讓執行緒隨機sleep幾秒,然後消費者端呼叫,可以發現,當呼叫HELLO-SERVER超過1000毫秒時就會因為服務超時從而觸發熔斷請求,並呼叫回撥邏輯返回結果。
除了上面的方式外,還可以使用@HystrixCommand的註解來配置fallbackMethod方法(更靈活)。
@HystrixCommand(fallbackMethod = "sayHello1Fallback")
@Override
public String invokeSayHello1(String name) {
long startTime = System.currentTimeMillis();
String result = feign1.sayHello1(name);
logger.error("您訪問了CustomerServiceImpl#sayHello1(),執行時間為:{} 毫秒",System.currentTimeMillis() - startTime );
return result;
}
public String sayHello1Fallback(String name){
logger.error("出錯了,您訪問了CustomerServiceImpl#sayHello1Fallback()" );
return "出錯了,您訪問了CustomerServiceImpl#sayHello1Fallback()";
}
2.服務熔斷
熔斷就是當一定時間內,異常請求的比例(請求超時,網路故障,服務異常等)達到閾值,啟動熔斷器,熔斷器一旦啟動,則會停止呼叫具體的服務邏輯,通過fallback快速返回託底資料,保證服務鏈的完整。熔斷又自動恢復的機制,如:當熔斷器啟動後,每隔5秒嘗試將新的請求傳送給服務端,如果服務可正常執行並返回結果,則關閉熔斷器,服務恢復。如果仍然呼叫失敗,則繼續返回託底資料,熔斷器處於開啟狀態。
服務降級是指呼叫服務出錯了返回託底資料,而熔斷則是出錯後如果開啟了熔斷器將在一定的時間內不呼叫服務端。
熔斷的實現是指在呼叫遠端服務的方法上增加@HystrixCommand的註解。通過@HystrixProperty的name屬性指定需要配置的屬性(可以是字串,也可以使用HystrixPropertiesManager常量類的常量),通過value設定屬性的值,也可以通過setter方式設定屬性值。
/**
* 註解的配置意思為:當時間在20s內的10個請求中,當出現了30%(3個)的失敗,則觸發熔斷
* @param name 引數
* @return 結果
*/
@HystrixCommand(fallbackMethod = "sayHello1Fallback",commandProperties = {
@HystrixProperty(name="circuitBreaker.requestVolumeThreshold",value = "10"),
@HystrixProperty(name= HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS,value = "20000"),
@HystrixProperty(name= HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE,value = "30"),
})
@Override
public String invokeSayHello1(String name) {
long startTime = System.currentTimeMillis();
String result = feign1.sayHello1(name);
logger.error("您訪問了CustomerServiceImpl#sayHello1(),執行時間為:{} 毫秒",System.currentTimeMillis() - startTime );
return result;
}
3.請求快取
請求快取是保證在一次請求中多次呼叫同一個服務提供者介面,在cacheKey不變的情況下,後續呼叫結果都是第一次的快取結果,而不是多次請求服務提供者,從而降低服務提供者處理重複請求的壓力。
設定請求快取:
@Override
@CacheResult//用來標記請求命令返回的結果應該被快取
@HystrixCommand
public User getUserById( String id) {
return feign1.getUserById(id);
}
定義快取key:
當使用註解來定義請求快取時,若要為請求命令指定具體的快取key的生成規則,可以使用@CacheResult和@CacheRemove註解的cacheKeyMethod方法指定具體的生成函式,也可以通過@CacheKey註解在方法引數中指定用於組裝快取key的元素。
@Override
@CacheResult(cacheKeyMethod = "getUserByIdCacheKey")//用來標記請求命令返回的結果應該被快取
@HystrixCommand
public User getUserById( String id) {
return feign1.getUserById(id);
}
public String getUserByIdCacheKey(String id){
return id;
}
/**
* 第二種使用@CacheKey的方式,@CacheKey用來在請求命令的引數上標記,使其作為快取的key值,如果沒有標記則會使用所有引數,如果
* 同事還使用了@CacheResult和@CacheRemove註解的cacheKeyMethod方法指定快取Key的生成,那麼該註解將不會起作用
* 有些教程中說使用這個可以指定引數,比如:@CacheKey("id"),在我這邊會報錯:
* java.beans.IntrospectionException: Method not found: isId
*/
@Override
@CacheResult
@HystrixCommand
public User getUserById(@CacheKey String id) {
return feign1.getUserById(id);
}
清理快取:
@CacheRemove註解的commandKey屬性是必須要指定的,它用來指明需要使用請求快取的請求命令,因為只有通過該屬性的配置,Hystrix才能找到正確的請求命令快取位置。
@Override
@CacheRemove(commandKey = "getUserByIdCacheKey")//用來讓請求命令的快取失效,失效的快取根據定義的key決定
@HystrixCommand
public User removeUserById( String id) {
return feign1.getUserById(id);
}
controller呼叫,注意定義是要在同一個請求中,如果是不同的請求,則沒有效果。
@RequestMapping("/getUserById")
public List<User> getUserById(String id){
User user1 = serivce.getUserById(id);
User user2 = serivce.getUserById(id);
List<User> lsrUser = new ArrayList<>(2);
lsrUser.add(user1);
lsrUser.add(user2);
return lsrUser;
}
4.請求合併
請求合併是指在一段時間內將所有請求合併為一個請求,以減少通訊的消耗和執行緒數的佔用,從而大大降低服務端的負載。
請求合併的缺點:
在設定請求合併之後,本來一個請求可能5ms就搞定了,但是現在必須再等10ms等待其他的請求一起,這樣一個請求的耗時就從5ms增加到了15ms了。不過如果我們要發起的命令本身就是一個高延遲的命令,那麼這個時候就可以使用請求合併了,因為這個時候,等待的時間消耗就顯得微不足道了。所以如果需要設定請求合併,千萬不能將等待時間設定的過大。
服務提供者的控制類:
@RestController
public class UserBatchController {
/**
* 請求合併的方法
* @param ids
* @return
*/
@RequestMapping(value = "/getUserList", method = RequestMethod.GET)
public List<User> getUserList(String ids) {
System.out.println("ids===:" + ids);
String[] split = ids.split(",");
return Arrays.asList(split)
.stream()
.map(id -> new User(Integer.valueOf(id),"charon"+id,Integer.valueOf(id)*5))
.collect(Collectors.toList());
}
/**
* 請求單個user的方法
* @param id
* @return
*/
@RequestMapping(value = "/getUser/{id}", method = RequestMethod.GET)
public User getUser(@PathVariable("id") String id) {
User user = new User(1, "Charon",15);
return user;
}
}
消費者feign的呼叫介面:
@RequestMapping(value = "/getUser",method = RequestMethod.GET)
Future<User> getUser(@RequestParam("id")Integer id);
@RequestMapping(value = "/getUserList",method = RequestMethod.GET)
List<User> getUserList (@RequestParam("ids") String ids);
消費者的service及實現類:
Future<User> getUser(Integer i);
/**
* 表示在10s內的getUser請求將會合併到getUserList請求上,合併發出,最大的合併請求數為200
* @param userId 使用者id
* @return
*/
@HystrixCollapser(batchMethod = "getUserList",scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
@HystrixProperty(name="timerDelayInMilliseconds",value="10"),
@HystrixProperty(name="maxRequestsInBatch",value="200")
}
)
@Override
public Future<User> getUser(Integer userId){
Future<User> user = feign1.getUser(userId);
return user;
}
@HystrixCommand
public List<User> getUserList(List<Integer> userIdList) {
List<User> lstUser = feign1.getUserList(StringUtils.join(userIdList,","));
return lstUser;
}
消費者控制類:
/**
* 獲取單個使用者
* @return User
*/
@RequestMapping("/getUser")
public User getUser() throws ExecutionException, InterruptedException {
Future<User> user = serivce.getUser(1);
System.out.println("返回的結果:"+user);
return user.get();
}
/**
* 獲取使用者list
* @return list
*/
@RequestMapping("/getUserList")
public List<User> getUserList() throws ExecutionException, InterruptedException {
Future<User> user1 = serivce.getUser(1);
Future<User> user2= serivce.getUser(2);
Future<User> user3= serivce.getUser(3);
List<User> users = new ArrayList<>();
users.add(user1.get());
users.add(user2.get());
users.add(user3.get());
System.out.println("返回的結果:" + users);
return users;
}
標註了HystrixCollapser這個註解的,這個方法永遠不會執行,當有請求來的時候,直接請求batchMethod所指定的方法。batchMethod的方法在指定延遲時間內會將所有的請求合併一起執行
5.執行緒池隔離
Hystrix使用艙壁模式實現執行緒池的隔離,它會為每一個依賴服務建立一個獨立的執行緒池,這樣就算某個依賴服務出現延遲過高的情況,也只是對該依賴服務的呼叫產生影響,而不會拖慢其他的依賴服務。
使用執行緒池隔離的優點:
- 應用自身得到完全保護,不會受不可控的依賴服務影響,即便給依賴服務分配的執行緒池被填滿,也不會影響到其他的服務
- 可以有效降低接入新服務的風險,如果新服務接入後執行不穩定或存在問題,完全不會影響原來的請求
- 每個服務都是獨立的執行緒池,在一定程度上解決了高併發的問題
- 由於執行緒池有個數限制,所以也解決了限流的問題
使用執行緒池隔離的缺點:
- 增加了CPU的開銷,因為不僅有tomcat的執行緒池,還需要有Hystrix的執行緒池
- 每個操作都是獨立的執行緒,就有排隊、排程和上下文切換等問題
不配置執行緒隔離:
@RequestMapping("/useThread")
public String useThread(){
return serivce.useThread1() + " " + serivce.useThread2();
}
@Override
public String useThread1() {
String threadName = Thread.currentThread().getName();
logger.error("使用的執行緒名稱為:{}",threadName);
return "使用的執行緒名稱為:" + threadName;
}
@Override
public String useThread2() {
String threadName = Thread.currentThread().getName();
logger.error("使用的執行緒名稱為:{}",threadName);
return "使用的執行緒名稱為:" + threadName;
}
如果不配置執行緒隔離,則使用的是同一個執行緒
如果我們給useThread1方法設定執行緒隔離:
@HystrixCommand(groupKey = "useThread1",//分組,設定服務名,一個group使用一個執行緒
commandKey = "useThread1",//命令名稱,預設值為當前執行的方法名稱
threadPoolKey = "useThread1",//是配置執行緒池名稱,配置全域性唯一標識介面執行緒池的名稱,相同名稱的執行緒池是同一個。預設值是分組名groupKey
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),//執行緒池大小
@HystrixProperty(name = "maxQueueSize", value = "100"),//最大佇列長度
@HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),//執行緒存活時間
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15")//拒絕請求
})
使用了執行緒池隔離之後,可以看到兩個請求使用的不通的執行緒池。
6.訊號量隔離
訊號量隔離是指在規定時間內只允許指定數量的訊號量進行服務訪問,其他得不到訊號量的執行緒進入fallback,訪問結束後,歸還訊號量。說白了就是做了一個限流。
@RequestMapping("/semaphore")
public String semaphore(){
for (int i = 0; i < 15; i++) {
new Thread(new Runnable() {
@Override
public void run() {
serivce.semaphore();
}
}).start();
}
return "OK";
}
@HystrixCommand(fallbackMethod = "semaphoreFallback",commandProperties = {
@HystrixProperty(name="execution.isolation.strategy",value="SEMAPHORE"), //使用訊號量隔離,預設為THREAD
@HystrixProperty(name="execution.isolation.semaphore.maxConcurrentRequests",value="10"), // 訊號量最大併發度
})
@Override
public void semaphore() {
try {
Thread.sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.error("正常執行方法");
}
public void semaphoreFallback(){
logger.error("執行了fallback方法");
}
如下圖所示,有10個執行緒拿到了訊號量,執行了正常的方法,有5個執行緒沒有拿到訊號量,直接呼叫fallback方法。
原理分析
上一篇文章說過openFeign主要是通過jdk的動態代理構建物件,所以Hystrix整合到feign當中也是使用的jdk動態代理的invocationHandler上,那麼來看看Hystrix實現的jdk的動態代理類--HystrixInvocationHandler吧!
invoke方法:
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// 如果呼叫的方法來自 java.lang.Object 則提前退出程式碼與 ReflectiveFeign.FeignInvocationHandler 相同
// ...
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// 獲取並呼叫MethodHandler,MethodHandler封裝了Http請求,ribbon也在這裡被整合
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
// fallback的降級方法
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
首先建立一個HystrixCommand,用來表示對依賴服務的操作請求,同時傳遞所有需要的引數,從命名中可以知道才用了“命令模式”來實現對服務呼叫操作的封裝。
命令模式:是指將來自客戶端的請求封裝成一個物件,從而讓呼叫者使用不同的請求對服務提供者進行引數化。
上面的兩種命令模式一共有4種命令的執行方式,Hystrix在執行的時候會根據建立的Command物件以及具體的情況來選擇一個執行。
- execute() 方法 :同步執行,從依賴的服務返回一個單一的結果物件,或是在發生錯誤時丟擲異常
- queue() 方法 :非同步執行,直接返回一個Future物件,其中包含了服務執行結束時要返回的單一結果物件
- observe()方法:返回Observable物件,它代表了操作的多個結果,是一個Hot observable
- toObservable()方法:同樣返回一個Observable物件,也表示了操作的多個結果,但它返回的是一個Cold Observable
接下來首先來看看HystrixCommand#execute()方法:
public R execute() {
try {
// queue()返回一個Future,get()同步等待執行結束,然後獲取非同步的結果。
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
跟進queue()方法:
public Future<R> queue() {
// 通過toObservable()獲得一個Cold Observable,
// 並且通過toBlocking()將該Observable轉換成BlockingObservable,可以把資料以阻塞的方式發射出來
// toFuture()則是把BlockingObservable轉換成一個Future
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// future實現,呼叫delegate的對應實現
}
return f;
}
在queue()中呼叫了核心方法--toObservable()方法,
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// ...
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;
try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}
try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};
final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
try {
executionHook.onSuccess(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
}
};
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// 先從快取中獲取如果有的話直接返回
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// 裡面訂閱了,所以開始執行hystrixObservable
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
這個方法非常長,首先看看applyHystrixSemantics()方法:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
// 判斷是否開啟斷路器
if (circuitBreaker.allowRequest()) {
// 斷路器是關閉的,則檢查識都有可用的資源來執行命令
// 獲取訊號量例項
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
// 釋放訊號量
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 嘗試獲取訊號量
if (executionSemaphore.tryAcquire()) {
try {
// 執行業務
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 訊號量獲取失敗,走fallback
return handleSemaphoreRejectionViaFallback();
}
} else {
// 斷路器是開啟的,快速熔斷,走fallback
return handleShortCircuitViaFallback();
}
}
applyHystrixSemantics()通過熔斷器的allowRequest()方法判斷是否需要快速失敗走fallback,如果允許執行那麼又會經過一層訊號量的控制,都通過才會走execute。
所以,核心邏輯就落到了HystrixCircuitBreaker#allowRequest()方法上:
public boolean allowRequest() {
// 強制開啟熔斷
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 強制關閉熔斷
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
// 判斷和計算當前斷路器是否開啟 或者 允許單個測試 ,通過這兩個方法的配合,實現了斷路器的開啟和關閉狀態的切換
return !isOpen() || allowSingleTest();
}
Hystrix允許強制開啟或者關閉熔斷,如果不想有請求執行就開啟,如果覺得可以忽略所有錯誤就關閉。在沒有強制開關的情況下,主要就是判斷當前熔斷是否開啟。另外,在熔斷器開啟的情況下,會在一定時間後允許發出一個測試的請求,來判斷是否開啟熔斷器。
首先來看看isOpen()方法:
public boolean isOpen() {
if (circuitOpen.get()) {
// 開關是開啟的,直接返回
return true;
}
// 開關未開啟,獲取健康統計
HealthCounts health = metrics.getHealthCounts();
// 總請求數太小的情況,不開啟熔斷
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
// 總請求數夠了,失敗率比較小的情況,不開啟熔斷
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 總請求數和失敗率都比較大的時候,設定開關為開啟,進行熔斷
if (circuitOpen.compareAndSet(false, true)) {
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
總體邏輯就是判斷一個失敗次數是否達到開啟熔斷的條件,如果達到那麼設定開啟的開關。在熔斷一直開啟的情況下,偶爾會放過一個測試請求來判斷是否關閉。
下面看看allowSingleTest()方法:
public boolean allowSingleTest() {
// 獲取熔斷開啟時間,或者上一次的測試時間
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 如果熔斷處於開啟狀態,且當前時間距離熔斷開啟時間或者上一次執行測試請求時間已經到了
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 使用cas機制控制熔斷的開啟
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
回到applyHystrixSemantics()這個方法中,獲取到訊號量之後,執行業務的方法,在executeCommandAndObserve()中進行了一些超時及失敗的邏輯處理之後,進入HystrixCommand#executeCommandWithSpecifiedIsolation()中:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// ...
Observable<R> execution;
// 判斷是否開啟超時設定
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
在executeCommandWithSpecifiedIsolation(),先判斷是否進行執行緒隔離,及一些狀態變化之後,進入getUserExecutionObservable():
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 執行緒隔離
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// 狀態校驗
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// 同級標記命令
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// 該命令在包裝執行緒中超時,立即返回
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
在getUserExecutionObservable()和getExecutionObservable()中,主要是封裝使用者定義的run方法:
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// 獲取使用者定義邏輯的Observable
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
HystrixCommand#getExecutionObservable():
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 包裝定義的run方法
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
到這裡,hystris分析就結束了。hystrix其實就是在feign的呼叫過程插了一腳,通過對請求的成功失敗的統計資料來開關是否進行熔斷。又在每個時間視窗內傳送一個測試請求出去,來判斷是否關閉熔斷。總得來說還是很清晰實用的。
參考文章:
翟永超老師的《Spring Cloud微服務實戰》
https://zhuanlan.zhihu.com/p/114942145