轉:Hystrix原理與實戰
背景
分散式系統環境下,服務間類似依賴非常常見,一個業務呼叫通常依賴多個基礎服務。如下圖,對於同步呼叫,當庫存服務不可用時,商品服務請求執行緒被阻塞,當有大批量請求呼叫庫存服務時,最終可能導致整個商品服務資源耗盡,無法繼續對外提供服務。並且這種不可用可能沿請求呼叫鏈向上傳遞,這種現象被稱為雪崩效應。
雪崩效應常見場景
- 硬體故障:如伺服器宕機,機房斷電,光纖被挖斷等。
- 流量激增:如異常流量,重試加大流量等。
- 快取穿透:一般發生在應用重啟,所有快取失效時,以及短時間內大量快取失效時。大量的快取不命中,使請求直擊後端服務,造成服務提供者超負荷執行,引起服務不可用。
- 程式BUG:如程式邏輯導致記憶體洩漏,JVM長時間FullGC等。
- 同步等待:服務間採用同步呼叫模式,同步等待造成的資源耗盡。
雪崩效應應對策略
針對造成雪崩效應的不同場景,可以使用不同的應對策略,沒有一種通用所有場景的策略,參考如下:
- 硬體故障:多機房容災、異地多活等。
- 流量激增:服務自動擴容、流量控制(限流、關閉重試)等。
- 快取穿透:快取預載入、快取非同步載入等。
- 程式BUG:修改程式bug、及時釋放資源等。
- 同步等待:資源隔離、MQ解耦、不可用服務呼叫快速失敗等。資源隔離通常指不同服務呼叫採用不同的執行緒池;不可用服務呼叫快速失敗一般通過熔斷器模式結合超時機制實現。
綜上所述,如果一個應用不能對來自依賴的故障進行隔離,那該應用本身就處在被拖垮的風險中。 因此,為了構建穩定、可靠的分散式系統,我們的服務應當具有自我保護能力,當依賴服務不可用時,當前服務啟動自我保護功能,從而避免發生雪崩效應。本文將重點介紹使用Hystrix解決同步等待的雪崩問題。
初探Hystrix
Hystrix [hɪst'rɪks],中文含義是豪豬,因其背上長滿棘刺,從而擁有了自我保護的能力。本文所說的Hystrix是Netflix開源的一款容錯框架,同樣具有自我保護能力。為了實現容錯和自我保護,下面我們看看Hystrix如何設計和實現的。
Hystrix設計目標:
- 對來自依賴的延遲和故障進行防護和控制——這些依賴通常都是通過網路訪問的
- 阻止故障的連鎖反應
- 快速失敗並迅速恢復
- 回退並優雅降級
- 提供近實時的監控與告警
Hystrix遵循的設計原則:
- 防止任何單獨的依賴耗盡資源(執行緒)
- 過載立即切斷並快速失敗,防止排隊
- 儘可能提供回退以保護使用者免受故障
- 使用隔離技術(例如隔板,泳道和斷路器模式)來限制任何一個依賴的影響
- 通過近實時的指標,監控和告警,確保故障被及時發現
- 通過動態修改配置屬性,確保故障及時恢復
- 防止整個依賴客戶端執行失敗,而不僅僅是網路通訊
Hystrix如何實現這些設計目標?
- 使用命令模式將所有對外部服務(或依賴關係)的呼叫包裝在HystrixCommand或HystrixObservableCommand物件中,並將該物件放在單獨的執行緒中執行;
- 每個依賴都維護著一個執行緒池(或訊號量),執行緒池被耗盡則拒絕請求(而不是讓請求排隊)。
- 記錄請求成功,失敗,超時和執行緒拒絕。
- 服務錯誤百分比超過了閾值,熔斷器開關自動開啟,一段時間內停止對該服務的所有請求。
- 請求失敗,被拒絕,超時或熔斷時執行降級邏輯。
- 近實時地監控指標和配置的修改。
Hystrix入門
Hystrix簡單示例
開始深入Hystrix原理之前,我們先簡單看一個示例。
第一步,繼承HystrixCommand實現自己的command,在command的構造方法中需要配置請求被執行需要的引數,並組合實際傳送請求的物件,程式碼如下:
public class QueryOrderIdCommand extends HystrixCommand<Integer> {
private final static Logger logger = LoggerFactory.getLogger(QueryOrderIdCommand.class);
private OrderServiceProvider orderServiceProvider;
public QueryOrderIdCommand(OrderServiceProvider orderServiceProvider) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("orderService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("queryByOrderId"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(10)//至少有10個請求,熔斷器才進行錯誤率的計算
.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔斷器中斷請求5秒後會進入半開啟狀態,放部分流量過去重試
.withCircuitBreakerErrorThresholdPercentage(50)//錯誤率達到50開啟熔斷保護
.withExecutionTimeoutEnabled(true))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties
.Setter().withCoreSize(10)));
this.orderServiceProvider = orderServiceProvider;
}
@Override
protected Integer run() {
return orderServiceProvider.queryByOrderId();
}
@Override
protected Integer getFallback() {
return -1;
}
}
第二步,呼叫HystrixCommand的執行方法發起實際請求。
@Test
public void testQueryByOrderIdCommand() {
Integer r = new QueryOrderIdCommand(orderServiceProvider).execute();
logger.info("result:{}", r);
}
Hystrix處理流程
Hystrix流程圖如下:
Hystrix整個工作流如下:
- 構造一個 HystrixCommand或HystrixObservableCommand物件,用於封裝請求,並在構造方法配置請求被執行需要的引數;
- 執行命令,Hystrix提供了4種執行命令的方法,後面詳述;
- 判斷是否使用快取響應請求,若啟用了快取,且快取可用,直接使用快取響應請求。Hystrix支援請求快取,但需要使用者自定義啟動;
- 判斷熔斷器是否開啟,如果開啟,跳到第8步;
- 判斷執行緒池/佇列/訊號量是否已滿,已滿則跳到第8步;
- 執行HystrixObservableCommand.construct()或HystrixCommand.run(),如果執行失敗或者超時,跳到第8步;否則,跳到第9步;
- 統計熔斷器監控指標;
- 走Fallback備用邏輯
- 返回請求響應
從流程圖上可知道,第5步執行緒池/佇列/訊號量已滿時,還會執行第7步邏輯,更新熔斷器統計資訊,而第6步無論成功與否,都會更新熔斷器統計資訊。
執行命令的幾種方法
Hystrix提供了4種執行命令的方法,execute()和queue() 適用於HystrixCommand物件,而observe()和toObservable()適用於HystrixObservableCommand物件。
execute()
以同步堵塞方式執行run(),只支援接收一個值物件。hystrix會從執行緒池中取一個執行緒來執行run(),並等待返回值。
queue()
以非同步非阻塞方式執行run(),只支援接收一個值物件。呼叫queue()就直接返回一個Future物件。可通過 Future.get()拿到run()的返回結果,但Future.get()是阻塞執行的。若執行成功,Future.get()返回單個返回值。當執行失敗時,如果沒有重寫fallback,Future.get()丟擲異常。
observe()
事件註冊前執行run()/construct(),支援接收多個值物件,取決於發射源。呼叫observe()會返回一個hot Observable,也就是說,呼叫observe()自動觸發執行run()/construct(),無論是否存在訂閱者。
如果繼承的是HystrixCommand,hystrix會從執行緒池中取一個執行緒以非阻塞方式執行run();如果繼承的是HystrixObservableCommand,將以呼叫執行緒阻塞執行construct()。
observe()使用方法:
- 呼叫observe()會返回一個Observable物件
- 呼叫這個Observable物件的subscribe()方法完成事件註冊,從而獲取結果
toObservable()
事件註冊後執行run()/construct(),支援接收多個值物件,取決於發射源。呼叫toObservable()會返回一個cold Observable,也就是說,呼叫toObservable()不會立即觸發執行run()/construct(),必須有訂閱者訂閱Observable時才會執行。
如果繼承的是HystrixCommand,hystrix會從執行緒池中取一個執行緒以非阻塞方式執行run(),呼叫執行緒不必等待run();如果繼承的是HystrixObservableCommand,將以呼叫執行緒堵塞執行construct(),呼叫執行緒需等待construct()執行完才能繼續往下走。
toObservable()使用方法:
- 呼叫observe()會返回一個Observable物件
- 呼叫這個Observable物件的subscribe()方法完成事件註冊,從而獲取結果
需注意的是,HystrixCommand也支援toObservable()和observe(),但是即使將HystrixCommand轉換成Observable,它也只能發射一個值物件。只有HystrixObservableCommand才支援發射多個值物件。
幾種方法的關係
- execute()實際是呼叫了queue().get()
- queue()實際呼叫了toObservable().toBlocking().toFuture()
- observe()實際呼叫toObservable()獲得一個cold Observable,再建立一個ReplaySubject物件訂閱Observable,將源Observable轉化為hot Observable。因此呼叫observe()會自動觸發執行run()/construct()。
Hystrix總是以Observable的形式作為響應返回,不同執行命令的方法只是進行了相應的轉換。
Hystrix容錯
Hystrix的容錯主要是通過新增容許延遲和容錯方法,幫助控制這些分散式服務之間的互動。 還通過隔離服務之間的訪問點,阻止它們之間的級聯故障以及提供回退選項來實現這一點,從而提高系統的整體彈性。Hystrix主要提供了以下幾種容錯方法:
- 資源隔離
- 熔斷
- 降級
下面我們詳細談談這幾種容錯機制。
資源隔離
資源隔離主要指對執行緒的隔離。Hystrix提供了兩種執行緒隔離方式:執行緒池和訊號量。
執行緒隔離-執行緒池
Hystrix通過命令模式對傳送請求的物件和執行請求的物件進行解耦,將不同型別的業務請求封裝為對應的命令請求。如訂單服務查詢商品,查詢商品請求->商品Command;商品服務查詢庫存,查詢庫存請求->庫存Command。並且為每個型別的Command配置一個執行緒池,當第一次建立Command時,根據配置建立一個執行緒池,並放入ConcurrentHashMap,如商品Command:
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
...
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
後續查詢商品的請求建立Command時,將會重用已建立的執行緒池。執行緒池隔離之後的服務依賴關係:
通過將傳送請求執行緒與執行請求的執行緒分離,可有效防止發生級聯故障。當執行緒池或請求佇列飽和時,Hystrix將拒絕服務,使得請求執行緒可以快速失敗,從而避免依賴問題擴散。
執行緒池隔離優缺點
優點:
- 保護應用程式以免受來自依賴故障的影響,指定依賴執行緒池飽和不會影響應用程式的其餘部分。
- 當引入新客戶端lib時,即使發生問題,也是在本lib中,並不會影響到其他內容。
- 當依賴從故障恢復正常時,應用程式會立即恢復正常的效能。
- 當應用程式一些配置引數錯誤時,執行緒池的執行狀況會很快檢測到這一點(通過增加錯誤,延遲,超時,拒絕等),同時可以通過動態屬性進行實時糾正錯誤的引數配置。
- 如果服務的效能有變化,需要實時調整,比如增加或者減少超時時間,更改重試次數,可以通過執行緒池指標動態屬性修改,而且不會影響到其他呼叫請求。
- 除了隔離優勢外,hystrix擁有專門的執行緒池可提供內建的併發功能,使得可以在同步呼叫之上構建非同步門面(外觀模式),為非同步程式設計提供了支援(Hystrix引入了Rxjava非同步框架)。
注意:儘管執行緒池提供了執行緒隔離,我們的客戶端底層程式碼也必須要有超時設定或響應執行緒中斷,不能無限制的阻塞以致執行緒池一直飽和。
缺點:
執行緒池的主要缺點是增加了計算開銷。每個命令的執行都在單獨的執行緒完成,增加了排隊、排程和上下文切換的開銷。因此,要使用Hystrix,就必須接受它帶來的開銷,以換取它所提供的好處。
通常情況下,執行緒池引入的開銷足夠小,不會有重大的成本或效能影響。但對於一些訪問延遲極低的服務,如只依賴記憶體快取,執行緒池引入的開銷就比較明顯了,這時候使用執行緒池隔離技術就不適合了,我們需要考慮更輕量級的方式,如訊號量隔離。
執行緒隔離-訊號量
上面提到了執行緒池隔離的缺點,當依賴延遲極低的服務時,執行緒池隔離技術引入的開銷超過了它所帶來的好處。這時候可以使用訊號量隔離技術來代替,通過設定訊號量來限制對任何給定依賴的併發呼叫量。下圖說明了執行緒池隔離和訊號量隔離的主要區別:
使用執行緒池時,傳送請求的執行緒和執行依賴服務的執行緒不是同一個,而使用訊號量時,傳送請求的執行緒和執行依賴服務的執行緒是同一個,都是發起請求的執行緒。先看一個使用訊號量隔離執行緒的示例:
public class QueryByOrderIdCommandSemaphore extends HystrixCommand<Integer> {
private final static Logger logger = LoggerFactory.getLogger(QueryByOrderIdCommandSemaphore.class);
private OrderServiceProvider orderServiceProvider;
public QueryByOrderIdCommandSemaphore(OrderServiceProvider orderServiceProvider) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("orderService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("queryByOrderId"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(10)////至少有10個請求,熔斷器才進行錯誤率的計算
.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔斷器中斷請求5秒後會進入半開啟狀態,放部分流量過去重試
.withCircuitBreakerErrorThresholdPercentage(50)//錯誤率達到50開啟熔斷保護
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(10)));//最大併發請求量
this.orderServiceProvider = orderServiceProvider;
}
@Override
protected Integer run() {
return orderServiceProvider.queryByOrderId();
}
@Override
protected Integer getFallback() {
return -1;
}
}
由於Hystrix預設使用執行緒池做執行緒隔離,使用訊號量隔離需要顯示地將屬性execution.isolation.strategy設定為ExecutionIsolationStrategy.SEMAPHORE,同時配置訊號量個數,預設為10。客戶端需向依賴服務發起請求時,首先要獲取一個訊號量才能真正發起呼叫,由於訊號量的數量有限,當併發請求量超過訊號量個數時,後續的請求都會直接拒絕,進入fallback流程。
訊號量隔離主要是通過控制併發請求量,防止請求執行緒大面積阻塞,從而達到限流和防止雪崩的目的。
執行緒隔離總結
執行緒池和訊號量都可以做執行緒隔離,但各有各的優缺點和支援的場景,對比如下:
執行緒切換 | 支援非同步 | 支援超時 | 支援熔斷 | 限流 | 開銷 | |
訊號量 | 否 | 否 | 否 | 是 | 是 | 小 |
執行緒池 | 是 | 是 | 是 | 是 | 是 | 大 |
執行緒池和訊號量都支援熔斷和限流。相比執行緒池,訊號量不需要執行緒切換,因此避免了不必要的開銷。但是訊號量不支援非同步,也不支援超時,也就是說當所請求的服務不可用時,訊號量會控制超過限制的請求立即返回,但是已經持有訊號量的執行緒只能等待服務響應或從超時中返回,即可能出現長時間等待。執行緒池模式下,當超過指定時間未響應的服務,Hystrix會通過響應中斷的方式通知執行緒立即結束並返回。
熔斷
熔斷器簡介
現實生活中,可能大家都有注意到家庭電路中通常會安裝一個保險盒,當負載過載時,保險盒中的保險絲會自動熔斷,以保護電路及家裡的各種電器,這就是熔斷器的一個常見例子。Hystrix中的熔斷器(Circuit Breaker)也是起類似作用,Hystrix在執行過程中會向每個commandKey對應的熔斷器報告成功、失敗、超時和拒絕的狀態,熔斷器維護並統計這些資料,並根據這些統計資訊來決策熔斷開關是否開啟。如果開啟,熔斷後續請求,快速返回。隔一段時間(預設是5s)之後熔斷器嘗試半開,放入一部分流量請求進來,相當於對依賴服務進行一次健康檢查,如果請求成功,熔斷器關閉。
熔斷器配置
Circuit Breaker主要包括如下6個引數:
1、circuitBreaker.enabled
是否啟用熔斷器,預設是TRUE。 2 、circuitBreaker.forceOpen
熔斷器強制開啟,始終保持開啟狀態,不關注熔斷開關的實際狀態。預設值FLASE。 3、circuitBreaker.forceClosed 熔斷器強制關閉,始終保持關閉狀態,不關注熔斷開關的實際狀態。預設值FLASE。
4、circuitBreaker.errorThresholdPercentage 錯誤率,預設值50%,例如一段時間(10s)內有100個請求,其中有54個超時或者異常,那麼這段時間內的錯誤率是54%,大於了預設值50%,這種情況下會觸發熔斷器開啟。
5、circuitBreaker.requestVolumeThreshold
預設值20。含義是一段時間內至少有20個請求才進行errorThresholdPercentage計算。比如一段時間了有19個請求,且這些請求全部失敗了,錯誤率是100%,但熔斷器不會開啟,總請求數不滿足20。
6、circuitBreaker.sleepWindowInMilliseconds
半開狀態試探睡眠時間,預設值5000ms。如:當熔斷器開啟5000ms之後,會嘗試放過去一部分流量進行試探,確定依賴服務是否恢復。
熔斷器工作原理
下圖展示了HystrixCircuitBreaker的工作原理:
熔斷器工作的詳細過程如下:
第一步,呼叫allowRequest()判斷是否允許將請求提交到執行緒池
- 如果熔斷器強制開啟,circuitBreaker.forceOpen為true,不允許放行,返回。
- 如果熔斷器強制關閉,circuitBreaker.forceClosed為true,允許放行。此外不必關注熔斷器實際狀態,也就是說熔斷器仍然會維護統計資料和開關狀態,只是不生效而已。
第二步,呼叫isOpen()判斷熔斷器開關是否開啟
- 如果熔斷器開關開啟,進入第三步,否則繼續;
- 如果一個週期內總的請求數小於circuitBreaker.requestVolumeThreshold的值,允許請求放行,否則繼續;
- 如果一個週期內錯誤率小於circuitBreaker.errorThresholdPercentage的值,允許請求放行。否則,開啟熔斷器開關,進入第三步。
第三步,呼叫allowSingleTest()判斷是否允許單個請求通行,檢查依賴服務是否恢復
- 如果熔斷器開啟,且距離熔斷器開啟的時間或上一次試探請求放行的時間超過circuitBreaker.sleepWindowInMilliseconds的值時,熔斷器器進入半開狀態,允許放行一個試探請求;否則,不允許放行。
此外,為了提供決策依據,每個熔斷器預設維護了10個bucket,每秒一個bucket,當新的bucket被建立時,最舊的bucket會被拋棄。其中每個blucket維護了請求成功、失敗、超時、拒絕的計數器,Hystrix負責收集並統計這些計數器。
熔斷器測試
1、以QueryOrderIdCommand為測試command
2、配置orderServiceProvider不重試且500ms超時
<dubbo:reference id="orderServiceProvider" interface="com.huang.provider.OrderServiceProvider"
timeout="500" retries="0"/>
3、OrderServiceProviderImpl實現很簡單,前10個請求,服務端休眠600ms,使得客戶端呼叫超時。
@Service
public class OrderServiceProviderImpl implements OrderServiceProvider {
private final static Logger logger = LoggerFactory.getLogger(OrderServiceProviderImpl.class);
private AtomicInteger OrderIdCounter = new AtomicInteger(0);
@Override
public Integer queryByOrderId() {
int c = OrderIdCounter.getAndIncrement();
if (logger.isDebugEnabled()) {
logger.debug("OrderIdCounter:{}", c);
}
if (c < 10) {
try {
Thread.sleep(600);
} catch (InterruptedException e) {
}
}
return c;
}
@Override
public void reset() {
OrderIdCounter.getAndSet(0);
}
}
4、單測程式碼
@Test
public void testExecuteCommand() throws InterruptedException {
orderServiceProvider.reset();
int i = 1;
for (; i < 15; i++) {
HystrixCommand<Integer> command = new QueryByOrderIdCommand(orderServiceProvider);
Integer r = command.execute();
String method = r == -1 ? "fallback" : "run";
logger.info("call {} times,result:{},method:{},isCircuitBreakerOpen:{}", i, r, method, command.isCircuitBreakerOpen());
}
//等待6s,使得熔斷器進入半開啟狀態
Thread.sleep(6000);
for (; i < 20; i++) {
HystrixCommand<Integer> command = new QueryByOrderIdCommand(orderServiceProvider);
Integer r = command.execute();
String method = r == -1 ? "fallback" : "run";
logger.info("call {} times,result:{},method:{},isCircuitBreakerOpen:{}", i, r, method, command.isCircuitBreakerOpen());
}
}
5、輸出結果
2018-02-07 11:38:36,056 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 1 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:36,564 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 2 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:37,074 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 3 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:37,580 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 4 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:38,089 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 5 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:38,599 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 6 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:39,109 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 7 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:39,622 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 8 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:40,138 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 9 times,result:-1,method:fallback,isCircuitBreakerOpen:false
2018-02-07 11:38:40,647 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 10 times,result:-1,method:fallback,isCircuitBreakerOpen:true
2018-02-07 11:38:40,651 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 11 times,result:-1,method:fallback,isCircuitBreakerOpen:true
2018-02-07 11:38:40,653 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 12 times,result:-1,method:fallback,isCircuitBreakerOpen:true
2018-02-07 11:38:40,656 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 13 times,result:-1,method:fallback,isCircuitBreakerOpen:true
2018-02-07 11:38:40,658 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:36 call 14 times,result:-1,method:fallback,isCircuitBreakerOpen:true
2018-02-07 11:38:46,671 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:44 call 15 times,result:10,method:run,isCircuitBreakerOpen:false
2018-02-07 11:38:46,675 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:44 call 16 times,result:11,method:run,isCircuitBreakerOpen:false
2018-02-07 11:38:46,680 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:44 call 17 times,result:12,method:run,isCircuitBreakerOpen:false
2018-02-07 11:38:46,685 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:44 call 18 times,result:13,method:run,isCircuitBreakerOpen:false
2018-02-07 11:38:46,691 INFO [main] com.huang.test.command.QueryByOrderIdCommandTest:testExecuteCommand:44 call 19 times,result:14,method:run,isCircuitBreakerOpen:false
前9個請求呼叫超時,走fallback邏輯;
10-14個請求,熔斷器開關開啟,直接快速失敗走fallback邏輯;
15-19個請求,熔斷器進入半開狀態,放行一個試探請求呼叫成功,熔斷器關閉,後續請求恢復。
回退降級
降級,通常指務高峰期,為了保證核心服務正常執行,需要停掉一些不太重要的業務,或者某些服務不可用時,執行備用邏輯從故障服務中快速失敗或快速返回,以保障主體業務不受影響。Hystrix提供的降級主要是為了容錯,保證當前服務不受依賴服務故障的影響,從而提高服務的健壯性。要支援回退或降級處理,可以重寫HystrixCommand的getFallBack方法或HystrixObservableCommand的resumeWithFallback方法。
Hystrix在以下幾種情況下會走降級邏輯:
- 執行construct()或run()丟擲異常
- 熔斷器開啟導致命令短路
- 命令的執行緒池和佇列或訊號量的容量超額,命令被拒絕
- 命令執行超時
降級回退方式
Fail Fast 快速失敗
快速失敗是最普通的命令執行方法,命令沒有重寫降級邏輯。 如果命令執行發生任何型別的故障,它將直接丟擲異常。
Fail Silent 無聲失敗
指在降級方法中通過返回null,空Map,空List或其他類似的響應來完成。
@Override
protected Integer getFallback() {
return null;
}
@Override
protected List<Integer> getFallback() {
return Collections.emptyList();
}
@Override
protected Observable<Integer> resumeWithFallback() {
return Observable.empty();
}
Fallback: Static
指在降級方法中返回靜態預設值。 這不會導致服務以“無聲失敗”的方式被刪除,而是導致預設行為發生。如:應用根據命令執行返回true / false執行相應邏輯,但命令執行失敗,則預設為true
@Override
protected Boolean getFallback() {
return true;
}
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.just( true );
}
Fallback: Stubbed
當命令返回一個包含多個欄位的複合物件時,適合以Stubbed 的方式回退。
@Override
protected MissionInfo getFallback() {
return new MissionInfo("missionName","error");
}
Fallback: Cache via Network
有時,如果呼叫依賴服務失敗,可以從快取服務(如redis)中查詢舊資料版本。由於又會發起遠端呼叫,所以建議重新封裝一個Command,使用不同的ThreadPoolKey,與主執行緒池進行隔離。
@Override
protected Integer getFallback() {
return new RedisServiceCommand(redisService).execute();
}
Primary + Secondary with Fallback
有時系統具有兩種行為- 主要和次要,或主要和故障轉移。主要和次要邏輯涉及到不同的網路呼叫和業務邏輯,所以需要將主次邏輯封裝在不同的Command中,使用執行緒池進行隔離。為了實現主從邏輯切換,可以將主次command封裝在外觀HystrixCommand的run方法中,並結合配置中心設定的開關切換主從邏輯。由於主次邏輯都是經過執行緒池隔離的HystrixCommand,因此外觀HystrixCommand可以使用訊號量隔離,而沒有必要使用執行緒池隔離引入不必要的開銷。原理圖如下:
主次模型的使用場景還是很多的。如當系統升級新功能時,如果新版本的功能出現問題,通過開關控制降級呼叫舊版本的功能。示例程式碼如下:
public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);
private final int id;
public CommandFacadeWithPrimarySecondary(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
.andCommandPropertiesDefaults(
// 由於主次command已經使用執行緒池隔離,Facade Command使用訊號量隔離即可
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
if (usePrimary.get()) {
return new PrimaryCommand(id).execute();
} else {
return new SecondaryCommand(id).execute();
}
}
@Override
protected String getFallback() {
return "static-fallback-" + id;
}
@Override
protected String getCacheKey() {
return String.valueOf(id);
}
private static class PrimaryCommand extends HystrixCommand<String> {
private final int id;
private PrimaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
.andCommandPropertiesDefaults( HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
this.id = id;
}
@Override
protected String run() {
return "responseFromPrimary-" + id;
}
}
private static class SecondaryCommand extends HystrixCommand<String> {
private final int id;
private SecondaryCommand(int id) {
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
.andCommandPropertiesDefaults( HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
this.id = id;
}
@Override
protected String run() {
return "responseFromSecondary-" + id;
}
}
public static class UnitTest {
@Test
public void testPrimary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
@Test
public void testSecondary() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
} finally {
context.shutdown();
ConfigurationManager.getConfigInstance().clear();
}
}
}
}
通常情況下,建議重寫getFallBack或resumeWithFallback提供自己的備用邏輯,但不建議在回退邏輯中執行任何可能失敗的操作。
總結
本文介紹了Hystrix及其工作原理,還介紹了Hystrix執行緒池隔離、訊號量隔離和熔斷器的工作原理,以及如何使用Hystrix的資源隔離,熔斷和降級等技術實現服務容錯,從而提高系統的整體健壯性。