1. 程式人生 > >?集合工具類使用線程

?集合工具類使用線程

集合 線程

集合工具類使用線程

1. hashmap源碼解析與並發可能遇見的問題

1.HashMap中的幾個重要變量

static final int DEFAULT_INITIAL_CAPACITY = 16;
    //默認初始容量,必須是2的n次方
static final int MAXIMUM_CAPACITY = 1 << 30;
    //最大容量,當通過構造方法傳入的容量比它還大時,就用這個最大容量,必須是2的n次方
static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //默認負載因子
transient Entry<K,V>[] table;
    //用來存儲鍵值對,可以看到鍵值對都是存儲在Entry中的
transient int size;
    //存放元素的個數
  
int threshold; 
    //臨界值   當實際大小超過臨界值時,會進行擴容threshold = 加載因子*容量
 
final float loadFactor; 
    //加載因子
  
transient int modCount;
    //被修改的次數


存儲結構

技術分享圖片技術分享圖片

2.Entry是一個鏈表結構,不僅包含keyvalue,還有可以指向下一個的next

static class Entry<K,V> implements Map.Entry<K,V> {
        final K key;
        V value;
        Entry<K,V> next;
        int hash;
        /**
         * Creates new entry.
         */
        Entry(int h, K k, V v, Entry<K,V> n) {
            value = v;
            next = n;
            key = k;
            hash = h;
        }
        ...
//3.put方法
public V put(K key, V value) {
        if (key == null)
            return putForNullKey(value);//儲存空鍵
        int hash = hash(key);//計算hash值
        int i = indexFor(hash, table.length);//計算存儲位置
        for (Entry<K,V> e = table[i]; e != null; e = e.next) {//遍歷hashmap的內部數據
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
//這個for循環,當發生並發,兩個線程沖突的時候,這個鏈表的結構會發生變化:可能兩個key互為對方的next元素。此時通過next遍歷,會形成死循環。在jdb8中已經不存在了。最好的解決辦法是使用concurrenthashmap
        modCount++;
        addEntry(hash, key, value, i);
        return null;
    }
//首先通過hash方法對hashcode進行處理:
final int hash(Object k) {
        int h = 0;
        h ^= k.hashCode();
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }
//可以看到只是在key的hashcode值上做了一些處理,通過hash計算出來的值將會使用indexFor方法找到它應該所在的table下標:
static int indexFor(int h, int length) {
        return h & (length-1);
    }
//這個方法其實相當於對table.length取模。
//當需要插入的key為null時,調用putForNullKey方法處理:
 private V putForNullKey(V value) {
        for (Entry<K,V> e = table[0]; e != null; e = e.next) {
            if (e.key == null) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
        modCount++;
        addEntry(0, null, value, 0);
        return null;
    }
//putForNullKey方法只從table[0]這個位置開始遍歷,因為key為null只放在table中的第一個位置,下標為0,在遍歷中如果發現已經有key為null了,則替換新value,返回舊value,結束;如果還沒有key為null,調用addEntry方法增加一個Entry:
void addEntry(int hash, K key, V value, int bucketIndex) {
        if ((size >= threshold) && (null != table[bucketIndex])) {
            resize(2 * table.length);
            hash = (null != key) ? hash(key) : 0;
            bucketIndex = indexFor(hash, table.length);
        }
        createEntry(hash, key, value, bucketIndex);
    }
//可以看到jdk7中resize的條件已經發生改變了,只有當 size>=threshold並且 table中的那個槽中已經有Entry時,才會發生resize。即有可能雖然size>=threshold,但是必須等到每個槽都至少有一個Entry時,才會擴容。還有註意每次resize都會擴大一倍容量
void createEntry(int hash, K key, V value, int bucketIndex) {
        Entry<K,V> e = table[bucketIndex];
        table[bucketIndex] = new Entry<>(hash, key, value, e);
        size++;
    }
//最後看createEntry,它先保存這個桶中的第一個Entry,創建新的Entry放入第一個位置,將原來的Entry接在後面。這裏采用的是頭插法插入元素。
4.get方法
//其實get方法和put方法如出一轍,怎麽放的怎麽拿
public V get(Object key) {
        if (key == null)
            return getForNullKey();
        Entry<K,V> entry = getEntry(key);
        return null == entry ? null : entry.getValue();
    }
//key為null時,還是去table[0]去取:
private V getForNullKey() {
        for (Entry<K,V> e = table[0]; e != null; e = e.next) {
            if (e.key == null)
                return e.value;
        }
        return null;
    }
//否則調用getEntry方法:
final Entry<K,V> getEntry(Object key) {
        int hash = (key == null) ? 0 : hash(key);
        for (Entry<K,V> e = table[indexFor(hash, table.length)];
             e != null;
             e = e.next) {
            Object k;
            if (e.hash == hash &&
                ((k = e.key) == key || (key != null && key.equals(k))))
                return e;
        }
        return null;
    }
//這個方法也是通過key的hashcode計算出它應該所在的下標,再遍歷這個下標的Entry鏈,如果key的內存地址相等(即同一個引用)或者equals相等,則說明找到了


hash的原則

A、等冪性。不管執行多少次獲取Hash值的操作,只要對象不變,那麽Hash值是固定的。如果第一次取跟第N次取不一樣,那就用起來很麻煩.

B、對等性。若兩個對象equal方法返回為true,則其hash值也應該是一樣的。舉例說明:若你將objA作為key存入HashMap中,然後new了一個objB。在你看來objB和objA是一個東西(因為他們equal),但是使用objB到hashMap中卻取不出來東西。

C、互異性。若兩個對象equal方法返回為false,hash值有可能相同,但最好是不同的,這個不是必須的,只是這樣做會提高hash類操作的性能(碰撞幾率低)。

解決hash碰撞的方法:

開放地址法
鏈地址法

hashmap采用的就是鏈地址法,這種方法好處是無堆積現象,但是next指針會占用額外空間

和jdk8中的HashMap區別

在jdk8中,仍然會根據key.hashCode()計算出hash值,再通過這個hash值去定位這個key,但是不同的是,當發生沖突時,會采用鏈表和紅黑樹兩種方法去處理,當結點個數較少時用鏈表(用Node存儲)個數較多時用紅黑樹(用TreeNode存儲),同時結點也不叫Entry了,而是分成了Node和TreeNode。再最壞的情況下,鏈表查找的時間復雜度為O(n),而紅黑樹一直是O(logn),這樣會提高HashMap的效率。
jdk8中的HashMap中定義了一個變量TREEIFY_THRESHOLD,當節點個數>= TREEIFY_THRESHOLD - 1時,HashMap將采用紅黑樹存儲

Put方法也變了,為了防止並發問題。

擴展:為何數組的長度是 2 的 n 次方呢?

1.這個方法非常巧妙,它通過 h & (table.length -1) 來得到該對象的保存位,而HashMap 底層數組的長度總是 2 的 n 次方,2n-1 得到的二進制數的每個位上的值都為 1,那麽與全部為 1 的一個數進行與操作速度會大大提升。

2.當 length 總是 2 的 n 次方時,h& (length-1)運算等價於對 length 取模,也就是h%length,但是&比%具有更高的效率。

3.當數組長度為 2 的 n 次冪的時候,不同的 key 算得的 index 相同的幾率較小,那麽數據在數組上分布就比較均勻,也就是說碰撞的幾率小,相對的,查詢的時候就不用遍歷某個位置上的鏈表,這樣查詢效率也就較高了。

HashMap 的擴容機制:

當 HashMap 中的結點個數超過數組大小*loadFactor(加載因子)時,就會進行數組擴容,loadFactor 的默認值為 0.75。也就是說,默認情況下,數組大小為 16,那麽當 HashMap中結點個數超過 16*0.75=12 的時候,就把數組的大小擴展為 2*16=32,即擴大一倍,然後重新計算每個元素在數組中的位置,並放進去,而這是一個非常消耗性能的操作


Hashmap和HashTable的異同:

01.兩者的默認容量與負載因子有變化

02.hashtable的 容量可以是任意值,而hashmap必須是2的次冪

03.hashtable中在put方法裏面不允許值與鍵為空

04.計算索引的方式不同(indexof函數不同)

05.hashtable大部分方法都加上了sychronied關鍵字

06.hashtable每次擴容,容量為原來的兩倍加2.

2. concurrenthashmap源碼解析與並發編程

使用與獲取全局信息的方法並不頻繁的時候

01.在 ConcurrentHashMap 中,不允許用 null 作為鍵和值。

02.ConcurrentHashMap 使用分段鎖(減少鎖粒度)技術,將數據分成一段一段的存儲,然後給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問,能夠實現真正的並發訪問。

默認情況下分為16個段。

技術分享圖片

03.當增加一個新的表項,不是全部加鎖,會先計算在哪個段,對指定的段加鎖。

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        int j = (hash >>> segmentShift) & segmentMask;
//上面兩行用於獲取段號
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);//得到段,將數據插入到段中
        return s.put(key, hash, value, false);
    }


04.當系統需要取得全局鎖,消耗資源就會比較多。比如size()方法:事實上會先使用無鎖的方式求和,如果失敗,會先獲得所有段的鎖再去求和。

技術分享圖片

3. BlockingQueue

A線程可以知道b線程的存在

是一個接口並非一個具體實現:

ArrayBlockingQueue

ArrayBlockingQueue的內部元素都放置在一個對象數組中:final Object[] items;

Offer():當隊列已經滿了,會立即返回false

Put():如果隊列滿了會一直等待

Pool():彈出元素,如果為空返回null

Take():彈出元素,如果為空等待到有元素即可。

Take方法:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    
private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }


Put方法:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
/**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

LinkedBlockingQueue(鎖分離)

兩把不同的鎖
/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
 
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
 
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
 
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
 
Take函數
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//不能有兩個線程同時取數據
        try {
            while (count.get() == 0) {//如果沒有數據,一直等待(因為是lockInterruptibly,可中斷)
                notEmpty.await();
            }
            x = dequeue();//取得第一個數據
            c = count.getAndDecrement();//數量-1,原子操作,因為會和put同時訪問count。
            if (c > 1)
                notEmpty.signal();//通知其他take操作
        } finally {
            takeLock.unlock();//釋放鎖
        }
        if (c == capacity)
            signalNotFull();//通知put操作,已有空余空間
        return x;
    }
 
Put函數
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//上鎖不能有兩個線程同時進行put函數
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {//當隊列已經滿了以後,等待
                notFull.await();
            }
            enqueue(node);//插入數據
            c = count.getAndIncrement();//更新總數
            if (c + 1 < capacity)
                notFull.signal();//有足夠的空間,通知其他線程
        } finally {
            putLock.unlock();//釋放鎖
        }
        if (c == 0)
            signalNotEmpty();//釋放成功後,通知take函數取數據
    }

4. 並發下的ArrayList

當ArrayList在擴容的時候,內部一致性被破壞,由於沒有鎖的保護,另外一個線程訪問不到不一致的內部狀態,導致出現越界問題。

還會出現多個線程同時對同一位置進行賦值。

5. concurrentlinkedqueue:

高並發環境中可以說是最好的隊列,也可以看做是一個線程安全的linkedList。

6. CopyOnWriteArrayList

性能很好的讀寫list,在讀寫的時候任何時候都不加鎖;只有在寫寫的時候需要同步等待。

當寫操作的時候,進行一次自我復制。對原有的數據進行一次復制,將修改的內容寫入副本修改完之後,將副本替換原來的數據。


?集合工具類使用線程