和朱曄一起復習Java併發(五):併發容器和同步器
本節我們先會來複習一下java.util.concurrent下面的一些併發容器,然後再會來簡單看一下各種同步器。
ConcurrentHashMap和ConcurrentSkipListMap的效能
首先,我們來測試一下ConcurrentHashMap和ConcurrentSkipListMap的效能。
前者對應的非併發版本是HashMap,後者是跳錶實現,Map按照Key順序排序(當然也可以提供一個Comparator進行排序)。
在這個例子裡,我們不是簡單的測試Map讀寫Key的效能,而是實現一個多執行緒環境下使用Map最最常見的場景:統計Key出現頻次,我們的Key的範圍是1萬個,然後迴圈1億次(也就是Value平均也在1萬左右),10個併發來操作Map:
@Slf4j public class ConcurrentMapTest { int loopCount = 100000000; int threadCount = 10; int itemCount = 10000; @Test public void test() throws InterruptedException { StopWatch stopWatch = new StopWatch(); stopWatch.start("hashmap"); normal(); stopWatch.stop(); stopWatch.start("concurrentHashMap"); concurrent(); stopWatch.stop(); stopWatch.start("concurrentSkipListMap"); concurrentSkipListMap(); stopWatch.stop(); log.info(stopWatch.prettyPrint()); } private void normal() throws InterruptedException { HashMap<String, Long> freqs = new HashMap<>(); ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(itemCount); synchronized (freqs) { if (freqs.containsKey(key)) { freqs.put(key, freqs.get(key) + 1); } else { freqs.put(key, 1L); } } } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); //log.debug("normal:{}", freqs); } private void concurrent() throws InterruptedException { ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(itemCount); ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(itemCount); freqs.computeIfAbsent(key, k -> new LongAdder()).increment(); } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); //log.debug("concurrentHashMap:{}", freqs); } private void concurrentSkipListMap() throws InterruptedException { ConcurrentSkipListMap<String, LongAdder> freqs = new ConcurrentSkipListMap<>(); ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(itemCount); freqs.computeIfAbsent(key, k -> new LongAdder()).increment(); } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); //log.debug("concurrentSkipListMap:{}", freqs); } }
這裡可以看到,這裡的三種實現:
- 對於normal的實現,我們全程鎖住了HashMap然後進行讀寫
- 對於ConcurrentHashMap,我們巧妙利用了一個computeIfAbsent()方法,實現了判斷Key是否存在,計算獲取Value,put Key Value三步操作,得到一個Value是LongAdder(),然後因為LongAdder是執行緒安全的所以直接呼叫了increase()方法,一行程式碼實現了5行程式碼效果
- ConcurrentSkipListMap也是一樣
執行結果如下:
可以看到我們利用ConcurrentHashMap巧妙實現的併發詞頻統計功能,其效能相比有鎖的版本高了太多。
如果我們列印一下ConcurrentSkipListMap最後的結果,差不多是這樣的:
可以看到Entry按照了Key進行排序。
ConcurrentHashMap的那些原子操作方法
這一節我們比較一下computeIfAbsent()和putIfAbsent()的區別,這2個方法很容易因為誤用導致一些Bug。
- 第一個是效能上的區別,如果Key存在的話,computeIfAbsent因為傳入的是一個函式,函式壓根就不會執行,而putIfAbsent需要直接傳值。所以如果要獲得Value代價很大的話,computeIfAbsent效能會好
- 第二個是使用上的區別,computeIfAbsent返回是的是操作後的值,如果之前值不存在的話就返回計算後的值,如果本來就存在那麼就返回本來存在的值,putIfAbsent返回的是之前的值,如果原來值不存在那麼會得到null
寫一個程式來驗證一下:
@Slf4j
public class PutIfAbsentTest {
@Test
public void test() {
ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
log.info("Start");
log.info("putIfAbsent:{}", concurrentHashMap.putIfAbsent("test1", getValue()));
log.info("computeIfAbsent:{}", concurrentHashMap.computeIfAbsent("test1", k -> getValue()));
log.info("putIfAbsent again:{}", concurrentHashMap.putIfAbsent("test2", getValue()));
log.info("computeIfAbsent again:{}", concurrentHashMap.computeIfAbsent("test2", k -> getValue()));
}
private String getValue() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return UUID.randomUUID().toString();
}
}
在這裡獲取值的操作需要1s,從執行結果可以看到,第二次值已經存在的時候,putIfAbsent還耗時1s,而computeIfAbsent不是,而且還可以看到第一次值不存在的時候putIfAbsent返回了null,而computeIfAbsent返回了計算後的值:
使用的時候一定需要根據自己的需求來使用合適的方法。
ThreadLocalRandom的誤用
之前的例子裡我們用到了ThreadLocalRandom,這裡簡單提一下ThreadLocalRandom可能的誤用:
@Slf4j
public class ThreadLocalRandomMisuse {
@Test
public void test() throws InterruptedException {
ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
IntStream.rangeClosed(1, 5)
.mapToObj(i -> new Thread(() -> log.info("wrong:{}", threadLocalRandom.nextInt())))
.forEach(Thread::start);
IntStream.rangeClosed(1, 5)
.mapToObj(i -> new Thread(() -> log.info("ok:{}", ThreadLocalRandom.current().nextInt())))
.forEach(Thread::start);
TimeUnit.SECONDS.sleep(1);
}
}
一句話而言,我們應該每次都ThreadLocalRandom.current().nextInt()這樣用而不是例項化了ThreadLocalRandom.current()每次呼叫nextInt()。觀察一下兩次輸出可以發現,wrong的那5次得到的隨機數都是一樣的:
ConcurrentHashMap的併發reduce功能測試
ConcurrentHashMap提供了比較高階的一些方法可以進行併發的歸併操作,我們寫一段程式比較一下使用遍歷方式以及使用reduceEntriesToLong()統計ConcurrentHashMap中所有值的平均數的效能和寫法上的差異:
@Slf4j
public class ConcurrentHashMapReduceTest {
int loopCount = 100;
int itemCount = 10000000;
@Test
public void test() {
ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, itemCount)
.boxed()
.collect(Collectors.toMap(i -> "item" + i, Function.identity(),(o1, o2) -> o1, ConcurrentHashMap::new));
StopWatch stopWatch = new StopWatch();
stopWatch.start("normal");
normal(concurrentHashMap);
stopWatch.stop();
stopWatch.start("concurrent with parallelismThreshold=1");
concurrent(concurrentHashMap, 1);
stopWatch.stop();
stopWatch.start("concurrent with parallelismThreshold=max long");
concurrent(concurrentHashMap, Long.MAX_VALUE);
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
private void normal(ConcurrentHashMap<String, Long> map) {
IntStream.rangeClosed(1, loopCount).forEach(__ -> {
long sum = 0L;
for (Map.Entry<String, Long> item : map.entrySet()) {
sum += item.getValue();
}
double average = sum / map.size();
Assert.assertEquals(itemCount / 2, average, 0);
});
}
private void concurrent(ConcurrentHashMap<String, Long> map, long parallelismThreshold) {
IntStream.rangeClosed(1, loopCount).forEach(__ -> {
double average = map.reduceEntriesToLong(parallelismThreshold, Map.Entry::getValue, 0, Long::sum) / map.size();
Assert.assertEquals(itemCount / 2, average, 0);
});
}
}
執行結果如下:
可以看到並行歸併操作對於比較大的HashMap效能好不少,注意一點是傳入的parallelismThreshold不是並行度(不是ForkJoinPool(int parallelism)的那個parallelism)的意思,而是並行元素的閾值,傳入Long.MAX_VALUE取消並行,傳入1充分利用ForkJoinPool。
當然,我們這裡只演示了reduceEntriesToLong()一個方法,ConcurrentHashMap還有十幾種各種reduceXXX()用於對Key、Value和Entry進行並行歸併操作。
ConcurrentHashMap的誤用
其實這裡想說的之前的文章中也提到過,ConcurrentHashMap不能確保多個針對Map的操作是原子性的(除非是之前提到computeIfAbsent()和putIfAbsent()等等),比如在下面的例子裡,我們有一個9990大小的ConcurrentHashMap,有多個執行緒在計算它離10000滿員還有多少差距,然後填充差距:
@Test
public void test() throws InterruptedException {
int limit = 10000;
ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, limit - 10)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
log.info("init size:{}", concurrentHashMap.size());
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int __ = 0; __ < 10; __++) {
executorService.execute(() -> {
int gap = limit - concurrentHashMap.size();
log.debug("gap:{}", gap);
concurrentHashMap.putAll(LongStream.rangeClosed(1, gap)
.boxed()
.collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
}
這段程式碼顯然是有問題的:
- 第一,諸如size()、containsValue()等(聚合狀態的)方法僅僅在沒有併發更新的時候是準確的,否則只能作為統計、監控來使用,不能用於控制程式執行邏輯
- 第二,即使size()是準確的,在計算出gap之後其它執行緒可能已經往裡面新增資料了,雖然putAll()操作這一操作是執行緒安全的,但是這個這個計算gap,填補gap的邏輯並不是原子性的,不是說用了ConcurrentHashMap就不需要鎖了
輸出結果如下:
可以看到,有一些執行緒甚至計算出了負數的gap,最後結果是10040,比預期的limit多了40。
還有一點算不上誤用,只是提一下,ConcurrentHashMap的Key/Value不能是null,而HashMap是可以的,為什麼是這樣呢?
下圖是ConcurrentHashMap作者的回覆:
意思就是如果get(key)返回了null,你搞不清楚這到底是key沒有呢還是value就是null。非併發情況下你可以使用後contains(key)來判斷,但是併發情況下不行,你判斷的時候可能Map已經修改了。
CopyOnWriteArrayList測試
CopyOnWrite的意義在於幾乎沒有什麼修改,而讀併發超級高的場景,如果有修改,我們重起爐灶複製一份,雖然代價很大,但是這樣能讓99.9%的併發讀實現無鎖,我們來試試其效能,先是寫的測試,我們比拼一下CopyOnWriteArrayList、手動鎖的ArrayList以及synchronizedList包裝過的ArrayList:
@Test
public void testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> arrayList = new ArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("arrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
synchronized (arrayList) {
arrayList.add(ThreadLocalRandom.current().nextInt(loopCount));
}
});
stopWatch.stop();
stopWatch.start("synchronizedList");
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
10萬次操作不算多,結果如下:
可見CopyOnWriteArrayList的修改因為涉及到整個資料的複製,代價相當大。
再來看看讀,先使用一個方法來進行1000萬資料填充,然後測試,迭代1億次:
private void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 10000000).boxed().collect(Collectors.toList()));
}
@Test
public void testRead() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> arrayList = new ArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
addAll(copyOnWriteArrayList);
addAll(arrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 100000000;
int count = arrayList.size();
stopWatch.start("copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("arrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
synchronized (arrayList) {
arrayList.get(ThreadLocalRandom.current().nextInt(count));
}
});
stopWatch.stop();
stopWatch.start("synchronizedList");
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
執行結果如下:
的確沒錯,CopyOnWriteArrayList效能相當強悍,畢竟讀取無鎖,想多少併發就多少併發。
看完了大部分的併發容器我們再來看看五種併發同步器。
CountDownLatch測試
CountDownLatch在之前的文章中已經出現過N次了,也是五種併發同步器中使用最最頻繁的一種,一般常見的應用場景有:
- 等待N個執行緒執行完畢
- 就像之前很多次效能測試例子,使用兩個CountDownLatch,一個用來讓所有執行緒等待主執行緒發起命令一起開啟,一個用來給主執行緒等待所有子執行緒執行完畢
- 非同步操作的非同步轉同步,很多基於非同步網路通訊(比如Netty)的RPC框架都使用了CountDownLatch來非同步轉同步,比如下面取自RocketMQ中Remoting模組的原始碼片段:
來看看ResponseFuture的相關程式碼實現:
public class ResponseFuture {
private final int opaque;
private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final SemaphoreReleaseOnlyOnce once;
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
...
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
...
}
在發出網路請求後,我們等待響應,在收到響應後我們把資料放入後解鎖CountDownLatch,然後等待響應的請求就可以繼續拿資料。
Semaphore測試
Semaphore可以用來限制併發,假設我們有一個遊戲需要限制同時線上的玩家,我們先來定義一個Player類,在這裡我們通過傳入的Semaphore限制進入玩家的數量。
在程式碼裡,我們通過了之前學習到的AtomicInteger、AtomicLong和LongAdder來統計玩家的總數,最長等待時間和宗等待時長。
@Slf4j
public class Player implements Runnable {
private static AtomicInteger totalPlayer = new AtomicInteger();
private static AtomicLong longestWait = new AtomicLong();
private static LongAdder totalWait = new LongAdder();
private String playerName;
private Semaphore semaphore;
private LocalDateTime enterTime;
public Player(String playerName, Semaphore semaphore) {
this.playerName = playerName;
this.semaphore = semaphore;
}
public static void result() {
log.info("totalPlayer:{},longestWait:{}ms,averageWait:{}ms", totalPlayer.get(), longestWait.get(), totalWait.doubleValue() / totalPlayer.get());
}
@Override
public void run() {
try {
enterTime = LocalDateTime.now();
semaphore.acquire();
totalPlayer.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
long ms = Duration.between(enterTime, LocalDateTime.now()).toMillis();
longestWait.accumulateAndGet(ms, Math::max);
totalWait.add(ms);
//log.debug("Player:{} finished, took:{}ms", playerName, ms);
}
}
}
主測試程式碼如下:
@Test
public void test() throws InterruptedException {
Semaphore semaphore = new Semaphore(10, false);
ExecutorService threadPool = Executors.newFixedThreadPool(100);
IntStream.rangeClosed(1, 10000).forEach(i -> threadPool.execute(new Player("Player" + i, semaphore)));
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
Player.result();
}
我們限制併發玩家數量為10個,非公平進入,執行緒池是100個固定執行緒,總共有10000個玩家需要進行遊戲,程式結束後輸出如下:
再來試試公平模式:
可以明顯看到,開啟公平模式後最長等待的那個玩家沒有等那麼久了,平均等待時間比之前略長,符合預期。
CyclicBarrier測試
CyclicBarrier用來讓所有執行緒彼此等待,等待所有的執行緒或者說參與方一起到達了匯合點後一起進入下一次等待,不斷迴圈。在所有執行緒到達了匯合點後可以由最後一個到達的執行緒做一下『後處理』操作,這個後處理操作可以在宣告CyclicBarrier的時候傳入,也可以通過判斷await()的返回來實現。
這個例子我們實現一個簡單的場景,一個演出需要等待3位演員到位才能開始表演,演出需要進行3次。我們通過CyclicBarrier來實現等到所有演員到位,到位後我們的演出需要2秒時間。
@Slf4j
public class CyclicBarrierTest {
@Test
public void test() throws InterruptedException {
int playerCount = 5;
int playCount = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(playerCount);
List<Thread> threads = IntStream.rangeClosed(1, playerCount).mapToObj(player->new Thread(()-> IntStream.rangeClosed(1, playCount).forEach(play->{
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));
log.debug("Player {} arrived for play {}", player, play);
if (cyclicBarrier.await() ==0) {
log.info("Total players {} arrived, let's play {}", cyclicBarrier.getParties(),play);
TimeUnit.SECONDS.sleep(2);
log.info("Play {} finished",play);
}
} catch (Exception e) {
e.printStackTrace();
}
}))).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
}
}
通過if (cyclicBarrier.await() ==0)可以實現在最後一個演員到位後做衝破柵欄後的後處理操作,我們看下這個演出是不是迴圈了3次,並且是不是所有演員到位後才開始的:
10:35:43.333 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 1
10:35:43.333 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 1
10:35:43.333 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 1
10:35:43.367 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 1
10:35:43.376 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 1
10:35:43.377 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 1
10:35:43.378 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 2
10:35:43.432 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 2
10:35:43.434 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 2
10:35:43.473 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 2
10:35:45.382 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 1 finished
10:35:45.390 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 2
10:35:45.390 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 2
10:35:45.437 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 3
10:35:45.443 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 3
10:35:45.445 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 3
10:35:45.467 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 3
10:35:47.395 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 2 finished
10:35:47.472 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 3
10:35:47.473 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 3
10:35:49.477 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 3 finished
從這個例子可以看到,我們的演出是在最後到達的Player1演員這個執行緒上進行的,值得注意的一點是,在他表演的時候其他演員已經又進入了等待狀態(不要誤認為,CyclicBarrier會讓所有執行緒阻塞,等待後處理完成後再讓其它執行緒繼續下一次迴圈),就等他表演結束後繼續來到await()才能又開始新的演出。
Phaser測試
Phaser和Barrier類似,只不過前者更靈活,參與方的人數是可以動態控制的,而不是一開始先確定的。Phaser可以手動通過register()方法註冊成為一個參與方,然後通過arriveAndAwaitAdvance()表示自己已經到達,等到其它參與方一起到達後衝破柵欄。
比如下面的程式碼,我們對所有傳入的任務進行iterations次迭代操作。
Phaser終止的條件是大於迭代次數或者沒有參與方,onAdvance()返回true表示終止。
我們首先讓主執行緒成為一個參與方,然後讓每一個任務也成為參與方,在新的執行緒中執行任務,執行完成後到達柵欄,只要柵欄沒有終止則無限迴圈。
在主執行緒上我們同樣也是無限迴圈,在所有其它執行緒都衝破柵欄之後,主執行緒做一些收尾工作,比如進行資料統計,工作完成後到達柵欄,這樣其它執行緒又能進行下一次迭代。
@Slf4j
public class PhaserTest {
AtomicInteger atomicInteger = new AtomicInteger();
@Test
public void test() throws InterruptedException {
int iterations = 10;
int tasks = 100;
runTasks(IntStream.rangeClosed(1, tasks)
.mapToObj(index -> new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.incrementAndGet();
}))
.collect(Collectors.toList()), iterations);
Assert.assertEquals(tasks * iterations, atomicInteger.get());
}
private void runTasks(List<Runnable> tasks, int iterations) {
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations - 1 || registeredParties == 0;
}
};
phaser.register();
for (Runnable task : tasks) {
phaser.register();
new Thread(() -> {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
while (!phaser.isTerminated()) {
doPostOperation(phaser);
phaser.arriveAndAwaitAdvance();
}
doPostOperation(phaser);
}
private void doPostOperation(Phaser phaser){
if (phaser.getPhase()>0 || phaser.isTerminated()) {
log.info("phase:{},registered:{},unarrived:{},arrived:{},result:{}",
phaser.getPhase(),
phaser.getRegisteredParties(),
phaser.getUnarrivedParties(),
phaser.getArrivedParties(), atomicInteger.get());
}
}
}
10次迭代,每次迭代100個任務,執行一下看看:
可以看到,主執行緒會從第二次迭代開始做一些後處理工作,這個時候它肯定是第一個到達柵欄的,前一次柵欄衝破後其它執行緒已經把計數器累加了100,最後一次主執行緒做後處理是柵欄已經終止了的時候,這個時候看到arrived是101,這個101是100個任務執行緒加上1個主執行緒也就是自己。
Exchanger測試
Exchanger實現的效果是兩個執行緒在同一時間(會合點)交換資料,寫一段程式碼測試一下。在下面的程式碼裡,我們定義一個生產者執行緒不斷髮送資料,傳送資料後休眠時間隨機,通過使用Exchanger,消費者執行緒實現了在生產者傳送資料後立刻拿到資料的效果,在這裡我們並沒有使用阻塞佇列來實現:
@Slf4j
public class ExchangerTest {
@Test
public void test() throws InterruptedException {
Random random = new Random();
Exchanger<Integer> exchanger = new Exchanger<>();
int count = 10;
Executors.newFixedThreadPool(1, new ThreadFactoryImpl("producer"))
.execute(() -> {
try {
for (int i = 0; i < count; i++) {
log.info("sent:{}", i);
exchanger.exchange(i);
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryImpl("consumer"));
executorService.execute(() -> {
try {
for (int i = 0; i < count; i++) {
int data = exchanger.exchange(null);
log.info("got:{}", data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
}
}
執行效果如下:
小結
併發容器這塊我就不做過多總結了,ConcurrentHashMap實在是太好用太常用,但是務必注意其執行緒安全的特性並不是說ConcurrentHashMap怎麼用都沒有問題,錯誤使用在業務程式碼中很常見。
現在我們來舉個看錶演的例子總結一下幾種併發同步器:
- Semaphore是限制同時看錶演的觀眾人數,有人走了後新人才能進來看
- CountDownLatch是演職人員人不到齊表演無法開始,演完結束
- CyclicBarrier是演職人員到期了後才能表演,最後一個到的人是導演,導演會主導整個演出,演出完畢後所有演職人員修整後重新等待大家到期
- Phaser是每一場演出的演職人員名單可能隨時會更改,但是也是要確保所有演職人員到期後才能開演
同樣,程式碼見我的Github,歡迎clone後自己把玩,歡迎點贊。
歡迎關注我的微信公眾號:隨緣主人的園子
相關推薦
和朱曄一起復習Java併發(一):執行緒池
和我之前的Spring系列文章一樣,我們會以做一些Demo做實驗的方式來複習一些知識點。 本文我們先從Java併發中最最常用的執行緒池開始。 從一個執行緒池實驗開始 首先我們寫一個方法來每秒一次定時輸出執行緒池的基本資訊: private void printStats(ThreadPoolExecutor
和朱曄一起復習Java併發(二):佇列
和朱曄一起復習Java併發(二):佇列 老樣子,我們還是從一些例子開始慢慢熟悉各種併發佇列。以看小說看故事的心態來學習不會顯得那麼枯燥而且更容易記憶深刻。 阻塞佇列的等待? 阻塞佇列最適合做的事情就是做為生產消費者的中間儲存,以抵抗生產者消費者速率不匹配的問題,不但是在速率不匹配的時候能夠有地方暫存任務,而且
和朱曄一起復習Java併發(三):鎖(含鎖效能測試)
這個專題我發現怎麼慢慢演化為效能測試了,遇到任何東西我就忍不住去測一把。本文我們會大概看一下各種鎖資料結構的簡單用法,順便也會來比拼一下效能。 各種併發鎖 首先,我們定一個抽象基類,用於各種鎖測試的一些公共程式碼: 我們需要使用鎖來保護counter和hashMap這2個資源 write欄位表示這個執行緒是
和朱曄一起復習Java併發(四):Atomic
本節我們來研究下併發包中的Atomic型別。 AtomicXXX和XXXAdder以及XXXAccumulator效能測試 先來一把效能測試,對比一下AtomicLong(1.5出來的)、LongAdder(1.8出來的)和LongAccumulator(1.8出來的)用於簡單累加的效能。 程式邏輯比較簡單,
和朱曄一起復習Java併發(五):併發容器和同步器
本節我們先會來複習一下java.util.concurrent下面的一些併發容器,然後再會來簡單看一下各種同步器。 ConcurrentHashMap和ConcurrentSkipListMap的效能 首先,我們來測試一下ConcurrentHashMap和ConcurrentSkipListMap的效能。
java基礎(五):for迴圈和一維陣列
import java.util.Random; import java.util.Scanner; class DouDiZhu {public static void main(String[] args) {//1. 生成54張牌 -- 不需要引數,返回值String[]String[] pic =
大資料之JAVA基礎(五):迴圈和陣列方法練習
案例1:編寫 1+3+5+7+......+99的值 /* * 求1-99的基數和 */ public static void fun01() { int i = 1; int sum = 0; for(;i<100;i+=2) { sum += i;
javascript學習筆記(五):異常捕獲和事件處理
log 類型 按鈕 輸入 button lan yellow logs 代碼 異常捕獲 Try{ 發生異常的代碼塊 }catch(err){ 異常信息處理 } 1 <!DOCTYPE html> 2 <html> 3 <head
Spring Boot(五):整合Redis和使用Redis實現快取共享
Redis(REmote DIctionary Server)是一個key-value儲存系統,是當下網際網路公司最常用的NoSQL資料庫之一。支援儲存的value型別有string、list、set、zset(sorted set --有序集合)和hash。Redis的資料
Servlet(五):請求轉發和重定向
請求轉發: 問題: 伺服器在接收到瀏覽器的請求後,僅僅使用一個 Servlet進行請求處理,會造成不同的Servlet邏輯程式碼 冗餘,Servlet的職責不明確。 解決: 使用請求轉發。 特點: 一次請求 位址列資訊不改變。 Request 物件作用域
【翻譯】CodeMix使用教程(五):構建管道和驗證
在CodeMix中構建管道和驗證 CodeMix利用通過CodeMix增強任務定義的構建管道來生成專案的單一權威構建。 使用構建管道,您可以使用外部工具進行構建和驗證,並直接在編輯器中檢視結果。 通過使用通常用於構建應用程式以進行部署的相同工具,您可以確保在IDE中具有一致的開發體驗。
Java基礎(五):JDK1.8新特性
JDK1.8新特性 lambda表示式 Lambda lambda作用:lambda是一個語法糖,簡化匿名內部類的使用。 lambda使用條件 引數或者變數必須是介面 介面中只包含一個抽象方法 lambda格式 (引數型別 引數名稱 …)-> { 程
一個簡單的BitTorrent客戶端實現(五):tracker manager和tracker實現
關於tracker和tracker manager tracker在整個bt協議中起著很重要的作用,從tracker那裡我們可以獲取當前正在下載的peer列表,從而與它們互動,進行檔案的上傳和下載。TrackerManager顧名思義就是管理tracker的。
spring boot:repository thymeleaf postgresql java程(五):存在問題分析
1. 使用thymeleaf前臺框架時,由於框架對於未使用到的資料進行了過濾,因此傳到前臺的資料中,有些沒使用到的類的欄位會被賦值為null,前臺將該類put回伺服器時, 這些欄位的值就丟失了。如果用repository直接save那麼將導致資料丟失。處理這種問題的辦法
java核心(五):堆內存、棧內存、直接內存
fifo 創建 inpu 申請 棧內存 先進先出 span size -a 一、什麽是堆內存、棧內存? Java把內存劃分成兩種:一種是堆內存,一種是棧內存。 堆:主要用於存儲實例化的對象,數組。由JVM動態分配內存空間。一個JVM只有一個堆內存,線程是可以共享數據
Java基礎(五):陣列
> 陣列,一種應用非常廣泛的資料結構,簡單地來說就是一組**型別相同**且**無序**的元素的儲存在**固定長度**且**有序**的記憶體空間。 ### 建立一個數組 在Java中,我們可以通過`[]`去宣告一個指定型別的陣列 ```java int[] a; // 寫法一 int a[]; //
Java年度總復習基礎部分(一)
存在 最大 數據 cas 初始化 tomcat java 開關 規範 JAVA總復習綱要 Java傻瓜化的運行環境 1. eclipse 和jre 2. 使用jdk,jdk需要配置環境變量 如何配置環境變量 自己編寫第一個java的運行程序 1 packa
JAVA高併發(二)------區分執行緒和程序
執行緒與程序 程序是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎,在早期面向程序設計的計算機結構中,程序是程式的基本執行實體,在當代面向執行緒的計算機結構中,程序是執行緒的容器,程式是指令資料及其組織形式的描述,程序
JAVA學習(七):方法重載與方法重寫、thiskeyword和superkeyword
格式 hello new 初始 per 而且 方法重寫 學習 方式 方法重載與方法重寫、thiskeyword和superkeyword 1、方法重載 重載可以使具有同樣名稱但不同數目和類型參數的類傳遞給方法。 註: 一是重載方法的參數列表必須與被重載的方法不同
算法復習——2—sat(bzoj2199)
例如 ssi 這就是 mat else 原則 cti 題目 amp 題目: Description 由於對Farmer John的領導感到極其不悅,奶牛們退出了農場,組建了奶牛議會。議會以“每頭牛 都可以獲得自己想要的”為原則,建立了下面的投票系統: M只到場的奶牛 (1