1. 程式人生 > >java 並發 (四) ---- 並發容器

java 並發 (四) ---- 並發容器

submit 實時數據 tle mit 讀下 nth 類目 兩種 list

Hashmap 和 Concurrenthashmap

Hashmap 不適合並發,應該使用ConcurrentHashMap .

這是很多人都知道的,但是為什麽呢? 可以先看一下這兩篇文章. JDK7與JDK8中HashMap的實現 和 談談HashMap線程不安全的體現.

由這兩篇文章我們可以知道 :

  • Hashmap 不適合並發的原因是當Hashmap擴容的時候,遷移會產生回環.
  • Hashmap 在JDK1.7 解決沖突的方法是生成鏈表,而1.8是生成紅黑樹.

明白了Hashmap之後,我們來看一下 ConcurrentHashMap 的實現是怎麽樣的? 漫畫:什麽是ConcurrentHashMap的? 我們可以總結一下 ConcurrentHashMap的幾個要點 : (ccHp縮寫 ConcurrentHashMap)

  • ccHp 的實現是 分段鎖,而不是整個對象鎖住,增強了並發性. 每一段是一個 segment
  • ccHp的size() 方法 ,即是容器中的元素個數.統計數量的邏輯如下 :

1.遍歷所有的Segment。

2.把Segment的元素數量累加起來。

3.把Segment的修改次數累加起來。

4.判斷所有Segment的總修改次數是否大於上一次的總修改次數。如果大於,說明統計過程中有修改,重新統計,嘗試次數+1;如果不是。說明沒有修改,統計結束。

5.如果嘗試次數超過閾值,則對每一個Segment加鎖,再重新統計。

6.再次判斷所有Segment的總修改次數是否大於上一次的總修改次數。由於已經加鎖,次數一定和上次相等。

7.釋放鎖,統計結束。

可以看到ccHp 統計size 時判斷是否有沒被修改和 CAS 相似.

ccHp的運用可以適合並發,在web上例如session的管理,下面是shiro session 管理類.(shiro開源可以好好學習)

public class MemorySessionDAO extends AbstractSessionDAO {

    private static final Logger log = LoggerFactory.getLogger(MemorySessionDAO.class);

    private ConcurrentMap<Serializable, Session> sessions;

    
public MemorySessionDAO() { this.sessions = new ConcurrentHashMap<Serializable, Session>(); } protected Serializable doCreate(Session session) { Serializable sessionId = generateSessionId(session); assignSessionId(session, sessionId); storeSession(sessionId, session); return sessionId; } protected Session storeSession(Serializable id, Session session) { if (id == null) { throw new NullPointerException("id argument cannot be null."); } return sessions.putIfAbsent(id, session); } protected Session doReadSession(Serializable sessionId) { return sessions.get(sessionId); } public void update(Session session) throws UnknownSessionException { storeSession(session.getId(), session); } public void delete(Session session) { if (session == null) { throw new NullPointerException("session argument cannot be null."); } Serializable id = session.getId(); if (id != null) { sessions.remove(id); } } public Collection<Session> getActiveSessions() { Collection<Session> values = sessions.values(); if (CollectionUtils.isEmpty(values)) { return Collections.emptySet(); } else { return Collections.unmodifiableCollection(values); } } }

CopyOnWriteArrayList 和 CopyOnWriteArraySet

下文縮寫CopyOnWriteArrayList 為 cowaList. cowaList 是為了替代同步 List, cowaSet 同理為了替代同步Set的. 閱讀下面進行了解原理先. CopyOnWriteArrayList實現原理及源碼分析 .

原理一圖得已了解.(圖片來源見參考資料)

技術分享圖片

圖一

由此我們可以總結一下 ccowaList 的幾個重要點 :

  • ccowaList 適合 多讀少寫 因為讀是沒加鎖的,增加元素時先復制一份,即寫是在副本上,而讀是原始容器中實現了讀寫分離.
  • 缺點 --- 要是寫多的話,每次的復制會是性能問題 ; 無法實時數據,這是因為讀寫分離了.CopyOnWrite容器只能保證數據的最終一致性,不能保證數據的實時一致性。

