1. 程式人生 > >並發容器(一)同步容器 與 並發容器

並發容器(一)同步容器 與 並發容器

並發編程 hash表 city 實現 prior zab debug 一個個 tla

一、同步容器

同步容器包括兩類:

  • Vector、Hashtable、Stack
  • 同步的封裝器類由 Collections.synchronizedXXX 等工廠方法創建的。(JDK1.2加入)

??這些類實現線程安全的方式是:將他們的狀態封裝起來,並對每個公有方法都進行同步,使得每一次只有一個線程能訪問容器的狀態。 同步容器類的出現是為了解決 Collection、Map 不能同步,線程不安全的問題。

同步容器類的問題

??同步容器類都是線程安全的,但不是絕對的線程安全 (所謂線程安全僅僅是在每一個方法上加鎖,保持原子)。在某些情況下,需要額外加鎖來保護復合操作。復合類操作如:叠代(反復訪問元素,遍歷完容器中的所有元素)、跳轉

(根據指定的順序找到當前元素的下一個元素)、以及條件運算(如“若沒有則添加”)。這些復合操作在多線程並發的修改容器時,可能會表現出意外的行為。

看下面三種“意外”情況:

1. 獲取與刪除的復合操作

??下面的代碼看起來沒什麽問題,但如果一旦出現:線程A執行 getLast() 方法,線程B執行 deleteLast() 方法;線程A,線程B 交替執行,getLast() 方法就可能會拋出 ArrayIndexOutOfBoundsException(數組下標越界)。

 public static Object getLast(Vector list) {
            int lastIndex = list.size
() - 1; return list.get(lastIndex); } public static void deleteLast(Vector list) { int lastIndex = list.size() - 1; list.remove(lastIndex); }

??為防止這種情況出現,就要額外加鎖,使 getLast()、deleteLast() 方法成為原子性操作。正確的寫法如下:

public static Object getLast(Vector list){
      synchronized
(this){ int lastIndex = list.size() - 1; return list.get(lastIndex); } } public static void deleteLast(Vector list){ synchronized(this){ int lastIndex = list.size() - 1; list.remove(lastIndex); } }

2. 普通叠代

for(int i = 0 ;i < vector.size(); i++){
    doSome(vector.get(i));
}

??這種叠代方法的正確性完全依賴於運氣:我們無法保證在調用size與get直接按有沒有其他線程對所操作的這個Vector進行了修改。但是這並不代表Vector就不是線程安全的。Vector仍然是線程安全的,而拋出的異常也與其規範保持一致。和 getLast()的例子一樣,如果 遍歷列表的線程 與 刪除的線程 交替執行,同樣也會拋出 ArrayIndexOutOfBoundsException。

改進的寫法:

synchronized(vector){
for(int i = 0 ;i < vector.size(); i++){
    doSome(vector.get(i));
    }
}

3. 叠代器 Iterator 與 ConcurrentModificationException

??對容器的標準叠代方式是使用 Iterator。然而,在叠代的期間,如果有線程並發地修改同步容器的,那麽即使使用 Iterator 也無法避免對容器進行加鎖。這是由於早期叠代器設計的時候並沒有考慮並發修改的問題。而且,一旦失敗,將會拋出 ConcurrentModificationException .

  Collection c = Collections.synchronizedCollection(myCollection);
     ...
  synchronized(c) {
      Iterator i = c.iterator(); // Must be in the synchronized block
      while (i.hasNext())
         foo(i.next());
  }

??有時候程序員並不希望在叠代期間對容器加鎖。特別是容器叠代的規模大的時候,就可能需要長時間加鎖,會造成鎖的競爭激烈,降低程序的伸縮性。替代的方法是,克隆出一個副本,在副本上叠代。但也要進行權衡,因為克隆復制也需要額外的開銷。

隱藏的叠代器

??容器的有些方法是進行叠代的,這些方法也要記得對其加鎖。我們看看下面這個程序:

public class HiddenIterator{
    @GuardedBy(this)
    private final Set<Integer> set = new HashSet<Integer>();
    
    public synchronized void add(Integer i ){ set.add(i); }
    public synchronized void remove(Integer i ){ set.remove(i); }
    
    public void addTenThings(){
        Random r = new Random();
        for(int i = 0 ; i < 10; i++)
            add(r.nextInt());
        System.out.println("DEBUG : added ten elements to" + set);
    }
}

??上面的程序看起來好像也沒問題,add、remove都加鎖了。然而 addTenThings() 的最一行輸出中,調用了 Set.toString()方法,toString()方法又是對容器進行了叠代,也可能拋出 ConcurrentModificationException 。所以,也要對 addTenThings()方法加鎖。

??隱藏了叠代容器操作的方法: toString()、hashcode()、equals()、containsAll()、removeAll()、retainAll()。還有 forEach 的寫法。

二、並發容器類

