1. 程式人生 > >Spring Cloud中Hystrix 執行緒隔離導致ThreadLocal資料丟失

Spring Cloud中Hystrix 執行緒隔離導致ThreadLocal資料丟失

在Spring Cloud中我們用Hystrix來實現斷路器,Zuul中預設是用訊號量(Hystrix預設是執行緒)來進行隔離的,我們可以通過配置使用執行緒方式隔離。

在使用執行緒隔離的時候,有個問題是必須要解決的,那就是在某些業務場景下通過ThreadLocal來線上程裡傳遞資料,用訊號量是沒問題的,從請求進來,但後續的流程都是通一個執行緒。

當隔離模式為執行緒時,Hystrix會將請求放入Hystrix的執行緒池中去執行,這個時候某個請求就有A執行緒變成B執行緒了,ThreadLocal必然消失了。

下面我們通過一個簡單的列子來模擬下這個流程:

public class CustomThreadLocal {
    static ThreadLocal<String> threadLocal = new ThreadLocal<>();
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                CustomThreadLocal.threadLocal.set("猿天地");
                new Service().call();
            }
        }).start();
    }
}
class Service {
    public void call() {
        System.out.println("Service:" + Thread.currentThread().getName());
        System.out.println("Service:" + CustomThreadLocal.threadLocal.get());
        new Dao().call();
    }
}
class Dao {
    public void call() {
        System.out.println("==========================");
        System.out.println("Dao:" + Thread.currentThread().getName());
        System.out.println("Dao:" + CustomThreadLocal.threadLocal.get());
    }
}

我們在主類中定義了一個ThreadLocal用來傳遞資料,然後起了一個執行緒,線上程中呼叫Service中的call方法,並且往Threadlocal中設定了一個值,在Service中獲取ThreadLocal中的值,然後再呼叫Dao中的call方法,也是獲取ThreadLocal中的值,我們執行下看效果:

Service:Thread-0
Service:猿天地
==========================
Dao:Thread-0
Dao:猿天地

可以看到整個流程都是在同一個執行緒中執行的,也正確的獲取到了ThreadLocal中的值,這種情況是沒有問題的。

接下來我們改造下程式,進行執行緒切換,將呼叫Dao中的call重啟一個執行緒執行:

public class CustomThreadLocal {
    static ThreadLocal<String> threadLocal = new ThreadLocal<>();
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                CustomThreadLocal.threadLocal.set("猿天地");
                new Service().call();
            }
        }).start();
    }
}
class Service {
    public void call() {
        System.out.println("Service:" + Thread.currentThread().getName());
        System.out.println("Service:" + CustomThreadLocal.threadLocal.get());
        //new Dao().call();
        new Thread(new Runnable() {
            @Override
            public void run() {
                new Dao().call();
            }
        }).start();
    }
}
class Dao {
    public void call() {
        System.out.println("==========================");
        System.out.println("Dao:" + Thread.currentThread().getName());
        System.out.println("Dao:" + CustomThreadLocal.threadLocal.get());
    }
}

再次執行,看效果:

Service:Thread-0
Service:猿天地
==========================
Dao:Thread-1
Dao:null

可以看到這次的請求是由2個執行緒共同完成的,在Service中還是可以拿到ThreadLocal的值,到了Dao中就拿不到了,因為執行緒已經切換了,這就是開始講的ThreadLocal的資料會丟失的問題。

那麼怎麼解決這個問題呢,其實也很簡單,只需要改一行程式碼即可:

static ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

將ThreadLocal改成InheritableThreadLocal,我們看下改造之後的效果:

Service:Thread-0
Service:猿天地
==========================
Dao:Thread-1
Dao:猿天地

值可以正常拿到,InheritableThreadLocal就是為了解決這種執行緒切換導致ThreadLocal拿不到值的問題而產生的。