運用場景和缺點分析, 詳細的看這裏 Java並發編程:並發容器之CopyOnWriteArrayList(轉載)

CopyOnWrite並發容器用於讀多寫少的並發場景。比如白名單,黑名單,商品類目的訪問和更新場景,假如我們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,但是某些關鍵字不允許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單每天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中.

同步工具類

CountDownLatch

下文簡稱cdl. 首先cdl定義一個count, 這個count表示一個有多少個執行任務,需要等待幾個執行,然後cdl開啟await()並且阻塞 ,然後每個線程執行完任務,調用countDown()方法,個count-1 ,直到全部任務完成,cdl繼續執行. 詳見 什麽時候使用CountDownLatch技術分享圖片

使用例子如下 :

public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        Thread thread1 = new Thread(() -> {
            try {
                System.out.println("線程1  開始執行" + new Date());
                Thread.sleep(1000 * 3);
                System.out.println("線程1  執行完畢"+ new Date());
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });


        Thread thread2 = new Thread(() -> {
            try {
                System.out.println("線程2 開始執行 " + new Date());
                Thread.sleep(2 * 1000);
                System.out.println("線程2 執行結束 " + new Date());
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });


        thread1.start();
        thread2.start();
        latch.await();
        System.out.println("任務全部完成");

    }

FutureTask

下文簡稱ft. 那麽ft的作用到底是幹什麽的呢?具體來說就是可以返回線程執行的結果,可以獲取線程執行的狀態,可以中斷線執行的類. 具體使用見 : Java並發編程:Callable、Future和FutureTask

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主線程在執行任務");
         
        try {
            System.out.println("task運行結果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任務執行完畢");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子線程在進行計算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

信號量

Semaphore 這個類就像一個停車場的保安,停車場的車位是固定的,獲取信號量就是進入停車場停車,而釋放信號量就是離開停車場.

Semaphore 分為兩種模式,假如假如車位滿了,當有車出來時,那麽公平的方式就是在場外的車先到先進,不公平的方式就是無論先來晚來

的一起競爭. 詳見這兩篇文章 : Semaphore的工作原理及實例 和 深入理解Semaphore

Semaphore有兩種模式,公平模式和非公平模式。

公平模式就是調用acquire的順序就是獲取許可證的順序,遵循FIFO;

而非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。

示例代碼來自 Semaphore的工作原理及實例

public class SemaphoreDemo {
    private static final Semaphore semaphore=new Semaphore(3);
    private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
    
    private static class InformationThread extends Thread{
        private final String name;
        private final int age;
        public InformationThread(String name,int age)
        {
            this.name=name;
            this.age=age;
        }
        
        public void run()
        {
            try
            {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"歲當前時間為:"+System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(name+"要準備釋放許可證了,當前時間為:"+System.currentTimeMillis());
                System.out.println("當前可使用的許可數為:"+semaphore.availablePermits());
                semaphore.release();
                
            }
            catch(InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args)
    {
        String[] name= {"李明","王五","張傑","王強","趙二","李四","張三"};
        int[] age= {26,27,33,45,19,23,41};
        for(int i=0;i<7;i++)
        {
            Thread t1=new InformationThread(name[i],age[i]);
            threadPool.execute(t1);
        }
    }
 

}

可以看到要是沒有許可的話,調用acquire 方法就會一直阻塞.

柵欄

柵欄能阻塞一組線程直到某個事件發生,柵欄與閉鎖的區別的關鍵區別在於,所有線程必須同時到達柵欄位置,才能繼續執行.而閉鎖是等待一組線程完成後,某個操作才可以進行.可以這樣比喻吧,例如有場賽馬表演,當所有的馬到達終點後會放飛和平鴿.那麽當所有馬都準備好,即是條件滿足了,所有的馬奔騰而出,就是CyclicBarrier ,而當所有的馬都到達終點了,這麽條件滿足了,和平鴿被放飛了,這個相當於是CountDownLatch.

static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                System.out.println("線程1 進入cyclicBarrier.await() " + new Date());
                c.await();
            } catch (Exception e) {

            }
            System.out.println("線程1 柵欄打開 " + new Date());
            System.out.println(1);
        }).start();

        try {
            System.out.println("主線程 進入cyclicBarrier.await() " + new Date());
            c.await();
        } catch (Exception e) {

        }
        System.out.println("主線程 柵欄打開 " + new Date());
        System.out.println(2);
    }
  1 public class Test {
  2     public static void main(String[] args) {
  3         int N = 4;
  4         CyclicBarrier barrier  = new CyclicBarrier(N);
  5 
  6         for(int i=0;i<N;i++) {
  7             new Writer(barrier).start();
  8         }
  9 
 10         try {
 11             Thread.sleep(25000);
 12         } catch (InterruptedException e) {
 13             e.printStackTrace();
 14         }
 15 
 16         System.out.println("CyclicBarrier重用");
 17 
 18         for(int i=0;i<N;i++) {
 19             new Writer(barrier).start();
 20         }
 21     }
 22     static class Writer extends Thread{
 23         private CyclicBarrier cyclicBarrier;
 24         public Writer(CyclicBarrier cyclicBarrier) {
 25             this.cyclicBarrier = cyclicBarrier;
 26         }
 27 
 28         @Override
 29         public void run() {
 30             System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據...");
 31             try {
 32                 Thread.sleep(5000);      //以睡眠來模擬寫入數據操作
 33                 System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其他線程寫入完畢");
 34 
 35                 cyclicBarrier.await();
 36             } catch (InterruptedException e) {
 37                 e.printStackTrace();
 38             }catch(BrokenBarrierException e){
 39                 e.printStackTrace();
 40             }
 41             System.out.println(Thread.currentThread().getName()+"所有線程寫入完畢,繼續處理其他任務...");
 42         }
 43     }
 44 }

