Hystrix執行緒隔離技術解析-執行緒池
認識Hystrix
Hystrix是Netflix開源的一款容錯框架,包含常用的容錯方法:執行緒隔離、訊號量隔離、降級策略、熔斷技術。
在高併發訪問下,系統所依賴的服務的穩定性對系統的影響非常大,依賴有很多不可控的因素,比如網路連線變慢,資源突然繁忙,暫時不可用,服務離線等。我們要構建穩定、可靠的分散式系統,就必須要有這樣一套容錯方法。
本文主要討論執行緒隔離技術。
為什麼要做執行緒隔離
比如我們現在有3個業務呼叫分別是查詢訂單、查詢商品、查詢使用者,且這三個業務請求都是依賴第三方服務-訂單服務、商品服務、使用者服務。三個服務均是通過RPC呼叫。當查詢訂單服務,假如執行緒阻塞了,這個時候後續有大量的查詢訂單請求過來,那麼容器中的執行緒數量則會持續增加直致CPU資源耗盡到100%,整個服務對外不可用,叢集環境下就是雪崩。如下圖
訂單服務不可用.png:
整個tomcat容器不可用.png
Hystrix是如何通過執行緒池實現執行緒隔離的
Hystrix通過命令模式,將每個型別的業務請求封裝成對應的命令請求,比如查詢訂單->訂單Command,查詢商品->商品Command,查詢使用者->使用者Command。每個型別的Command對應一個執行緒池。建立好的執行緒池是被放入到ConcurrentHashMap中,比如查詢訂單:
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>(); threadPools.put(“hystrix-order”, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
當第二次查詢訂單請求過來的時候,則可以直接從Map中獲取該執行緒池。具體流程如下圖:
hystrix執行緒執行過程和非同步化.png
建立執行緒池中的執行緒的方法,檢視原始碼如下:
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { ThreadFactory threadFactory = null; if (!PlatformSpecific.isAppEngineStandardEnvironment()) { threadFactory = new ThreadFactory() { protected final AtomicInteger threadNumber = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { threadFactory = PlatformSpecific.getAppEngineThreadFactory(); } final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } }
執行Command的方式一共四種,直接看官方文件(https://github.com/Netflix/Hystrix/wiki/How-it-Works),具體區別如下:
-
execute():以同步堵塞方式執行run()。呼叫execute()後,hystrix先建立一個新執行緒執行run(),接著呼叫程式要在execute()呼叫處一直堵塞著,直到run()執行完成。
-
queue():以非同步非堵塞方式執行run()。呼叫queue()就直接返回一個Future物件,同時hystrix建立一個新執行緒執行run(),呼叫程式通過Future.get()拿到run()的返回結果,而Future.get()是堵塞執行的。
-
observe():事件註冊前執行run()/construct()。第一步是事件註冊前,先呼叫observe()自動觸發執行run()/construct()(如果繼承的是HystrixCommand,hystrix將建立新執行緒非堵塞執行run();如果繼承的是HystrixObservableCommand,將以呼叫程式執行緒堵塞執行construct()),第二步是從observe()返回後呼叫程式呼叫subscribe()完成事件註冊,如果run()/construct()執行成功則觸發onNext()和onCompleted(),如果執行異常則觸發onError()。
-
toObservable():事件註冊後執行run()/construct()。第一步是事件註冊前,呼叫toObservable()就直接返回一個Observable物件,第二步呼叫subscribe()完成事件註冊後自動觸發執行run()/construct()(如果繼承的是HystrixCommand,hystrix將建立新執行緒非堵塞執行run(),呼叫程式不必等待run();如果繼承的是HystrixObservableCommand,將以呼叫程式執行緒堵塞執行construct(),呼叫程式等待construct()執行完才能繼續往下走),如果run()/construct()執行成功則觸發onNext()和onCompleted(),如果執行異常則觸發onError()
-
注:
execute()和queue()是在HystrixCommand中,observe()和toObservable()是在HystrixObservableCommand 中。從底層實現來講,HystrixCommand其實也是利用Observable實現的(看Hystrix原始碼,可以發現裡面大量使用了RxJava),儘管它只返回單個結果。HystrixCommand的queue方法實際上是呼叫了toObservable().toBlocking().toFuture(),而execute方法實際上是呼叫了queue().get()。
如何應用到實際程式碼中
package myHystrix.threadpool;
import com.netflix.hystrix.*;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.Future;
/**
* Created by wangxindong on 2017/8/4.
*/
public class GetOrderCommand extends HystrixCommand<List> {
OrderService orderService;
public GetOrderCommand(String name){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withMaxQueueSize(10) //配置佇列大小
.withCoreSize(2) // 配置執行緒池裡的執行緒數
)
);
}
@Override
protected List run() throws Exception {
return orderService.getOrderList();
}
public static class UnitTest {
@Test
public void testGetOrder(){
// new GetOrderCommand("hystrix-order").execute();
Future<List> future =new GetOrderCommand("hystrix-order").queue();
}
}
}
總結
執行依賴程式碼的執行緒與請求執行緒(比如Tomcat執行緒)分離,請求執行緒可以自由控制離開的時間,這也是我們通常說的非同步程式設計,Hystrix是結合RxJava來實現的非同步程式設計。通過設定執行緒池大小來控制併發訪問量,當執行緒飽和的時候可以拒絕服務,防止依賴問題擴散。
執行緒隔離.png
執行緒隔離的優點:
- 1、:應用程式會被完全保護起來,即使依賴的一個服務的執行緒池滿了,也不會影響到應用程式的其他部分。
- 2、:我們給應用程式引入一個新的風險較低的客戶端lib的時候,如果發生問題,也是在本lib中,並不會影響到其他內容,因此我們可以大膽的引入新lib庫。
- 3、:當依賴的一個失敗的服務恢復正常時,應用程式會立即恢復正常的效能。
- 4、:如果我們的應用程式一些引數配置錯誤了,執行緒池的執行狀況將會很快顯示出來,比如延遲、超時、拒絕等。同時可以通過動態屬性實時執行來處理糾正錯誤的引數配置。
- 5、:如果服務的效能有變化,從而需要調整,比如增加或者減少超時時間,更改重試次數,就可以通過執行緒池指標動態屬性修改,而且不會影響到其他呼叫請求。
- 6、:除了隔離優勢外,hystrix擁有專門的執行緒池可提供內建的併發功能,使得可以在同步呼叫之上構建非同步的外觀模式,這樣就可以很方便的做非同步程式設計(Hystrix引入了Rxjava非同步框架)。
儘管執行緒池提供了執行緒隔離,我們的客戶端底層程式碼也必須要有超時設定,不能無限制的阻塞以致執行緒池一直飽和。
執行緒隔離的缺點:
-
1、執行緒池的主要缺點就是它增加了計算的開銷,每個業務請求(被包裝成命令)在執行的時候,會涉及到請求排隊,排程和上下文切換。不過Netflix公司內部認為執行緒隔離開銷足夠小,不會產生重大的成本或效能的影響。
-
2、Netflix API每天使用執行緒隔離處理10億次Hystrix Command執行。 每個API例項都有40多個執行緒池,每個執行緒池中有5-20個執行緒(大多數設定為10個)。
(官方原話:The Netflix API processes 10+ billion Hystrix Command executions per day using thread isolation. Each API instance has 40+ thread-pools with 5–20 threads in each (most are set to 10).) -
3、對於不依賴網路訪問的服務,比如只依賴記憶體快取這種情況下,就不適合用執行緒池隔離技術,而是採用訊號量隔離,後面文章會介紹。
因此我們可以放心使用Hystrix的執行緒隔離技術,來防止雪崩這種可怕的致命性線上故障。
hystrix的執行緒隔離技術除了執行緒池,還有另外一種方式:訊號量。
執行緒池和訊號量的區別
在《Hystrix執行緒隔離技術解析-執行緒池》一文最後,我們談到了執行緒池的缺點,當我們依賴的服務是極低延遲的,比如訪問記憶體快取,就沒有必要使用執行緒池的方式,那樣的話開銷得不償失,而是推薦使用訊號量這種方式。下面這張圖說明了執行緒池隔離和訊號量隔離的主要區別:執行緒池方式下業務請求執行緒和執行依賴的服務的執行緒不是同一個執行緒;訊號量方式下業務請求執行緒和執行依賴服務的執行緒是同一個執行緒
訊號量和執行緒池的區別.png
如何使用訊號量來隔離執行緒
將屬性execution.isolation.strategy設定為SEMAPHORE ,象這樣 ExecutionIsolationStrategy.SEMAPHORE,則Hystrix使用訊號量而不是預設的執行緒池來做隔離。
public class CommandUsingSemaphoreIsolation extends HystrixCommand {
private final int id;
public CommandUsingSemaphoreIsolation(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
// since we're doing work in the run() method that doesn't involve network traffic
// and executes very fast with low risk we choose SEMAPHORE isolation
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}
@Override
protected String run() {
// a real implementation would retrieve data from in memory data structure
// or some other similar non-network involved work
return "ValueFromHashMap_" + id;
}
}
總結
訊號量隔離的方式是限制了總的併發數,每一次請求過來,請求執行緒和呼叫依賴服務的執行緒是同一個執行緒,那麼如果不涉及遠端RPC呼叫(沒有網路開銷)則使用訊號量來隔離,更為輕量,開銷更小。
轉載自:https://www.cnblogs.com/xiaohanlin/p/8012561.html
間接轉載請註明出處: http://www.jianshu.com/p/df1525d58c20
參考資料:https://github.com/Netflix/Hystrix/wiki