要理解InheritableThreadLocal的原理,得先理解ThreadLocal的原理,我們稍微簡單的來介紹下ThreadLocal的原理:

  • 每個執行緒都有一個 ThreadLocalMap 型別的 threadLocals 屬性,ThreadLocalMap 類相當於一個Map,key 是 ThreadLocal 本身,value 就是我們設定的值。
public class Thread implements Runnable {
    ThreadLocal.ThreadLocalMap threadLocals = null;
}
  • 當我們通過 threadLocal.set(“猿天地”); 的時候,就是在這個執行緒中的 threadLocals 屬性中放入一個鍵值對,key 是 當前執行緒,value 就是你設定的值猿天地。
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}
  • 當我們通過 threadlocal.get() 方法的時候,就是根據當前執行緒作為key來獲取這個執行緒設定的值。
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
             @SuppressWarnings("unchecked")
             T result = (T)e.value;
             return result;
        }
    }
    return setInitialValue();
}

通過上面的介紹我們可以瞭解到threadlocal能夠傳遞資料是用Thread.currentThread()當前執行緒來獲取,也就是隻要在相同的執行緒中就可以獲取到前方設定進去的值。

如果在threadlocal設定完值之後,下步的操作重新建立了一個執行緒,這個時候Thread.currentThread()就已經變了,那麼肯定是拿不到之前設定的值。具體的問題復現可以參考上面我的程式碼。

那為什麼InheritableThreadLocal就可以呢?

InheritableThreadLocal這個類繼承了ThreadLocal,重寫了3個方法,在當前執行緒上建立一個新的執行緒例項Thread時,會把這些執行緒變數從當前執行緒傳遞給新的執行緒例項。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
     * Computes the child's initial value for this inheritable thread-local
     * variable as a function of the parent's value at the time the child
     * thread is created.  This method is called from within the parent
     * thread before the child is started.
     * <p>
     * This method merely returns its input argument, and should be overridden
     * if a different behavior is desired.
     *
     * @param parentValue the parent thread's value
     * @return the child thread's initial value
     */
    protected T childValue(T parentValue) {
        return parentValue;
    }
    /**
     * Get the map associated with a ThreadLocal.
     *
     * @param t the current thread
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }
    /**
     * Create the map associated with a ThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the table.
     */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

通過上面的程式碼我們可以看到InheritableThreadLocal 重寫了childValue, getMap,createMap三個方法,當我們往裡面set值的時候,值儲存到了inheritableThreadLocals裡面,而不是之前的threadLocals。

關鍵的點來了,為什麼當建立新的執行緒池,可以獲取到上個執行緒裡的threadLocal中的值呢?原因就是在新建立執行緒的時候,會把之前執行緒的inheritableThreadLocals賦值給新執行緒的inheritableThreadLocals,通過這種方式實現了資料的傳遞。

原始碼最開始在Thread的init方法中,如下:

if (parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals =
               ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

createInheritedMap如下:

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
    return new ThreadLocalMap(parentMap);
}

賦值程式碼:

private ThreadLocalMap(ThreadLocalMap parentMap) {
      Entry[] parentTable = parentMap.table;
      int len = parentTable.length;
      setThreshold(len);
      table = new Entry[len];
      for (int j = 0; j < len; j++) {
            Entry e = parentTable[j];
            if (e != null) {
                @SuppressWarnings("unchecked")
                ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                if (key != null) {
                    Object value = key.childValue(e.value);
                    Entry c = new Entry(key, value);
                    int h = key.threadLocalHashCode & (len - 1);
                    while (table[h] != null)
                        h = nextIndex(h, len);
                    table[h] = c;
                    size++;
                }
            }
        }
}

到此為止,通過inheritableThreadLocals我們可以在父執行緒建立子執行緒的時候將Local中的值傳遞給子執行緒,這個特性已經能夠滿足大部分的需求了,但是還有一個很嚴重的問題是如果是線上程複用的情況下就會出問題,比如執行緒池中去使用inheritableThreadLocals 進行傳值,因為inheritableThreadLocals 只是會再新建立執行緒的時候進行傳值,執行緒複用並不會做這個操作,那麼要解決這個問題就得自己去擴充套件執行緒類,實現這個功能。

