1. 程式人生 > >Java併發程式設計之ThreadLocal解析

Java併發程式設計之ThreadLocal解析

本文討論的是JDK 1.8中的ThreadLocal

ThreadLocal概念

ThreadLocal多執行緒間併發訪問變數的解決方案,為每個執行緒提供變數的副本,用空間換時間。

  • ThreadLocal在每個執行緒中對該變數會建立一個副本,即每個執行緒內部都會有一個該變數,且線上程內部任何地方都可以使用,執行緒之間互不影響,實現執行緒隔離,這樣一來就不存線上程安全問題,也不會嚴重影響程式執行效能
  • 由於在每個執行緒中都建立了副本,所以要考慮它對資源的消耗,比如記憶體的佔用會比不使用ThreadLocal要大
  • 如果使用ThreadLocal,通常定義為private static型別,在我看來最好是定義為private static final型別

ThreadLocal使用場景

個人認為只要滿足以下兩點需求,就可以考慮使用ThreadLocal

  • 每個執行緒需要有自己單獨的例項
  • 例項需要在多個方法中共享,但不希望被多執行緒共享

比如:建立資料庫連線,在多執行緒情況下,我們肯定不希望出現A執行緒拿到連線未執行完,B執行緒就把它關閉或多個執行緒共用一個連線導致資料操作混亂等情況。而我們正確的姿勢應該會擼上以下這樣的類似程式碼:

private static ThreadLocal<Connection> connTl = new ThreadLocal<>();

public static Connection getConnection() throws SQLException{
    Connection conn = connTl.get();
    if(conn==null){
        conn = dataSource.getConnection();
        connTl.set(conn);
    }
    return conn;
}

ThreadLocal常用方法介紹

class ThreadLocal<T> {
    T get();
    void set(T value);
    void remove();
}

設定當前執行緒的執行緒區域性變數的值

public void set(T value);

返回當前執行緒所對應的執行緒區域性變數

public T get();

刪除該執行緒當前執行緒區域性變數的值

public void remove()

ThreadLocal原始碼解析

在看常用方法原始碼前,我們要先了解下ThreadLocalMap

ThreadLocalMap是ThreadLocal內部的一個靜態類

 static class ThreadLocalMap {

        static class Entry extends WeakReference<ThreadLocal<?>> {
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        /**
         * 初始容量
         */
        private static final int INITIAL_CAPACITY = 16;

        /** 
        * 實體表 
        */
        private Entry[] table;

        /**
         * 表初始大小
         */
        private int size = 0;

        /**
         * 擴容上限,當size到達threashold時,需要resize整個Map,threshold的初始值為len * 2 / 3
         */
        private int threshold; // Default to 0

        /**
         * 將調整大小閾值設定為最壞情況下保持2/3的負載因子。
         */
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

        /**
         * 獲取下一個索引,超出長度則返回0
         */
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        /**
         * 返回上一個索引,如果-1為負數,返回長度-1的索引
         */
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

        /**
         * 構造引數建立一個ThreadLocalMap程式碼
         * ThreadLocal為key,我們的泛型為value
         */
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

        /**
         * ThreadLocal本身是執行緒隔離的,按道理是不會出現資料共享和傳遞的行為的
         * 這是InheritableThreadLocal提供了了一種父子間資料共享的機制
         */
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

        /**
         * 獲取ThreadLocal的索引位置,通過下標索引獲取內容
         */
        private Entry getEntry(ThreadLocal<?> key) {
            // 通過hashcode確定下標
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            // 如果找到則直接返回
            if (e != null && e.get() == key)
                return e;
            else
                // 找不到的話接著從i位置開始向後遍歷,基於線性探測法,是有可能在i之後的位置找到的
                return getEntryAfterMiss(key, i, e);
        }
        
        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            // 迴圈向後遍歷
            while (e != null) {
                // 獲取節點對應的k
                ThreadLocal<?> k = e.get();
                // 相等則返回
                if (k == key)
                    return e;
                // 如果為null,觸發一次連續段清理
                if (k == null)
                    expungeStaleEntry(i);
                // 獲取下一個下標接著進行判斷
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

        /**
         * ThreadLocalMap的set方法,通過這個方法,我們可以看出該雜湊表是用線性探測法來解決衝突的
         */
        private void set(ThreadLocal<?> key, Object value) {
            // 新開一個引用指向table
            Entry[] tab = table;
            // 獲取table的長度
            int len = tab.length;
             // 獲取對應ThreadLocal在table當中的下標
            int i = key.threadLocalHashCode & (len-1);

            // 從該下標開始迴圈遍歷
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                // 如遇相同key,則直接替換value
                if (k == key) {
                    e.value = value;
                    return;
                }

                // 如果該key已經被回收失效,則替換該失效的key
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
            
            // 找到空的位置,建立Entry物件並插入
            tab[i] = new Entry(key, value);
            // table內元素size自增
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

        /**
         * 移除key方法
         */
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }
    
        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            // 建個引用變數指向table
            Entry[] tab = table;
            // 獲取table長度
            int len = tab.length;
            Entry e;

            // 記錄當前失效的節點下標
            int slotToExpunge = staleSlot;
            
            /**
             * 由staleSlot下標開始向前掃描,查詢並記錄最前位置value為null的下標
             */
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            /**
             * 由staleSlot下標開始向後掃描
             */
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                
                // 獲取Entry節點對應的ThreadLocal物件
                ThreadLocal<?> k = e.get();

                /**
                 * 如果與新的key對應,直接賦值value,替換i與staleSlot兩個下標
                 */
                if (k == key) {
                    e.value = value;

                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;                    

                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                /* 如果當前下標所在已經失效,並且向後掃描過程當中沒有找到失效的Entry節點,則slotToExpunge賦值為當前位置*/
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // 如果並沒有在table當中找到該key,則直接在當前位置new一個Entry
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }
        
        /**        
         * 核心清理函式,它主要做的事情就是
         * 1、從staleSlot開始,向後遍歷將ThreadLocal物件被回收所在Entry節點的value和Entry節點本身設定null,方便GC,並且size自減1
         * 2、會對非null的Entry節點進行rehash,只要不是在當前位置,就會將Entry挪到下一個為null的位置上
         * 所以實際上是對從staleSlot開始做一個連續段的清理和rehash操作
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            //獲取長度
            int len = tab.length;

            // 將傳過來的下標置null
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;
            
            Entry e;
            int i;
            //遍歷刪除指定節點所有後續節點當中,ThreadLocal被回收的節點
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                //獲取entry當中的key
                ThreadLocal<?> k = e.get();
                // 如果ThreadLocal為null,則將value以及陣列下標所在位置設定null,方便GC並且size-1
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {    //如果不為null
                    //重新計算key的下標
                    int h = k.threadLocalHashCode & (len - 1);
                    
                    // 如果是當前位置則遍歷下一個
                    // 不是當前位置,則重新從i開始找到下一個為null的座標進行賦值
                    if (h != i) {
                        tab[i] = null;
                        
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

        /**
        * 清理被回收的Entry
        */
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                // Entry物件不為空,但是ThreadLocal這個key已經為null,則清除
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    // 呼叫清理函式
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

        /**
         * rehash操作
         */
        private void rehash() {
            expungeStaleEntries();

            // Use lower threshold for doubling to avoid hysteresis
            if (size >= threshold - threshold / 4)
                resize();
        }

        /**
         * 對table進行擴容,因為要保證table的長度是2的冪,所以擴容就擴大2倍
         */
        private void resize() {
            Entry[] oldTab = table;
            //舊錶長度
            int oldLen = oldTab.length;
            //新表長度
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            /**
             * 從下標0開始,逐個向後遍歷插入到新的table當中
             * 1、如遇到key已經為null,則value設定null,方便GC回收
             * 2、通過hashcode & len - 1計算下標,如果該位置已經有Entry陣列,則通過線性探測向後探測插入
             */
            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }

            //重新設定擴容的閾值
            setThreshold(newLen);
            //更新size
            size = count;
            //指向新的Entry陣列
            table = newTab;
        }

        /**
         * 清除table中所有無用的entry
         */
        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null)
                    expungeStaleEntry(j);
            }
        }
    }