第二個代碼展示了CyclicBarrier 執行的東西可以復用.

下面總結一下 CyclicBarrier 和 CountDownLatch 的區別

  • CountDownLatch 的信號量不能重新設置,CyclicBarrier 可以重新設置.
  • CyclicBarrier 可以復用, 而CountDownLatch不能復用.

Exchanger 像是交換東西的一個平臺.

  1    public static void main(String[] args) {
  2 
  3         ExecutorService executor = Executors.newCachedThreadPool();
  4         final Exchanger exchanger = new Exchanger();
  5 
  6         executor.execute(new Runnable() {
  7             String data1 = "Ling";
  8             @Override
  9             public void run() {
 10                 doExchangeWork(data1, exchanger);
 11             }
 12         });
 13 
 14         executor.execute(new Runnable() {
 15             String data1 = "huhx";
 16             @Override
 17             public void run() {
 18                 doExchangeWork(data1, exchanger);
 19             }
 20         });
 21 
 22         executor.shutdown();
 23     }
 24 
 25 
 26     private static void doExchangeWork(String data1, Exchanger exchanger) {
 27         try {
 28             System.out.println(Thread.currentThread().getName() + "正在把數據 " + data1 + " 交換出去");
 29             Thread.sleep((long) (Math.random() * 1000));
 30             //放進交換的位置.
 31             String data2 = (String) exchanger.exchange(data1);
 32             System.out.println(Thread.currentThread().getName() + "交換數據 到  " + data2);
 33         } catch (InterruptedException e) {
 34             e.printStackTrace();
 35         }
 36     }

參考資料 :

  • https://javadoop.com/post/java-memory-model
  • JDK7與JDK8中HashMap的實現
  • 談談HashMap線程不安全的體現
  • 漫畫:什麽是ConcurrentHashMap的?
  • https://www.cnblogs.com/leesf456/p/5547853.html
  • CopyOnWriteArrayList實現原理及源碼分析

java 並發 (四) ---- 並發容器