1. 程式人生 > >Jdk8--新特性--串並行流與ForkJoin框架

Jdk8--新特性--串並行流與ForkJoin框架

並行流就是把一個內容分成多個數據塊,並用不同的執行緒分別處理每個資料塊的流。穿行流則相反,並行流的底層其實就是ForkJoin框架的一個實現。

那麼先了解一下ForkJoin框架吧。

Fork/Join框架:在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這裡就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join彙總。

Fork/Join與傳統執行緒池的區別!

Fork/Join採用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到執行緒佇列中,然後再從一個隨即執行緒中偷一個並把它加入自己的佇列中。

就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的佇列中,對於傳統的執行緒,ForkJoin更有效的利用的CPU資源!

我們來看一下ForkJoin的實現:實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,相反Action則沒有

測試

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

import org.junit.Test;

public class TestForkJoin {
	
	@Test
	public void test1(){
		long start = System.currentTimeMillis();
		
		ForkJoinPool pool = new ForkJoinPool();
		ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
		
		long sum = pool.invoke(task);
		System.out.println(sum);
		
		long end = System.currentTimeMillis();
		
		System.out.println("耗費的時間為: " + (end - start)); //112-1953-1988-2654-2647-20663-113808
	}
	
	@Test
	public void test2(){
		long start = System.currentTimeMillis();
		
		long sum = 0L;
		
		for (long i = 0L; i <= 10000000000L; i++) {
			sum += i;
		}
		
		System.out.println(sum);
		
		long end = System.currentTimeMillis();
		
		System.out.println("耗費的時間為: " + (end - start)); //34-3174-3132-4227-4223-31583
	}
	
	@Test
	public void test3(){
		long start = System.currentTimeMillis();
		
		Long sum = LongStream.rangeClosed(0L, 10000000000L)
							 .parallel()
							 .sum();
		
		System.out.println(sum);
		
		long end = System.currentTimeMillis();
		
		System.out.println("耗費的時間為: " + (end - start)); //2061-2053-2086-18926
	}

}
import java.util.concurrent.RecursiveTask;

public class ForkJoinCalculate extends RecursiveTask<Long>{

	/**
	 * 
	 */
	private static final long serialVersionUID = 13475679780L;
	
	private long start;
	private long end;
	
	private static final long THRESHOLD = 10000L; //臨界值
	
	public ForkJoinCalculate(long start, long end) {
		this.start = start;
		this.end = end;
	}
	
	@Override
	protected Long compute() {
		long length = end - start;
		
		if(length <= THRESHOLD){
			long sum = 0;
			
			for (long i = start; i <= end; i++) {
				sum += i;
			}
			
			return sum;
		}else{
			long middle = (start + end) / 2;
			
			ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
			left.fork(); //拆分,並將該子任務壓入執行緒佇列
			
			ForkJoinCalculate right = new ForkJoinCalculate(middle+1, end);
			right.fork();
			
			return left.join() + right.join();
		}
		
	}

}

我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當資料很大的時候ForkJoin比普通執行緒實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快

Java 8 中將並行流進行了優化,我們可以很容易的對資料進行並行流的操作,Stream API可以宣告性的通過parallel()與sequential()在並行流與穿行流中隨意切換!