1. 程式人生 > >使用Hystrix的外掛機制,解決在使用執行緒隔離時,threadlocal的傳遞問題

使用Hystrix的外掛機制,解決在使用執行緒隔離時,threadlocal的傳遞問題

# 背景 在我們的專案中,比較廣泛地使用了ThreadLocal,比如,在filter層,根據token,取到使用者資訊後,就會放到一個ThreadLocal變數中;在後續的業務處理中,就會直接從當前執行緒,來獲取該ThreadLocal變數,然後獲取到其中的使用者資訊,非常的方便。 但是,hystrix 這個元件一旦引入的話,如果使用執行緒隔離的方式,我們的業務邏輯就被分成了兩部分,如下: ```java public class SimpleHystrixCommand extends HystrixCommand { private TestService testService; public SimpleHystrixCommand(TestService testService) { super(setter()); this.testService = testService; } @Override protected String run() throws Exception { .... } ... } ``` 首先,我們定義了一個Command,這個Command,最終就會丟給hystrix的執行緒池中去執行。那,我們的controller層,會怎麼寫呢? ```java @RequestMapping("/") public String hystrixOrder () { SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal(); // 1 SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService); // 2 String res = simpleHystrixCommand.execute(); return res; } ``` * 上面的1處,new了一個HystrixCommand,這一步,還是在當前執行緒執行的; * 2處,在執行execute的過程中,最終就會把這個command,丟到執行緒池中,然後,command中的業務邏輯,就線上程池的執行緒中執行了。 所以,這中間,是有執行緒切換的,執行1時,當前執行緒裡的ThreadLocal資料,在執行業務方法的時候,執行緒變了,也就取不到ThreadLocal資料了。 # 思路及實現 ## 原始碼 如果沒時間,可以直接看原始碼: https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo ##從setter入手 一開始,我的思路是,看看能不能把hystrix的預設執行緒池給換掉,因為構建HystrixCommand時,支援使用Setter的方式去配置。 如下: ```java com.netflix.hystrix.HystrixCommand.Setter final public static class Setter { // 1 protected final HystrixCommandGroupKey groupKey; // 2 protected HystrixCommandKey commandKey; // 3 protected HystrixThreadPoolKey threadPoolKey; // 4 protected HystrixCommandProperties.Setter commandPropertiesDefaults; // 5 protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; } ``` * 1處,設定命令組 * 2處,設定命令的key * 3處,設定執行緒池的key;hystrix會根據這個key,在一個map中,來查詢對應的執行緒池,如果找不到,則建立一個,並放到map中。 ```java com.netflix.hystrix.HystrixThreadPool.Factory final static ConcurrentHashMap threadPools = new ConcurrentHashMap(); ``` * 4處,命令的相關屬性,包括是否降級,是否熔斷,是否允許請求合併,命令執行的最大超時時長,以及metric等實時統計資訊 * 5處,執行緒池的相關屬性,比如核心執行緒數,最大執行緒數,佇列長度等 怎麼樣,可以設定的屬性很多,是吧,但是,並沒有讓我們控制執行緒池的建立相關的,也沒辦法替換其預設執行緒池。 ok,那不用setter的方式,行不行呢? ##從構造器入手 HystrixCommand 的建構函式,看看能不能傳入自定義的執行緒池呢? ![](https://img2020.cnblogs.com/blog/519126/202005/519126-20200509163638978-1818536926.png) 經過我一開始不仔細的觀察,發現有一個建構函式可以傳入HystrixThreadPool,ok,就是它了。但是,後面仔細一看,竟然是 package許可權,我的子類,和HystrixCommand當然不是一個package下的,所以,訪問不了這個構造器。 雖然,可以使用反射,但是,咱們還是守規矩點好了,再看看有沒有其他入口。 ##尋找擴充套件口 仔細觀察下,看看執行緒池什麼時候建立的? 入口在下圖,每次new一個HystrixCommand,最終都會呼叫父類的建構函式: ![](https://img2020.cnblogs.com/blog/519126/202005/519126-20200509164203471-1398983695.png) 上圖所示處,initThreadPool裡面,會去建立執行緒池,需要注意的是,這裡的第一個實參,threadPool,是建構函式的第5個形參,目前來看,傳進來的都是null。為啥說這個,我們接著看: ```java private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { //1 get the default implementation of HystrixThreadPool return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } } ``` 上面我們說了,第一個實參,總是null,所以,會走這裡的1處。 ```java com.netflix.hystrix.HystrixThreadPool.Factory#getInstance static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { String key = threadPoolKey.name(); //1 this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } //2 if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { // 3 threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); } ``` * 1處,會查詢快取,就是前面說的,去map中,根據執行緒池的key,查詢對應的執行緒池 * 2處,沒找到,則進行建立 * 3處,new HystrixThreadPoolDefault,建立執行緒池 我們接著看3處: ```java public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { // 1 this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); // 2 HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); // 3 this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); // 4 this.threadPool = this.metrics.getThreadPool(); ... } ``` * 1處,獲取執行緒池的預設配置,這個就和我們前面說的那個Setter裡的類似 * 2處,從HystrixPlugins.getInstance()獲取一個HystrixConcurrencyStrategy型別的物件,儲存到區域性變數 concurrencyStrategy * 3處,初始化metrics,這裡的第二個引數,是concurrencyStrategy.getThreadPool來獲取的,這個操作,實際上就會去建立執行緒池。 ```java com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); ... final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); ... // 1 return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } ``` 上面的1處,會去建立執行緒池。但是,這裡直接就是要了 jdk 的預設執行緒池類來建立,這還怎麼搞?型別都定死了。沒法擴充套件了。。。 ##發現hystrix的外掛機制 但是,回過頭來,又仔細看了看,這個getThreadPool 是 HystrixConcurrencyStrategy類的一個方法,這個方法也是個例項方法。 方法不能改,那,例項能換嗎?再看看前面的程式碼: ![](https://img2020.cnblogs.com/blog/519126/202005/519126-20200509165711823-375080568.png) ok,那接著分析: ```java public HystrixConcurrencyStrategy getConcurrencyStrategy() { if (concurrencyStrategy.get() == null) { //1 check for an implementation from Archaius first Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class); concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl); } return concurrencyStrategy.get(); } ``` 1處,根據這個類,獲取實現,感覺有點戲。 ```java private T getPluginImplementation(Class pluginClass) { // 1 T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties); if (p != null) return p; // 2 return findService(pluginClass, classLoader); } ``` * 1處,從一個動態屬性中獲取,後來經查,發現是如果集成了Netflix Archaius就可以動態獲取屬性,類似於一個配置中心 * 2處,如果前面沒找到,就是要 JDK 的SPI機制。 ```java private static T findService( Class spi, ClassLoader classLoader) throws ServiceConfigurationError { ServiceLoader sl = ServiceLoader.load(spi, classLoader); for (T s : sl) { if (s != null) return s; } return null; } ``` 那就好說了。SPI ,我們自定義一個實現,就可以替換掉預設的了,hystrix做的還是不錯,擴充套件性可以。 現在知道可以自定義HystrixConcurrencyStrategy了,那要怎麼自定義呢? 這個類,是個抽象類,大體有如下幾個方法: ```java getThreadPool getBlockingQueue(int maxQueueSize) Callable wrapCallable(Callable callable) getRequestVariable(final HystrixRequestVariableLifecycle rv) ``` 說是抽象類,但其實並沒有需要我們實現的方法,所有方法都有預設實現,我們只需要重寫需要覆蓋的方法即可。 我這裡,看重了第三個方法: ```java /** * Provides an opportunity to wrap/decorate a {@code Callable} before execution. *

* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}). *

* Default Implementation

*

* Pass-thru that does no wrapping. * * @param callable * {@code Callable} to be executed via a {@link ThreadPoolExecutor} * @return {@code Callable} either as a pass-thru or wrapping the one given */ public Callable wrapCallable(Callable callable) { return callable; } ``` 方法註釋如上,我簡單說下,在執行前,提供一個機會,讓你去wrap這個callable,即最終要丟到執行緒池執行的那個callable。 我們可以wrap一下原有的callable,在執行前,把當前執行緒的threadlocal變數存下來,即為A,然後設定到callable裡面去;在callable執行的時候,就可以使用我們的A中的threadlocal來替換掉worker執行緒中的。 多說無益,這裡直接看程式碼: ```java // 0 public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public Callable wrapCallable(Callable callable) { /** * 1 獲取當前執行緒的threadlocalmap */ Object currentThreadlocalMap = getCurrentThreadlocalMap(); Callable finalCallable = new Callable() { // 2 private Object callerThreadlocalMap = currentThreadlocalMap; // 3 private Callable targetCallable = callable; @Override public T call() throws Exception { /** * 4 將工作執行緒的原有執行緒變數儲存起來 */ Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap(); /** *5 將本執行緒的執行緒變數,設定為caller的執行緒變數 */ setCurrentThreadlocalMap(callerThreadlocalMap); try { // 6 return targetCallable.call(); }finally { // 7 setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread); log.info("restore work thread's threadlocal"); } } }; return finalCallable; } ``` * 0處,自定義了一個類,繼承HystrixConcurrencyStrategy,準備覆蓋其預設的wrap方法 * 1處,獲取外部執行緒的threadlocal * 2處,3處,這裡已經是處於匿名內部類了,定義了2個field,分別存放1中的外部執行緒的threadlocal,以及要wrap的callable * 4處,此時已經處於run方法的執行邏輯了:儲存worker執行緒的自身的執行緒區域性變數 * 5處,使用外部執行緒的threadlocal覆蓋自身的 * 6處,呼叫真正的業務邏輯 * 7處,恢復為執行緒自身的threadlocal 獲取執行緒的threadlocal的程式碼: ```java private Object getCurrentThreadlocalMap() { Thread thread = Thread.currentThread(); try { Field field = Thread.class.getDeclaredField("threadLocals"); field.setAccessible(true); Object o = field.get(thread); return o; } catch (NoSuchFieldException | IllegalAccessException e) { log.error("{}",e); } return null; } ``` 設定執行緒的threadlocal的程式碼: ```java private void setCurrentThreadlocalMap(Object newThreadLocalMap) { Thread thread = Thread.currentThread(); try { Field field = Thread.class.getDeclaredField("threadLocals"); field.setAccessible(true); field.set(thread,newThreadLocalMap); } catch (NoSuchFieldException | IllegalAccessException e) { log.error("{}",e); } } ``` ## 外掛機制的相關資料 https://github.com/Netflix/Hystrix/wiki/Plugins # 執行效果 ## controller程式碼 ```java @RequestMapping("/") public String hystrixOrder () { // 1 SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal(); // 2 SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService); String res = simpleHystrixCommand.execute(); return res; } ``` * 1處,設定ThreadLocal變數 ```java public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() { UserVO userVO = new UserVO(); userVO.setUserName("test user"); RequestContextHolder.set(userVO); log.info("set thread local:{} to context",userVO); return userVO; } ``` * 2處,new了一個HystrixCommand,然後execute執行 ##command中程式碼 ```java public class SimpleHystrixCommand extends HystrixCommand { private TestService testService; public SimpleHystrixCommand(TestService testService) { super(setter()); this.testService = testService; } @Override protected String run() throws Exception { // 1 String s = testService.getResult(); log.info("get thread local:{}",s); /** * 如果睡眠時間,超過2s,會降級 * {@link #getFallback()} */ int millis = new Random().nextInt(3000); log.info("will sleep {} millis",millis); Thread.sleep(millis); return s; } ``` 重點看1處程式碼: ```java public String getResult() { UserVO userVO = RequestContextHolder.get(); log.info("I am hystrix pool thread,try to get threadlocal:{}",userVO); return userVO.toString(); } ``` 如上所示,會去獲取ThreadLocal變數,並列印。 ## spi配置 在resources\META-INF\services目錄下,建立檔案: com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy 內容為下面一行: com.learn.hystrix.utils.MyHystrixConcurrencyStrategy ## 執行效果 訪問:http://localhost:8080/ ```java 2020-05-09 17:26:11.134 INFO 7452 --- [nio-8080-exec-2] com.learn.hystrix.utils.SessionUtils : set thread local:UserVO(userName=test user) to context 2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] com.learn.hystrix.service.TestService : I am hystrix pool thread,try to get threadlocal:UserVO(userName=test user) 2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : get thread local:UserVO(userName=test user) 2020-05-09 17:26:11.144 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : will sleep 126 millis 2020-05-09 17:26:11.281 INFO 7452 --- [x-member-pool-2] c.l.h.u.MyHystrixConcurrencyStrategy : restore work thread's threadlocal ``` 可以看到,已經發生了執行緒切換,在worker執行緒也取到了。 大家如果發現日誌中出現了[ HystrixTimer-1] 執行緒的身影,不用擔心,那只是因為我們的執行緒超時了,所以timer執行緒檢測到了之後,去執行一個callable任務,那個runnable就是前面被我們包裝過的那個callable。(這塊超時的機制,todo吧,下次再講) # 總結 hystrix的外掛機制,不止可以擴充套件上面這一個類,還有幾個別的類也是可以的。大家直接參考: https://github.com/Netflix/Hystrix/wiki/Plugins 程式碼demo,我放在了: https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-loc