1. 程式人生 > >淺談Fork/Join框架

淺談Fork/Join框架

前言:本文基於jdk1.7,jdk1.8與jdk1.7還是有些差別。

一、什麼是Fork/Join框架

Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。簡單來說,Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+…+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。

這裡要介紹一下工作竊取演算法:指某個執行緒從其他佇列裡竊取任務來執行。簡單說就是,將一個大任務分割為若干個互不依賴的子任務,這些子任務放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應。當某個執行緒完成了自己佇列的任務之後,從其他未完成任務的執行緒的佇列中竊取任務來執行。但是,這樣子的話這些執行緒之間會存在競爭關係,所以一般會採取雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。

二、Fork/Join框架的工作原理

從上面Fork/Join框架的介紹中瞭解到,Fork/Join框架主要分為幾個步驟實現。

1、分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停地分割,直到分割出的子任務足夠小。

2、執行任務併合並結果。分割的子任務分別放在雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執行完的結果都統一放在一個佇列裡,啟動一個執行緒從佇列裡拿資料,然後合併這些資料。

Fork/Join框架通過ForkJoinTask和ForkJoinPool來實現分割任務和執行任務併合並結果。

①ForkJoinTask:我們要使用ForkJoin框架,必須首先建立一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類

,Fork/Join框架提供了以下兩個子類,根據我們的需要(是否需要返回結果)來實現不同的類:
·RecursiveAction:用於沒有返回結果的任務,即執行任務後不會返回結果;
·RecursiveTask:用於有返回結果的任務,即執行任務後會返回結果。
②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列裡暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。
接下來我們通過一個例子來理解Fork/Join框架是如果工作的:計算1+2+3+4的結果

首先,我們需要分割任務,假如我們希望每個子任務最多執行兩個數的相加,那麼我們設定分割的閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然後再join兩個子任務
的結果。因為是有結果的任務,所以必須繼承RecursiveTask,實現程式碼如下:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
    // 閾值,表示拆分為幾個子任務
	private static final int THRESHOLD = 2;
	private int start;
	private int end;
	public CountTask(int start, int end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Integer compute() {
		int sum = 0;
		// 如果任務小於等於閾值則計算任務
		Boolean canCompute = (end - start) <= THRESHOLD;
		if (canCompute) {
			for (int i = start; i <= end; i++) {
				sum += i;
			}
		} else {
			// 如果任務大於閾值,就分裂成兩個子任務計算
			int middle = (start + end) / 2;
			CountTask leftTask = new CountTask(start, middle);
			CountTask rightTask = new CountTask(middle + 1, end);
			// 執行子任務
			leftTask.fork();
			rightTask.fork();
			// 等待子任務執行完,並得到其結果
			int leftResult=leftTask.join();
			int rightResult=rightTask.join();
			// 合併子任務
			sum = leftResult + rightResult;
		}
		r
		eturn sum;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		// 生成一個計算任務,負責計算1+2+3+4
		CountTask task = new CountTask(1, 4);
		// 執行一個任務
		Future<Integer> result = forkJoinPool.submit(task);
		try {
			System.out.println(result.get());
		}
		catch (InterruptedException e) {
		}
		catch (ExecutionException e) {
		}
	}
}

從上面的程式碼可以看到, ForkJoinTask的子類需要實現compute方法,在這個方法裡,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在呼叫fork方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。

此外,ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。

if(task.isCompletedAbnormally())
{
	System.out.println(task.getException());
}

getException方法返回Throwable物件,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有丟擲異常則返回null。

三、Fork/Join框架的實現原理

ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,ForkJoinTask陣列負責將存放程式提交給ForkJoinPool的任務,而ForkJoinWorkerThread陣列負責執行這些任務。
(1)ForkJoinTask的fork方法實現原理
當我們呼叫ForkJoinTask的fork方法時,程式會呼叫ForkJoinWorkerThread的pushTask方法非同步地執行這個任務,然後立即返回結果。

public final ForkJoinTask<V> fork() {
	((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
	return this;
}

pushTask方法把當前任務存放在ForkJoinTask陣列佇列裡。然後再呼叫ForkJoinPool的signalWork()方法喚醒或建立一個工作執行緒來執行任務。

final void pushTask(ForkJoinTask<> t) {
	ForkJoinTask<>[] q;
	int s, m;
	if ((q = queue) != null) {
		// ignore if queue removed
		long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
		UNSAFE.putOrderedObject(q, u, t);
		queueTop = s + 1;
		// or use putOrderedInt
		if ((s -= queueBase) <= 2)
		pool.signalWork(); else if (s == m)
		growQueue();
	}
}

(2)ForkJoinTask的join方法實現原理
Join方法的主要作用是阻塞當前執行緒並等待獲取結果。

public final V join() {
	if (doJoin() != NORMAL)
	return reportResult();
    else
	return getRawResult();
}
private V reportResult() {
	int s;
	Throwable ex;
	if ((s = status) == CANCELLED)
	    throw new CancellationException();
	if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
	    UNSAFE.throwException(ex);
	return getRawResult();
}

首先,它呼叫了doJoin()方法,通過doJoin()方法得到當前任務的狀態來判斷返回什麼結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、訊號(SIGNAL)和出現異常(EXCEPTIONAL)。
·如果任務狀態是已完成,則直接返回任務結果。
·如果任務狀態是被取消,則直接丟擲CancellationException。
·如果任務狀態是丟擲異常,則直接丟擲對應的異常。
讓我們再來分析一下doJoin()方法的實現程式碼

private int doJoin() {
	Thread t;
	ForkJoinWorkerThread w;
	int s;
	Boolean completed;
	if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
		if ((s = status) < 0)
				return s;
		if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
			try {
				completed = exec();
			}
			catch (Throwable rex) {
				return setExceptionalCompletion(rex);
			}
			if (completed)
						return setCompletion(NORMAL);
		}
		return w.joinTask(this);
	} else
		return externalAwaitDone();
}

在doJoin()方法裡,首先通過檢視任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;如果沒有執行完,則從任務數組裡取出任務並執行。如果任務順利執行完成,則設定任務狀態為NORMAL,如果出現異常,則記錄異常,並將任務狀態設定為EXCEPTIONAL。
 

《Java併發程式設計的藝術》