不要忘記我們是做Java的哈,開源的世界有你需要的任何東西,下面我給大家推薦一個實現好了的Java庫,是阿里開源的transmittable-thread-local。

GitHub地址:https://github.com/alibaba/transmittable-thread-local

主要功能就是解決在使用執行緒池等會快取執行緒的元件情況下,提供ThreadLocal值的傳遞功能,解決非同步執行時上下文傳遞的問題。

JDK的InheritableThreadLocal類可以完成父執行緒到子執行緒的值傳遞。但對於使用執行緒池等會快取執行緒的元件的情況,執行緒由執行緒池建立好,並且執行緒是快取起來反覆使用的;這時父子執行緒關係的ThreadLocal值傳遞已經沒有意義,應用需要的實際上是把 任務提交給執行緒池時的ThreadLocal值傳遞到任務執行時。

transmittable-thread-local使用方式分為三種,修飾Runnable和Callable,修飾執行緒池,Java Agent來修飾JDK執行緒池實現類

接下來給大家演示下執行緒池的修飾方式,首先來一個非正常的案例,程式碼如下:

public class CustomThreadLocal {
    static ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
    static ExecutorService pool = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
        for(int i=0;i<100;i++) {
             int j = i;
            pool.execute(new Thread(new Runnable() {
                @Override
                public void run() {
                    CustomThreadLocal.threadLocal.set("猿天地"+j);
                    new Service().call();
                }
            }));
        }
    }
}
class Service {
    public void call() {
        CustomThreadLocal.pool.execute(new Runnable() {
            @Override
            public void run() {
                new Dao().call();
            }
        });
    }
}
class Dao {
    public void call() {
         System.out.println("Dao:" + CustomThreadLocal.threadLocal.get());
    }
}

執行上面的程式碼出現的結果是不正確的,輸出結果如下:

Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99

正確的應該是從1到100,由於執行緒的複用,值被替換掉了才會出現不正確的結果

接下來使用transmittable-thread-local來改造有問題的程式碼,新增transmittable-thread-local的Maven依賴:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.2.0</version>
</dependency>

只需要修改2個地方,修飾執行緒池和替換InheritableThreadLocal:

static TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
static ExecutorService pool =  TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));

正確的結果如下:

Dao:猿天地85
Dao:猿天地84
Dao:猿天地86
Dao:猿天地87
Dao:猿天地88
Dao:猿天地90
Dao:猿天地89
Dao:猿天地91
Dao:猿天地93
Dao:猿天地92
Dao:猿天地94
Dao:猿天地95
Dao:猿天地97
Dao:猿天地96
Dao:猿天地98
Dao:猿天地99

到這裡我們就已經可以完美的解決執行緒中,執行緒池中ThreadLocal資料的傳遞了,各位看官又疑惑了,標題不是講的Spring Cloud中如何解決這個問題麼,我也是在Zuul中發現這個問題的,解決方案已經告訴大家了,至於怎麼解決Zuul中的這個問題就需要大家自己去思考了,後面有時間我再分享給大家。

/*
 * Copyright 2002-2012 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.context.request;

import javax.faces.context.FacesContext;

import org.springframework.core.NamedInheritableThreadLocal;
import org.springframework.core.NamedThreadLocal;
import org.springframework.util.ClassUtils;

/**
 * Holder class to expose the web request in the form of a thread-bound
 * {@link RequestAttributes} object. The request will be inherited
 * by any child threads spawned by the current thread if the
 * {@code inheritable} flag is set to {@code true}.
 *
 * <p>Use {@link RequestContextListener} or
 * {@link org.springframework.web.filter.RequestContextFilter} to expose
 * the current web request. Note that
 * {@link org.springframework.web.servlet.DispatcherServlet} and
 * {@link org.springframework.web.portlet.DispatcherPortlet} already
 * expose the current request by default.
 *
 * @author Juergen Hoeller
 * @author Rod Johnson
 * @since 2.0
 * @see RequestContextListener
 * @see org.springframework.web.filter.RequestContextFilter
 * @see org.springframework.web.servlet.DispatcherServlet
 * @see org.springframework.web.portlet.DispatcherPortlet
 */
