1. 程式人生 > >java執行緒之Latch設計模式見解

java執行緒之Latch設計模式見解

CountDownLatch :(個人理解)使用閥門值,直到閥門值為0之前,一直阻塞執行緒。實則使用物件鎖,不釋放物件鎖,一直佔用資源,(這裡是一個缺點)。閥門值為0時,呼叫釋放物件鎖的方法,釋放資源。應用的場景,我覺得是需要一些執行緒先完成的前提下,再使用其他執行緒。也就是我就是要一些重要的執行緒(也不是特指重要)完成任務之後,接著再執行其他執行緒。

Table of Contents

demo1-重寫CountDownLatch

demo2-子執行緒任務完成之後執行主執行緒

demo3-多個CountDownLatch的應用 


本文引用的demo均為轉載,其他內容是自己原創。感謝這些demo的提供者。為了部落....O(∩_∩)O哈哈~


demo1-重寫CountDownLatch

這個是我在《java高併發詳解》中找到的demo (注:有一些自己的調整): 

(readme: 重寫了CountDownLatch類以及一些方法。建立了一個自定義異常TimeoutException。應用了一些鎖的知識點。(synchronized會一直等待執行緒釋放物件鎖而造成阻塞,wait方法會釋放物件鎖,本程式如果不加該方法的後果就是會一直被阻塞,造成超時。)

四個程式設計師約定在某個時間到某地聚會,每人都會採用交通工具,最後對按時到達的程式設計師,輸出按時到達。 

package com.mzs.entity;

import java.util.concurrent.TimeUnit;

public abstract class Latch {
	
	protected int limit;	// 閥門值
	protected boolean isLate;	// 是否遲到
	
	public Latch(int limit, boolean isLate) {
		this.limit = limit;
		this.isLate = isLate;
	}
	
	/**
	 * 模擬等待
	 * @param unit	時間單位
	 * @param time	預定的到達時間
	 * @throws InterruptedException	被打斷時丟擲該異常
	 * @throws TimeoutException	自定義的時間超時異常,當時間超過預定的到達時間時,丟擲該異常
	 */
	public abstract void await(TimeUnit unit, long time) throws InterruptedException, TimeoutException;
	
	/**
	 * 閥門值減一
	 */
	public abstract void countDown();
	
	/**
	 * 統計未到達的人數
	 * @return 未到達的人數
	 */
	public abstract int getUnarrived();
}

package com.mzs.entity;

import java.util.concurrent.TimeUnit;

public class CountDownLatch extends Latch {

	public CountDownLatch(int limit, boolean isLate) {
		super(limit, isLate);
	}

	@Override
	public void await(TimeUnit unit, long time) throws InterruptedException, TimeoutException {
		if (time < 0)
			throw new IllegalArgumentException("argument is invalid");
		// 表示預定的到達時間
		long remainingTime = unit.toNanos(time);
		long endTime = System.nanoTime() + remainingTime;
		synchronized (this) {
			while (limit > 0) {
				// 剩餘時間小於0,則未按時到達,標記遲到,並丟擲自定義超時異常
				if (TimeUnit.NANOSECONDS.toMillis(remainingTime) < 0) {
					this.isLate = true;
					throw new TimeoutException("time is over");
				}
                // 等待的過程中,被打斷時,時間進行新的處理。
                this.wait(TimeUnit.NANOSECONDS.toMillis(remainingTime));
				// 計算剩餘的時間
				remainingTime = endTime - System.nanoTime();
			}
		}
	}

	@Override
	public void countDown() {
		synchronized (this) {
			// 對閥門值的檢查,如果小於0,丟擲該異常
			if (limit < 0)
				throw new IllegalStateException("the number of limit is illegal");
			// 閥門值自減,表示已到達
			limit--;
			// 通知阻塞執行緒
			this.notifyAll();
		}
	}

	@Override
	public int getUnarrived() {
		return limit;
	}
	

}

package com.mzs.entity;

public class TimeoutException extends Exception {
	
	/**
	 * 
	 */
	private static final long serialVersionUID = -6499958945796073069L;
	
	private String message;
	
	public TimeoutException(String message) {
		super(message);
	}

}

package com.mzs.entity;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class ProgrammerTravel extends Thread {

	private final Latch latch;	// 閥門值
	private final String programmer;	// 程式設計師
	private final String transportation;	// 交通工具

	private Logger logger = Logger.getLogger(getClass().getName());

	public ProgrammerTravel(Latch latch, String programmer, String transportation) {
		this.latch = latch;
		this.programmer = programmer;
		this.transportation = transportation;
	}

	@Override
	public void run() {
		logger.info(programmer + " starts to take the transportation [ " + transportation + " ]");
		try {
			// 模擬程式設計師到達目的地花費時間的隨機性
			TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		latch.countDown();
		if (!latch.isLate)
			logger.info(programmer + " arrived on time!!!");
		if (latch.getUnarrived() == 0)
			logger.info("all the programmers arrived");
	}

	public static void main(String[] args) {
		// 設定閥門值為4,並標記未遲到
		Latch latch = new CountDownLatch(4, false);
		new ProgrammerTravel(latch, "Tom", "bike").start();
		new ProgrammerTravel(latch, "Selina", "bike").start();
		new ProgrammerTravel(latch, "King", "Car").start();
		new ProgrammerTravel(latch, "Khan", "Bus").start();
		try {
			// 設定預定時間為5秒
			latch.await(TimeUnit.SECONDS, 5);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			System.err.println(e);
		}
	}

}

demo2-子執行緒任務完成之後執行主執行緒

重寫runnable run(),使用java.util的CountDownLatch,實現多個子執行緒任務完成之後,執行主執行緒的任務。 

package com.mzs.demo1;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

public class CountDownLatchTest1 {
	
	private static Logger logger = Logger.getLogger("com.mzs.demo1.CountDownLatchTest1");
	
	public static void main(String[] args) {
		ExecutorService service = Executors.newFixedThreadPool(3);
		CountDownLatch latch = new CountDownLatch(3);
		for (int i = 0; i < 3; i++) {
			Runnable runnable = new Runnable() {

				@Override
				public void run() {
					logger.info("child thread [" + Thread.currentThread().getName() + "] starts to execute");
					try {
						Thread.sleep(ThreadLocalRandom.current().nextInt(20));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					logger.info("child thread [" + Thread.currentThread().getName() + "] finished");
					latch.countDown();
				}
			};
			service.execute(runnable);
		}
		logger.info("main thread [" + Thread.currentThread().getName() + "] is waiting");
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.info("main thread [" + Thread.currentThread().getName() + "] starts to execute");
	}

}

demo3-多個CountDownLatch的應用 

package com.mzs.demo1;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;

public class CountDownLatchTest2 {
	
	private static Logger logger = Logger.getLogger("com.mzs.demo1.CountDownLatchTest2");
	
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		CountDownLatch latch = new CountDownLatch(1);
		CountDownLatch latch1 = new CountDownLatch(4);
		for (int i = 0; i < 4; i++) {
			Runnable runnable = new Runnable() {

				@Override
				public void run() {
					logger.info("運動員" + Thread.currentThread().getName() + "等待裁判命令");
					try {
						latch.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					logger.info("運動員" + Thread.currentThread().getName() + "接收裁判命令");
					try {
						Thread.sleep(ThreadLocalRandom.current().nextInt(20));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					logger.info("運動員" + Thread.currentThread().getName() + "到達終點");
					latch1.countDown();
				}
			};
			service.execute(runnable);
		}
		latch.countDown();
		try {
			latch1.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.info("裁判" + Thread.currentThread().getName() + "評判結果");
	}
	
}