1. 程式人生 > >Java 7之多執行緒

Java 7之多執行緒

Java沒有一種安全的搶佔式方法來停止執行緒,只有一些協作式機制。其中一種協作機制能設定某個“已請求取消”標誌,而任務將定期檢視該標誌。如果設定了這個標誌,那麼任務將提前結束。舉例如下:

public class PrimeGenerator implements Runnable {
	private static ExecutorService exec = Executors.newCachedThreadPool();

	private final List<BigInteger> primes = new ArrayList<BigInteger>();
	private volatile boolean cancelled; // 為了保證可靠,需要volatile型別

	public void run() {
		BigInteger p = BigInteger.ONE;// 建立一個大整數型別,初始值為1
		while (!cancelled) {
			p = p.nextProbablePrime();
			synchronized (this) { // 在新增時要確保同步
				primes.add(p);
			}
		}
	}
    // 設定取消任務的標識cancelled,以防止搜尋素數的執行緒永遠執行下去
	public void cancel() {
		cancelled = true;
	}
    // 獲取已經計算出來的素數
	public synchronized List<BigInteger> get() { // 對ArrayList進行復制,保證正確的遍歷
		return new ArrayList<BigInteger>(primes);
	}

	static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
		PrimeGenerator generator = new PrimeGenerator();
		exec.execute(generator);// 執行這個任務
		try {
			SECONDS.sleep(1);
		} finally {
			generator.cancel(); // 確保在呼叫sleep時被中斷也能取消素數生成器的任務
		}
		return generator.get();
	}
}
PrimeGenerator使用了一種簡單的取消策略:客戶程式碼通過呼叫cancel來請求取消,PrimeGenerator在每次搜尋素數前首先檢查是否存在取消請求,如果存在則退出。

如果使用這種策略來請求取消,那麼當任務呼叫了一個阻塞方法的時候,可能任何永遠不會檢查取消標誌,因此永遠不會結束。如下舉例:

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue; // 阻塞佇列
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
        }
    }

    public void cancel() {
        cancelled = true;
    }
}
當生產者將佇列添滿時,消費者希望取消這個任務。但是由於生產者此時處於阻塞狀態,那麼cancelled標誌將得不到檢查,生產者不能從阻塞的方法中恢復過來。

執行緒中斷是一種協作機制,每個執行緒都有一個boolean型別的中斷狀態。在Thread類中提供了3箇中斷方法,如下:

 public void interrupt();            // 中斷目標執行緒
 public boolean isInterrupted();     // 返回目標執行緒的中斷狀態
 public static boolean interrupted();// 清除當前執行緒的中斷狀態,並返回它之前的值。也是清除中斷狀態的唯一方法

(1)interrupt()方法

對於阻塞庫方法,如wait、join、sleep方法,都會檢查執行緒何時中斷,並且在發現中斷時提前返回。呼叫這個方法會引起這個執行緒的interrupt狀態被清空(設為false),並且會丟擲InterruptedException,表示阻塞操作由於中斷而提前結束。

當執行緒在非阻塞狀態下中斷時,中斷狀態將被設定。

(2)isInterrupted()方法

這個方法的原始碼如下:

 public boolean isInterrupted() {
  return isInterrupted(false);
 }
 private native boolean isInterrupted(boolean ClearInterrupted);
返回這個執行緒是否被interrupt了,呼叫這個方法不會影響這個執行緒的interrupt狀態

(3)interrupted()方法

來看這個方法的原始碼,如下:

public static boolean interrupted() {
  return currentThread().isInterrupted(true);
}
private native boolean isInterrupted(boolean ClearInterrupted);
呼叫這個方法會返回當前執行緒的interrupt狀態(true或false),並把當前執行緒的interrupt狀態清空(設為false)。如果在呼叫的時候返回true,那麼除非你想遮蔽這個中斷,否則必須對它進行處理 - 可以丟擲InterruptedException,或者通過再次呼叫interrupt()來恢復中斷狀態。
 注意:這個是個靜態方法,並且返回的是當前執行緒狀態,並不一定是呼叫者的執行緒狀態。

使用這幾個方法可以解決如上的自定義取消機制與可阻塞的庫函式之間互動的問題,如下:

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())// 在在啟動尋找素數前進行檢測
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() {
        interrupt();
    }
}