public abstract class RequestContextHolder  {

	private static final boolean jsfPresent =
			ClassUtils.isPresent("javax.faces.context.FacesContext", RequestContextHolder.class.getClassLoader());

	private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
			new NamedThreadLocal<RequestAttributes>("Request attributes");

	private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
			new NamedInheritableThreadLocal<RequestAttributes>("Request context");


	/**
	 * Reset the RequestAttributes for the current thread.
	 */
	public static void resetRequestAttributes() {
		requestAttributesHolder.remove();
		inheritableRequestAttributesHolder.remove();
	}

	/**
	 * Bind the given RequestAttributes to the current thread,
	 * <i>not</i> exposing it as inheritable for child threads.
	 * @param attributes the RequestAttributes to expose
	 * @see #setRequestAttributes(RequestAttributes, boolean)
	 */
	public static void setRequestAttributes(RequestAttributes attributes) {
		setRequestAttributes(attributes, false);
	}

	/**
	 * Bind the given RequestAttributes to the current thread.
	 * @param attributes the RequestAttributes to expose,
	 * or {@code null} to reset the thread-bound context
	 * @param inheritable whether to expose the RequestAttributes as inheritable
	 * for child threads (using an {@link InheritableThreadLocal})
	 */
	public static void setRequestAttributes(RequestAttributes attributes, boolean inheritable) {
		if (attributes == null) {
			resetRequestAttributes();
		}
		else {
			if (inheritable) {
				inheritableRequestAttributesHolder.set(attributes);
				requestAttributesHolder.remove();
			}
			else {
				requestAttributesHolder.set(attributes);
				inheritableRequestAttributesHolder.remove();
			}
		}
	}

	/**
	 * Return the RequestAttributes currently bound to the thread.
	 * @return the RequestAttributes currently bound to the thread,
	 * or {@code null} if none bound
	 */
	public static RequestAttributes getRequestAttributes() {
		RequestAttributes attributes = requestAttributesHolder.get();
		if (attributes == null) {
			attributes = inheritableRequestAttributesHolder.get();
		}
		return attributes;
	}

	/**
	 * Return the RequestAttributes currently bound to the thread.
	 * <p>Exposes the previously bound RequestAttributes instance, if any.
	 * Falls back to the current JSF FacesContext, if any.
	 * @return the RequestAttributes currently bound to the thread
	 * @throws IllegalStateException if no RequestAttributes object
	 * is bound to the current thread
	 * @see #setRequestAttributes
	 * @see ServletRequestAttributes
	 * @see FacesRequestAttributes
	 * @see javax.faces.context.FacesContext#getCurrentInstance()
	 */
	public static RequestAttributes currentRequestAttributes() throws IllegalStateException {
		RequestAttributes attributes = getRequestAttributes();
		if (attributes == null) {
			if (jsfPresent) {
				attributes = FacesRequestAttributesFactory.getFacesRequestAttributes();
			}
			if (attributes == null) {
				throw new IllegalStateException("No thread-bound request found: " +
						"Are you referring to request attributes outside of an actual web request, " +
						"or processing a request outside of the originally receiving thread? " +
						"If you are actually operating within a web request and still receive this message, " +
						"your code is probably running outside of DispatcherServlet/DispatcherPortlet: " +
						"In this case, use RequestContextListener or RequestContextFilter to expose the current request.");
			}
		}
		return attributes;
	}


	/**
	 * Inner class to avoid hard-coded JSF dependency.
 	 */
	private static class FacesRequestAttributesFactory {

		public static RequestAttributes getFacesRequestAttributes() {
			FacesContext facesContext = FacesContext.getCurrentInstance();
			return (facesContext != null ? new FacesRequestAttributes(facesContext) : null);
		}
	}

}

RequestContextHolder.setRequestAttribut