Java粗淺認識-併發程式設計(四)-執行緒間通訊
執行緒間通訊
執行緒間通訊,就是對同進程類共享資源的安全訪問,Java中通過AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)同步器來實現資源安全訪問,常見基礎工具型別,java.util.concurrent.CountDownLatch(java1.5)、java.util.concurrent.Semaphore(java1.5)、java.util.concurrent.CyclicBarrier(java1.5)、java.util.concurrent.Phaser(java7),關於AQS的講解,後續在原始碼解讀的篇章中會講到。
java.util.concurrent.CountDownLatch
需要注意的是,如果執行次數小於了count,會一直阻塞
/** * countDownLatch控制 * @throws InterruptedException */ public static void countDownLatch() throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); int size = 10; CountDownLatch countDownLatch = new CountDownLatch(size); for(int i = 0;i<size;i++){ executorService.execute(new CountDownLatchTask(countDownLatch)); } countDownLatch.await(); System.out.println("執行完畢"); executorService.shutdown(); } private static class CountDownLatchTask implements Runnable { private CountDownLatch latch; public CountDownLatchTask(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "執行任務。"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); } }
java.util.concurrent.Semaphore
同時permits個執行緒執行,實力模擬同時兩個任務執行。
/** * 訊號量限流 */ private static void semaphore() { ExecutorService executorService = Executors.newFixedThreadPool(4); int permits = 2; Semaphore semaphore = new Semaphore(permits); for (int i = 0; i < 10; i++) { executorService.execute(new SemaphoreTask(semaphore)); } executorService.shutdown(); } private static class SemaphoreTask implements Runnable { Semaphore semaphore; public SemaphoreTask(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { //獲取一個訊號 try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "執行任務."); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //執行完後釋放 semaphore.release(); } }
java.util.concurrent.CyclicBarrier
可迴圈使用的屏障,可以理解為可以迴圈使用的計數器,達到parties後繼續往下執行。
/**
* 需要注意,線上程池中每個執行緒執行完自己的任務後就await()
* 在一種情況下會出現阻塞,當parties 大於執行緒池最大執行緒數時,所有的執行緒都被阻塞
* 比如這裡的parties = 5,FixedThreadPool.nThreads=5,如果parties>FixedThreadPool.nThreads會一直阻塞
* 其實還是容易想到,可執行執行緒都在等待剩餘任務。
*/
private static void cyclicBarrier() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
System.out.println("任務執行完畢");
});
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
executorService.execute(new CyclicBarrierTask(cyclicBarrier));
}
executorService.shutdown();
}
private static class CyclicBarrierTask implements Runnable{
private CyclicBarrier barrier;
CyclicBarrierTask(CyclicBarrier barrier){
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"執行任務。");
TimeUnit.SECONDS.sleep(3);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
java.util.concurrent.Phaser(java7)
A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.
可重用的同步屏障,和CyclicBarrier and CountDownLatch功能相似,但支援更多的操作。
模擬,所有人玩一個遊戲,所有人都要完成n階段的任務,每個階段完成後,都有獎勵。
/**
* 玩一個遊戲,所有人都要完成n階段的任務,每個階段完成後,都有獎勵
*/
private static void phaser(final int n) {
Phaser phaser = new Phaser(){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"===============完成"+phase+"階段的事務,派發禮物。");
return phase >= n || registeredParties == 0;
}
};
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 4; i++) {
executorService.execute(new PhaserTask(phaser));
}
executorService.shutdown();
}
private static class PhaserTask implements Runnable{
Phaser phaser;
public PhaserTask(Phaser phaser){
this.phaser = phaser;
}
@Override
public void run() {
phaser.register();
Random random = new Random();
do {
System.out.println(Thread.currentThread().getName() + "進入"+phaser.getPhase()+"階段。");
try {
int elapsed = random.nextInt(10);
TimeUnit.SECONDS.sleep(elapsed);
System.out.println(Thread.currentThread().getName() + "耗時"+elapsed+"s.");
} catch (InterruptedException e) {
e.printStackTrace();
}
//進入下一個階段
phaser.arriveAndAwaitAdvance();
}while(!phaser.isTerminated());
//執行完畢
phaser.arriveAndDeregister();
}
}
總結
這章節講了4中同步器,java.util.concurrent.CountDownLatch、java.util.concurrent.Semaphore、java.util.concurrent.CyclicBarrier、java.util.concurrent.Phaser,需要自己寫例項思考才能靈活運用,下一章節,執行緒池使用。