??同步類容器的狀態都是串行化的(使用 synchronized 加鎖的,同一時間只能一個線程訪問容器,一個個排隊訪問,這就是串行化)。他們雖然實現了線程安全,但是嚴重降低了並發性,在多線程環境時,嚴重降低了應用的吞吐量。
看一下源代碼,更加直觀:
下面是 Collections.SynchronizedCollection() 方法的源代碼。

public static <T> Collection<T> synchronizedCollection(Collection<T> c) {
        return new SynchronizedCollection<>(c);
    }

synchronizedCollection()方法是直接創建並返回一個 SynchronizedCollection 類的對象,這個類是 Collections 的靜態內部類,繼續跟蹤。

static class SynchronizedCollection<E> implements Collection<E>, Serializable {
        private static final long serialVersionUID = 3053995032091335093L;

        final Collection<E> c;  // 非線程安全的 Collection
        final Object mutex;     // Object on which to synchronize

        SynchronizedCollection(Collection<E> c) {
            //判斷集合c是否是為null,為null就拋異常
            this.c = Objects.requireNonNull(c);
            mutex = this;
        }

        SynchronizedCollection(Collection<E> c, Object mutex) {
            //判斷集合c是否是為null,為null就拋異常
            this.c = Objects.requireNonNull(c);
            this.mutex = Objects.requireNonNull(mutex);
        }

      //封裝集合c的size()方法,synchronized修飾,變成同步方法
        public int size() {
            synchronized (mutex) {return c.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return c.isEmpty();}
        }
        public boolean contains(Object o) {
            synchronized (mutex) {return c.contains(o);}
        }
        public Object[] toArray() {
            synchronized (mutex) {return c.toArray();}
        }
        //........

??可以看出,Collections.SynchronizedCollection類其實就是封裝了 非線程安全的Collection 類對象,在 Collection 的每個方法上加上 synchronized。

再看一下 Vector add()方法的源代碼:

 public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }

??從上面的源碼可以得知:同步容器的線程安全都是用 synchronized 來實現的,而且鎖住整個方法區,即方法區的所有代碼都是臨界區,這就導致了同一時刻,只能有一個線程訪問容器。換句話說,只能同步地訪問容器,無法並發地訪問容器,在高並發的情況下,將會非常地糟糕。

這時候,高性能的並發容器出現了

??java5.0之後提供了多種並發容器來改善同步容器的性能,如 ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue;其中 ConcurrentHashMap 用來替代 Hashtable ,CopyOnWriteArrayList 用來替代 Vector;
??並發容器類采用各種優化手段,盡可能讓多線程並發訪問容器:ConcurrentHashMap 的分段鎖、ConcurrentLinkedQueue 的非阻塞的CAS算法、鎖的粒度更細、以及針對多讀少寫的情況下的 “寫時復制”。

下面重點說一下 ConcurrentHashMap

??ConcurrentHashMap 采用分段鎖技術 ,同步容器中,是一個容器一個鎖,但在ConcurrentHashMap中,會將hash表的數據分成若幹段,每段維護一個鎖,以達到高效的並發訪問;

??ConcurrentHashMap 與 其他並發容器一樣,在叠代的過程不需要加鎖,叠代器具有弱一致性,叠代期間不會拋出ConcurrentModificationException異常,並非“立即失敗”;所謂 弱一致性 ,就是返回的元素將反映叠代器創建時或創建後某一時刻的映射狀態。同時,需要在整個Map上進行計算的方法,如 size()、isEmpty(),這些方法的語義被略微減弱,以反映並發的特性,換句話說,這些方法的值是一個估計值,並不是很精確。事實上,這些方法在並發環境下用處很小,因為在並發的情況下,它們的返回值總是在變化。如果需要強一致性,那麽就得考慮加鎖。同步容器類便是強一致性的。

??由於 ConcurrentHashMap 不能被加鎖來執行獨占訪問,因此無法通過加鎖來創建新的原子操作。不過,ConcurrentHashMap 提供了以下幾個原子操作(由其父接口 ConcurrentMap 提供),基本滿足需求了:

//如果指定鍵已經不再與某個值相關聯,則將它與給定值關聯。
V putIfAbsent(K key, V value);

//只有目前將鍵的條目映射到給定值時,才移除該鍵的條目。
boolean remove(Object key, Object value);

//只有目前將鍵的條目映射到某一值時,才替換該鍵的條目。
V replace(K key, V value);

//只有目前將鍵的條目映射到給定值時,才替換該鍵的條目。
boolean replace(K key,V oldValue, V newValue);

JDK 提供的並發容器還包括以下7個阻塞隊列,如下:

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

補充說明:上面的 ConcurrentHashMap 的介紹是基於 JDK1.6 版本的,JDK1.8 有所修改,可參考後續文章。

參考文獻:

  • 《並發編程的藝術》
  • 《並發編程實戰》

並發容器(一)同步容器 與 並發容器