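Sigh, time to rehash an old topic again. Fork/join is a concurrency framework introduced in JDK 1.7. Its defining feature is that it splits a large task into multiple subtasks, processes them in parallel, and finally merges the subtasks' results into the overall result. Work is thus distributed across multiple threads for efficient processing.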
1. Some thoughts on fork/join
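That description may not mean much to you, but put another way it may hit closer to home. Overall, fork/join amounts to a map phase that splits the data and a reduce phase that collects it, i.e. a MapReduce process. Doesn't that sound a bit like big data? The differences are that in fork/join the splitting is far more visible, because you do it by hand (MapReduce splits the input and shuffles intermediate results for you), and that fork/join runs on a single machine, whereas big-data frameworks run on distributed systems.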
Seen from that angle, studying fork/join starts to look worthwhile.
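That said, by its own semantics fork/join splits a task, processes the pieces, and then merges the results. Without the merge step it is essentially just a thread pool, which is why some people wonder how it differs from one. And even when results do need to be collected, we could submit tasks to an ordinary thread pool, fetch each result with get(), and merge them outside the pool; the effect is the same.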
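To make that comparison concrete, here is a minimal sketch of the "thread pool plus get()" alternative: the array is split by hand, each slice is summed on an ExecutorService, and the partial sums are merged outside the pool. The class and method names (ThreadPoolSumDemo, sum) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: manual split + plain thread pool + external merge.
final class ThreadPoolSumDemo {

    static long sum(int[] data, int chunks) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(chunks);
        try {
            List<Future<Long>> parts = new ArrayList<>();
            int step = (data.length + chunks - 1) / chunks; // slice size, rounded up
            for (int from = 0; from < data.length; from += step) {
                final int lo = from;
                final int hi = Math.min(from + step, data.length);
                // "map": each slice is summed independently on a pool thread
                Callable<Long> job = () -> {
                    long s = 0;
                    for (int i = lo; i < hi; i++) {
                        s += data[i];
                    }
                    return s;
                };
                parts.add(pool.submit(job));
            }
            // "reduce": partial results are merged on the calling thread via get()
            long total = 0;
            for (Future<Long> f : parts) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

This works well for a flat, one-level split. What it does not give you is work stealing, or a way for a task to split itself further and wait on its children without simply blocking a pool thread; that is what the rest of this article digs into.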
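2. Core classes of fork/join
Fork/join is called an execution framework, so naturally it is not a single component.
First, there is ForkJoinPool, the counterpart of a thread pool. All tasks are submitted through it, and it schedules them centrally.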
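Next, every task shares a lot of the same code and differs only in its business logic, so there is a base class for tasks: RecursiveTask (itself a subclass of ForkJoinTask). There is also a variant that returns no result: RecursiveAction. When no result is needed, though, an ordinary thread pool can often do the job instead (not always).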
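For contrast with RecursiveTask, a minimal RecursiveAction sketch: it doubles every element of an array in place, so there is nothing to return. The class name and the THRESHOLD value are illustrative assumptions.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Doubles every element of a[lo, hi) in place; there is no result,
// so it extends RecursiveAction rather than RecursiveTask.
final class DoubleInPlace extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // assumed cut-off; tune per workload
    private final int[] a;
    private final int lo, hi;

    DoubleInPlace(int[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) {
                a[i] *= 2;
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks the subtasks and returns once both are done
        invokeAll(new DoubleInPlace(a, lo, mid), new DoubleInPlace(a, mid, hi));
    }

    static void run(int[] a) {
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(a, 0, a.length));
    }
}
```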
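ForkJoinWorkerThreadFactory is the framework's thread factory. It plays the same role as an ordinary thread factory, except that its argument is no longer a Runnable task but the ForkJoinPool, because the thread lives in a different context: instead of being handed one task, the worker pulls its tasks from the pool.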
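As a sketch of a factory that receives the pool rather than a Runnable, the hypothetical factory below delegates to ForkJoinPool.defaultForkJoinWorkerThreadFactory and only renames the threads it creates. The "sorter-" prefix and the class name are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: same threads as the default factory, custom names.
final class NamedFactoryDemo {

    static ForkJoinPool.ForkJoinWorkerThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        // The factory is given the pool; the new worker will fetch tasks from it.
        return pool -> {
            ForkJoinWorkerThread t =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            t.setName(prefix + seq.getAndIncrement());
            return t;
        };
    }

    // Runs one task on a pool built with the factory and reports the worker's name.
    static String workerName() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2, named("sorter-"), null, false);
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } finally {
            pool.shutdown();
        }
    }
}
```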
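ForkJoinWorkerThread is the thread that actually executes fork/join tasks, and it may push new tasks itself while running. Each worker owns a queue, and its main job is to take tasks from that queue and run them. When its own queue is exhausted, it can steal tasks from other threads' queues using the work-stealing algorithm. This is the core of fork/join.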
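The work-stealing discipline can be sketched with a plain double-ended queue: the owner pushes and pops at one end (LIFO, the "top"), while thieves take from the other end (FIFO, the "base"), so the two rarely contend for the same element. This toy class is an assumption for illustration only; the JDK's WorkQueue uses an array with CAS on individual slots, not ConcurrentLinkedDeque.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

// Toy model of a worker's queue (hypothetical, not the JDK's WorkQueue).
final class ToyWorkQueue<T> {
    private final ConcurrentLinkedDeque<T> deque = new ConcurrentLinkedDeque<>();

    // owner: push a newly forked task onto the top
    void push(T task) {
        deque.addLast(task);
    }

    // owner: take back the most recently pushed task (LIFO)
    T pop() {
        return deque.pollLast();
    }

    // thief: take the oldest task from the base (FIFO)
    T steal() {
        return deque.pollFirst();
    }
}
```

In a recursive decomposition the oldest tasks were split off first, so they tend to be the biggest; a thief taking from the base therefore usually grabs a large chunk of work.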
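sun.misc.Unsafe is worth mentioning because, in the JDK 8 source discussed here, the framework manages its queues through it. Each queue is still a plain array (ForkJoinTask<?>[]), but instead of ordinary element assignment, tasks are stored with calls such as U.putOrderedObject(a, j, task), an ordered (lazy) store that publishes the write to other threads without a lock or a full volatile write. This keeps the thread-safe paths cheap, but the code is full of bit manipulation (index masks and shifts), which is instructive but also makes it harder to read.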
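The public-API counterpart of Unsafe.putOrderedObject is AtomicReferenceArray.lazySet (and, since Java 9, VarHandle.setRelease). The hypothetical ring below uses it together with the power-of-two mask trick seen as `(am & s)` in externalPush: because the capacity is a power of two, `mask & index` maps an ever-growing index into the array without a division. It is a single-writer sketch with no overflow handling; the real WorkQueue grows its array instead.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical single-writer ring: ordered (lazySet) stores + power-of-two masking.
final class OrderedRing<T> {
    private final AtomicReferenceArray<T> slots;
    private final int mask;
    private int top; // written only by the single owner thread in this sketch

    OrderedRing(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new AtomicReferenceArray<>(capacityPowerOfTwo);
        mask = capacityPowerOfTwo - 1;
    }

    void push(T task) {
        // (mask & top) wraps the growing top index into the array, like (am & s)
        slots.lazySet(mask & top, task); // ordered store, cf. putOrderedObject
        top++;
    }

    T get(int logicalIndex) {
        return slots.get(mask & logicalIndex); // volatile read of the slot
    }
}
```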
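3. A fork/join usage example
Let's see how to use the framework by sorting an array with fork/join. This is genuinely useful for large arrays, which are usually sorted with quicksort or merge sort. Handling the job with fork/join here naturally mirrors the idea of merge sort.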
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/join framework test
 */
public class TestForkJoinFramework {

    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        int mockArrLen = 10_000_000;
        int[] arr = new int[mockArrLen];
        Random r = new Random();
        for (int index = 0; index < mockArrLen; index++) {
            arr[index] = r.nextInt(10_000_000);
        }
        FJOrderTask task = new FJOrderTask(arr);
        ForkJoinTask<int[]> taskResult = pool.submit(task);
        try {
            // wait for the result to complete
            taskResult.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("elapsed (ms)=" + (endTime - beginTime));
    }

    /**
     * A single sorting subtask
     */
    private static class FJOrderTask extends RecursiveTask<int[]> {

        /**
         * The array values this task sorts
         */
        private final int[] source;

        public FJOrderTask(int[] source) {
            this.source = source;
        }

        /**
         * The actual business computation
         *
         * @see java.util.concurrent.RecursiveTask#compute()
         */
        @Override
        protected int[] compute() {
            int sourceLen = source.length;
            // Debug only: prints the worker running each task. With millions of tasks,
            // leaving it on would dominate the measured time.
            // System.out.println(Thread.currentThread());
            // If true, the array to sort is still not small enough: split it
            if (sourceLen > 2) {
                int midIndex = sourceLen / 2;
                // split into two subtasks: [0, mid) and [mid, len)
                FJOrderTask task1 = new FJOrderTask(
                        Arrays.copyOf(source, midIndex));
                task1.fork();
                FJOrderTask task2 = new FJOrderTask(
                        Arrays.copyOfRange(source, midIndex, sourceLen));
                task2.fork();
                // merge the two sorted arrays into one sorted array
                int[] result1 = task1.join();
                int[] result2 = task2.join();
                return insertMerge(result1, result2);
            }
            // otherwise the array holds at most two elements, which can be ordered directly
            else {
                // if true, the array has at most one element, or its two elements are already in order
                if (sourceLen <= 1
                        || source[0] <= source[1]) {
                    return source;
                } else {
                    int[] orderedArr = new int[sourceLen];
                    orderedArr[0] = source[1];
                    orderedArr[1] = source[0];
                    return orderedArr;
                }
            }
        }

        /**
         * Merges two sorted arrays into one sorted array (a two-way merge)
         *
         * @param arr1 sorted array 1
         * @param arr2 sorted array 2
         * @return the merged sorted array
         */
        private int[] insertMerge(int[] arr1, int[] arr2) {
            int[] result = new int[arr1.length + arr2.length];
            int arr1Len = arr1.length;
            int arr2Len = arr2.length;
            int destLen = result.length;
            // two-way merge; Integer.MAX_VALUE stands in for an exhausted input array
            for (int i = 0, array1Index = 0, array2Index = 0; i < destLen; i++) {
                int value1 = array1Index >= arr1Len
                        ? Integer.MAX_VALUE : arr1[array1Index];
                int value2 = array2Index >= arr2Len
                        ? Integer.MAX_VALUE : arr2[array2Index];
                if (value1 < value2) {
                    array1Index++;
                    result[i] = value1;
                } else {
                    array2Index++;
                    result[i] = value2;
                }
            }
            return result;
        }
    }
}
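The idea is simple: keep splitting the array until each piece has one or two elements, sort those at the bottom, then work back up, merging each pair of sorted results with a two-way merge until the fully sorted array is returned. The time complexity is still a respectable O(n log n). The splitting, however, copies sub-arrays at every level (Arrays.copyOf / copyOfRange), allocating about n elements per level and O(n log n) in total, and it creates on the order of n task objects, which is not ideal. We'll leave that aside for now.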
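If the copying bothers you, one common variant (a swapped-in technique, not the author's code above) sorts index ranges of the original array in place, shares a single scratch buffer, stops splitting below a cut-off and falls back to Arrays.sort, and forks only one half while computing the other directly. The class name and the THRESHOLD value are assumptions.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical variant: in-place range sort, one shared scratch buffer, sequential cut-off.
final class RangeMergeSort extends RecursiveAction {
    private static final int THRESHOLD = 8_192; // assumed cut-off
    private final int[] a, buf;
    private final int lo, hi;

    private RangeMergeSort(int[] a, int[] buf, int lo, int hi) {
        this.a = a;
        this.buf = buf;
        this.lo = lo;
        this.hi = hi;
    }

    static void sort(int[] a) {
        ForkJoinPool.commonPool().invoke(new RangeMergeSort(a, new int[a.length], 0, a.length));
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            Arrays.sort(a, lo, hi); // small ranges: a sequential sort beats more task overhead
            return;
        }
        int mid = (lo + hi) >>> 1;
        RangeMergeSort left = new RangeMergeSort(a, buf, lo, mid);
        left.fork();                                   // push the left half onto the queue
        new RangeMergeSort(a, buf, mid, hi).compute(); // work on the right half directly
        left.join();                                   // run or wait for the left half
        merge(mid);
    }

    // Two-way merge of the sorted runs a[lo, mid) and a[mid, hi), using buf[lo, hi).
    private void merge(int mid) {
        System.arraycopy(a, lo, buf, lo, hi - lo);
        int i = lo, j = mid, k = lo;
        while (i < mid && j < hi) {
            a[k++] = (buf[i] <= buf[j]) ? buf[i++] : buf[j++];
        }
        while (i < mid) {
            a[k++] = buf[i++];
        }
        // any remaining right-run elements are already in their final place in a
    }
}
```

Sibling subtasks touch disjoint ranges of both `a` and `buf`, so sharing one buffer is safe; each merge runs only after both halves have completed.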
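4. How the fork/join framework works
Taking the demo above as a starting point, let's trace how fork/join works; this may not cover 100% of it, but it gets most of the way. The demo does three visible things: instantiate a ForkJoinPool, submit a task, and call get() to wait for the final result. Those three are easy to follow; the real core probably lies behind them.
4.1. The ForkJoinPool constructor
Every application that uses the framework must first create a pool instance, naturally. The no-argument constructor used above simply applies the framework's defaults, which suit most scenarios; that is part of what makes it easy to use.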
// java.util.concurrent.ForkJoinPool#ForkJoinPool()
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
* java.lang.Runtime#availableProcessors}, using the {@linkplain
* #defaultForkJoinWorkerThreadFactory default thread factory},
* no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
// parallelism defaults to the number of CPU cores
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the given parameters.
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
* For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
// FIFO_QUEUE = 1 << 16, LIFO_QUEUE = 0
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
/**
* Creates a {@code ForkJoinPool} with the given parameters, without
* any security checks or parameter validation. Invoked directly by
* makeCommonPool.
*/
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
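There's not much to say about the constructors: they set the parallelism, the thread factory, the queue mode, the worker name prefix and the initial ctl counts, preparing for what follows.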
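A small sketch of the four-argument public constructor shown above, passing the same factory and handler the no-argument constructor uses but switching asyncMode to true (FIFO scheduling for forked tasks that are never joined). getParallelism() and getAsyncMode() read the settings back. The class name is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper that spells out the constructor arguments.
final class PoolConfigDemo {

    static ForkJoinPool fifoPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,                                     // must be > 0, else IllegalArgumentException
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // the factory the no-arg constructor uses
                null,                                            // no UncaughtExceptionHandler
                true);                                           // asyncMode: FIFO_QUEUE instead of LIFO_QUEUE
    }
}
```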
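4.2. The task submit process
In the example above, submit is called only once, which is not necessarily the case in real applications. Even so, a single submit sets a lot in motion behind the scenes, because that one task spawns many more tasks.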
// java.util.concurrent.ForkJoinPool#submit
/**
* Submits a ForkJoinTask for execution.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
// submit essentially just pushes the task onto one of the pool's queues
externalPush(task);
return task;
}
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
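// If the caller is not submitting for the first time and acquires the queue lock,
// push directly onto the queue; otherwise fall back to the general path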
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
// store the task in the queue array via putOrderedObject
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// first submission (initializes the pool) or the general submission path
externalSubmit(task);
}
/**
* Full version of externalPush, handling uncommon cases, as well
* as performing secondary initialization upon the first
* submission of the first task to the pool. It also detects
* first submission by an external thread and creates a new shared
* queue if the one at index if empty or contended.
*
* @param task the task. Caller must ensure non-null.
*/
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
// the pool is shutting down: reject
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
// not initialized yet: initialize first
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
// initialize while holding the run-state lock
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
// allocate the workQueues array
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
// a shared queue already exists at this caller's slot
else if ((q = ws[k = r & m & SQMASK]) != null) {
// lock the queue and push the task
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
// read the top index; the task is stored at that slot
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// if the task was pushed, wake a worker and return;
// otherwise go round for another attempt
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
// if needed, give the current thread a new probe value (i.e. a different slot)
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
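So submit mainly initializes the queues, adds the task to a queue, and wakes a worker to process it. Yet we have not actually seen a worker thread being started, only the initialization of workQueues. So where are workers created? The only candidate is signalWork.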
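The bit twiddling in externalSubmit that sizes workQueues is easier to see in isolation. This hypothetical helper copies those lines: it smears the highest set bit of p - 1 into all lower bits, adds one to get a power of two, then doubles it. The result is always a power of two (so indices can be masked with `& m`) and leaves room for worker queues at odd indices (see registerWorker) alongside the shared submission queues.

```java
// Hypothetical helper reproducing the workQueues sizing lines from externalSubmit.
final class QueueArraySizing {

    static int workQueuesLength(int parallelism) {
        int p = parallelism;
        int n = (p > 1) ? p - 1 : 1;                // ensure at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;   // copy the highest 1-bit into every lower bit
        n |= n >>> 8; n |= n >>> 16;
        return (n + 1) << 1;                        // (n + 1) is a power of two >= p; double it
    }
}
```

For example, parallelism 8 gives a 16-slot array, and parallelism 9 gives 32.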
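4.3. Worker initialization
Workers are the threads that actually execute tasks. So far we have only seen tasks being queued and workers being woken, without any worker being created; in fact, creation happens inside the wake-up logic.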
// java.util.concurrent.ForkJoinPool#signalWork
/**
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
* @param q a WorkQueue --if non-null, don't retry if now empty
*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active: the AC field (top 16 bits of ctl, active workers minus parallelism) is negative
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
/**
* Tries to add one worker, incrementing ctl counts before doing
* so, relying on createWorker to back out on failure.
*
* @param c incoming ctl value, with total count negative and no
* idle workers. On CAS failure, c is refreshed and retried if
* this holds (otherwise, a new worker is not needed).
*/
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
// ctl counts reserved successfully; now actually create the worker
if (add) {
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
/**
* Tries to construct and start one worker. Assumes that total
* count has already been incremented as a reservation. Invokes
* deregisterWorker on any failure.
*
* @return true if successful
*/
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// ask the thread factory for a new worker and start it immediately
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
// creation failed: undo the reservation and report the exception
deregisterWorker(wt, ex);
return false;
}
/**
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
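Sure enough, workers are created in signalWork. Note that, to add workers safely, it first updates ctl successfully (reserving the slot with a CAS) and only then performs the actual creation, which prevents creating more workers than needed; if creation fails, deregisterWorker backs the counts out again.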
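The "reserve first, create second" idea can be shown outside ForkJoinPool with a plain counter. In this hypothetical sketch an AtomicInteger stands in for the TC field of ctl and a BooleanSupplier stands in for createWorker: the slot is claimed with compareAndSet before anything is created, and released again if creation fails, much as deregisterWorker does.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: concurrent callers can never overshoot `max`,
// because the count is reserved before any worker is built.
final class WorkerSlots {
    private final int max;
    private final AtomicInteger total = new AtomicInteger();

    WorkerSlots(int max) {
        this.max = max;
    }

    boolean tryAdd(BooleanSupplier create) {
        for (;;) {
            int c = total.get();
            if (c >= max) {
                return false;                   // enough workers already
            }
            if (total.compareAndSet(c, c + 1)) {
                break;                          // reservation made
            }
        }
        if (create.getAsBoolean()) {
            return true;
        }
        total.decrementAndGet();                // back out the reservation on failure
        return false;
    }

    int count() {
        return total.get();
    }
}
```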
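4.4. How a worker works
As the creation process showed, the worker is given the pool instance, so the whole pool context is visible to it and it can reuse the current configuration. The worker is a separate (daemon) thread that is started immediately after creation, so from then on it must try to take tasks from the queues and run them. How exactly?
1. The WorkerThread constructor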
// java.util.concurrent.ForkJoinWorkerThread#ForkJoinWorkerThread
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
// the workQueue is obtained by registering this thread with the pool
this.workQueue = pool.registerWorker(this);
}
/**
* Callback from ForkJoinWorkerThread constructor to establish and
* record its WorkQueue.
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
The key point is that the worker registers itself with the pool and gets a workQueue in return. Its actual work is done in the run method.
// java.util.concurrent.ForkJoinWorkerThread#run
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
// java.util.concurrent.ForkJoinPool#runWorker
/**
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
// find a task and run it; if none is found, wait for work (or exit)
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
/**
* Executes the given task and any remaining local tasks.
*/
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
// java.util.concurrent.ForkJoinTask#doExec
/**
* Primary execution method for stolen tasks. Unless done, calls
* exec and records status if completed, but doesn't wait for
* completion otherwise.
*
* @return status on exit from this method
*/
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
// java.util.concurrent.RecursiveTask#exec
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
// i.e. call the concrete task's compute method and keep its result
result = compute();
return true;
}
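That was a quick look at how a worker executes tasks. It is not much different from a thread pool: broadly, it still takes a task from a queue and runs the business method compute. We have skipped how tasks are fetched and how work stealing is carried out; that comes next.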
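The onStart and onTermination calls wrapped around runWorker in run() are the hooks a subclass may override (the ForkJoinWorkerThread javadoc quoted in 4.6 says as much). A minimal sketch with hypothetical names that counts how many workers have started; the constructor reference CountingWorker::new serves as the ForkJoinWorkerThreadFactory.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker subclass that only hooks the start/termination callbacks.
final class CountingWorker extends ForkJoinWorkerThread {
    static final AtomicInteger STARTED = new AtomicInteger();
    static final AtomicInteger TERMINATED = new AtomicInteger();

    CountingWorker(ForkJoinPool pool) {
        super(pool); // JDK 8: registers with the pool, as shown above
    }

    @Override
    protected void onStart() {
        super.onStart();
        STARTED.incrementAndGet(); // called in run() before pool.runWorker(...)
    }

    @Override
    protected void onTermination(Throwable exception) {
        TERMINATED.incrementAndGet(); // called after runWorker returns or throws
        super.onTermination(exception);
    }

    static int runOneTask() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2, CountingWorker::new, null, false);
        try {
            return pool.submit(() -> 21 * 2).get();
        } finally {
            pool.shutdown();
        }
    }
}
```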
4.5. Fetching tasks
This is handled mainly by scan.
// java.util.concurrent.ForkJoinPool#scan
/**
* Scans for and tries to steal a top-level task. Scans start at a
* random location, randomly moving on apparent contention,
* otherwise continuing linearly until reaching two consecutive
* empty passes over all queues with the same checksum (summing
* each base index of each queue, that moves on each steal), at
* which point the worker tries to inactivate and then re-scans,
* attempting to re-activate (itself or some other worker) if
* finding a task; otherwise returning null to await work. Scans
* otherwise touch as little memory as possible, to reduce
* disruption on other scanning threads.
*
* @param w the worker (via its WorkQueue)
* @param r a random seed
* @return a task, or null if none found
*/
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
// start at a random index (origin = r & m) and try to take the task at that queue's base
if ((q = ws[k]) != null) {
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
Implementing task fetching that is both safe and efficient is no easy feat.
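Stripped of the inactivation and checksum bookkeeping, the heart of scan is: start at a random index (origin = r & m), walk the queue array linearly, and try to take the task at the base of the first non-empty queue. Below is a hypothetical, simplified sketch that uses deques in place of WorkQueue, plus the xorshift step runWorker uses to vary the seed between scans.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

// Simplified, hypothetical model of ForkJoinPool.scan (no inactivation, no checksums).
final class ToyScan {

    // xorshift step, the same shifts runWorker applies to r between scans
    static int nextSeed(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Assumes queues.length is a power of two, as with workQueues, so "& m" wraps indices.
    static <T> T scan(ConcurrentLinkedDeque<T>[] queues, int r) {
        int m = queues.length - 1;
        int origin = r & m;
        int k = origin;
        do {
            ConcurrentLinkedDeque<T> q = queues[k];
            if (q != null) {
                T t = q.pollFirst(); // steal from the base (oldest end)
                if (t != null) {
                    return t;
                }
            }
            k = (k + 1) & m; // move on linearly
        } while (k != origin);
        // nothing found in a full pass (the real scan needs two stable passes, then inactivates)
        return null;
    }
}
```

The real scan also CASes the slot to null so two thieves cannot take the same task, advances base, and signals other workers if more tasks remain; ConcurrentLinkedDeque.pollFirst gives the sketch that atomicity for free.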
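4.6. How task.fork works
Wherever the word fork is used, it usually means copying the current environment (as Unix fork() does). Is that also the case here? Does it start a new thread? And if not, how does it find the queue that should process the task?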
// java.util.concurrent.ForkJoinTask#fork
/**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
* it is not necessarily enforced, it is a usage error to fork a
* task more than once unless it has completed and been
* reinitialized. Subsequent modifications to the state of this
* task or any data it operates on are not necessarily
* consistently observable by any thread other than the one
* executing it unless preceded by a call to {@link #join} or
* related methods, or a call to {@link #isDone} returning {@code
* true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
Thread t;
// a ForkJoinWorkerThread holds its own workQueue, so push the task onto it directly
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// called from an external thread: push into a shared queue of the common pool for idle workers to pick up
ForkJoinPool.common.externalPush(this);
return this;
}
// java.util.concurrent.ForkJoinPool.WorkQueue#push
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
* {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
...
}
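So fork simply pushes the task onto the current worker thread's own queue (or, when called from a non-worker thread, onto the common pool); there is no context copying at all.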
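To see fork as "push onto my own queue" in practice, here is the classic Fibonacci sketch (a standard teaching example, not from the original article): one subtask is forked, the other is computed directly by the current thread, and join either pops the forked task back if nobody stole it or helps/waits otherwise. When the caller is not a worker, fork goes to the common pool instead, as the code above shows. The sequential cut-off of 10 is an arbitrary assumption.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: fork one half, compute the other, then join.
final class Fib extends RecursiveTask<Long> {
    private final int n;

    Fib(int n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        if (n <= 10) {
            return slow(n); // small inputs: plain recursion, no task overhead
        }
        Fib f1 = new Fib(n - 1);
        f1.fork();                          // push onto the current worker's queue (or the common pool)
        long f2 = new Fib(n - 2).compute(); // keep this thread busy with the other half
        return f1.join() + f2;              // pop f1 back if still ours, otherwise help or wait
    }

    private static long slow(int n) {
        return n < 2 ? n : slow(n - 1) + slow(n - 2);
    }

    static long of(int n) {
        return ForkJoinPool.commonPool().invoke(new Fib(n));
    }
}
```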
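4.7. How task.join works
The semantics of join are: wait until the task completes, then return, just like Thread.join(). But there is a problem: if a thread blocks waiting for a result, it can no longer be used for anything else, so how would the remaining tasks get done? It seems only recursion can solve this. But recursion usually happens within a single thread, so wouldn't that throw away the benefit of concurrency?
This is exactly why the work is split into two steps, fork and join. As the previous section showed, fork just adds the task to a queue and returns immediately. If the current worker is busy at that point (for example, still splitting tasks), those queued tasks can be stolen and processed by other workers. Those workers in turn run into their own recursion while processing them, so a single-threaded recursion becomes a multi-threaded one.
Below we mainly follow the recursion within one thread. join itself only waits for the current task to finish, but the current task can only finish once its subtasks' joins complete, and those subtasks in turn wait on their own subtasks, which forms a recursion. Since join() returning means compute() has finished, the whole process runs alongside the compute calls.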
// java.util.concurrent.ForkJoinTask#join
/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
// completed normally: return the stored result
return getRawResult();
}
/**
* Throws exception, if any, associated with the given status.
*/
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
// java.util.concurrent.RecursiveTask#getRawResult
public final V getRawResult() {
return result;
}
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// if this task is still on top of our own queue, pop it and run it here (doExec); otherwise help/wait in awaitJoin
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
// java.util.concurrent.ForkJoinPool#awaitJoin
/**
* Helps and/or blocks until the given task is done or timeout.
*
* @param w caller
* @param task the task
* @param deadline for timed waits, if nonzero
* @return task status on exit
*/
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
if ((s = task.status) < 0)
break;
if (cc != null)
helpComplete(w, cc, 0);
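// look for the task in our own queue and run it; if our queue is (or becomes) empty while the task is unfinished, it was stolen, so help the stealer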
else if (w.base == w.top || w.tryRemoveAndExec(task))
helpStealer(w, task);
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
// java.util.concurrent.ForkJoinPool.WorkQueue#tryRemoveAndExec
/**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Used only by awaitJoin.
*
* @return true if queue empty and task not known to be done
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
while ((n = (s = top) - (b = base)) > 0) {
for (ForkJoinTask<?> t;;) { // traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
return s + 1 == top; // shorter than expected
else if (t == task) {
boolean removed = false;
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
// run the joined task that was just removed from the queue
if (removed)
task.doExec();
break;
}
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; // was cancelled
}
if (--n == 0)
return false;
}
if (task.status < 0)
return false;
}
}
return true;
}
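So in the end fork/join still relies on recursion to wait for joined tasks; the difference is that it exploits multiple threads by running several tasks at once. This has two benefits: it relieves a single thread of the whole workload, and it spreads the recursion depth across several threads, reducing the risk of an early stack overflow.
It is hard to say, however, how many tasks each thread ends up with or how many results a join must wait for. For example, if the top-level thread's subtasks are stolen by other threads, it simply keeps waiting for results, while the thread at the bottom bears the deepest recursion to guarantee the program's eventual exit. From this we might speculate: without proper control, could letting threads run each other's tasks freely cause deadlock? That may well be a concern. (One mitigation is visible in awaitJoin above: before blocking, it calls tryCompensate, which may activate or create a spare worker to keep parallelism up.)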