get方法

    public T get() {
        // 獲取當前的Thread物件
        Thread t = Thread.currentThread();
        // 通過getMap獲取Thread內的ThreadLocalMap
        ThreadLocalMap map = getMap(t);      
        if (map != null) {
            // 如果map已經存在,以當前的ThreadLocal為鍵,獲取Entry物件,並從從Entry中取出值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 如果map為空,則呼叫setInitialValue進行初始化
        return setInitialValue();
    }

getMap方法

    ThreadLocalMap getMap(Thread t) {
        //返回執行緒中ThreadLocalMap
        return t.threadLocals;
    }
    
    //Thread.java類threadLocals屬性
    ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocalMap引用其實是存在ThreadLocal類裡面的

Entry實體


    //Entry是一個key-value結構,key為ThreadLocal,value為儲存的值
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

setInitialValue方法

setInitialValue在Map不存在的時候呼叫

    private T setInitialValue() {
        //呼叫initialValue生成一個初始的value值,深入initialValue函式,我們可知它就是返回一個null
        T value = initialValue();
        Thread t = Thread.currentThread();
        //獲取ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            //如果不存在則會呼叫createMap建立ThreadLocalMap
            createMap(t, value);
        return value;
    }
    
    void createMap(Thread t, T firstValue) {
        //new一個ThreadLocalMap物件進去
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

set方法

    public void set(T value) {
        //獲取當前執行緒
        Thread t = Thread.currentThread();
        //獲取map
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

map.set(this,value)方法

    private void set(ThreadLocal<?> key, Object value) {
    
        Entry[] tab = table;
        int len = tab.length;
        //根據key計算出位置
        int i = key.threadLocalHashCode & (len-1);

        //迴圈檢測
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();

            //如果Entry已經存在並且key等於傳入的key,那麼這時候直接給這個Entry賦新的value值。
            if (k == key) {
                e.value = value;
                return;
            }

            //如果Entry存在,但是key為null,則呼叫replaceStaleEntry來更換這個key為空的Entry
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }
        //建立一個entry
        tab[i] = new Entry(key, value);
        //sz加1
        int sz = ++size;
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }

remove方法

     public void remove() {
         //獲取map
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             //呼叫map的remove
             m.remove(this);
     }

ThreadLocalMap.remove(this)方法

     private void remove(ThreadLocal<?> key) {
         Entry[] tab = table;
         int len = tab.length;
         //獲取索引位置
         int i = key.threadLocalHashCode & (len-1);
         //迴圈遍歷table表
         for (Entry e = tab[i];
              e != null;
              e = tab[i = nextIndex(i, len)]) {
             //如果找到,調呼叫清除相關方法,並結束迴圈
             if (e.get() == key) {
                 //呼叫weakrefrence的clear()清除引用
                 e.clear();
                 //連續段清除
                 expungeStaleEntry(i);
                 return;
             }
         }
     }

資料

  • Java問題收集
  • ThreadLocal和ThreadLocalMap原始碼分析