1. 程式人生 > >java高併發實戰程式設計(三)

java高併發實戰程式設計(三)

一:Fork/Join 框架:

     分而治之的思想:也就是將一項任務分為多個執行過程進行,最後將結果進行整合。在Linux通過fork()函式建立子程序,使得系統程序可以多一個執行分支(執行緒),要等待這個執行分支執行完畢,才有可能得到最終結果,而join()就表示等待。

    由於執行緒池的優化,提交的任務和執行緒的數量並不是一對一的關係。大多數情況下,一個物理執行緒需要處理多個任務,因此,每個執行緒必然需要擁有一個任務佇列。因此,很有可能執行緒A執行完所有任務後,執行緒B還有一堆任務等待處理,此時,執行緒A就會幫助B,從B的任務佇列中拿一個任務過來處理,儘可能達到平衡。當執行緒幫助別人時,總是從任務佇列的底部開始拿資料,而執行緒試圖執行自己的任務時,則是從頂部開始拿,有利於避免資料競爭。

    如果毫無顧忌的使用fork()開啟執行緒進行處理,可能導致開啟過多執行緒而嚴重影響效能。因此在JDK中,給出了一個ForkJoinPool執行緒池,對於fork()並不急著開啟執行緒,而是提交給ForkJoinPool執行緒池進行處理,以節省系統資源。

1.2 ForkJoinPool的一個重要介面:

   public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

    你可以向ForkJoinPool執行緒池提交一個ForkJoinTask任務。所謂ForkJoinTask任務就是支援fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類,RecursiveAction和RecursiveTask.分別表示沒有返回值的任務和可以攜帶返回值的任務

      程式碼:

/**
 * forkJoin框架案例
 */
public class ThreadUtils extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public ThreadUtils(long start,long end){
        this.start = start;
        this.end = end;
    }


    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end -start)<THRESHOLD;
        if (canCompute){
            for (long i=start;i<=end;i++){
                sum+=i;
            }
        }else {
            //分成100個小任務
            long step = (start+end)/1000;
            ArrayList<ThreadUtils> subTasks = new ArrayList<>();
            long pos = start;
            for (int i=0;i<100;i++){
                long lastOne = pos+step;
                if (lastOne>end) lastOne=end;
                ThreadUtils subTask = new ThreadUtils(pos,lastOne);
                pos+=step+1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for (ThreadUtils t : subTasks){
                sum+=t.join();
            }
        }
        return sum;
    }


    public static void main(String[] args){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ThreadUtils task = new ThreadUtils(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum="+res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

  注意:分的任務大小不同,結果好像也不一樣????

二:JDK併發容器:

   構造執行緒安全的Map:使用代理模式,從原map中獲取資料,其本身只是作為一個加鎖的工具

public static Map map = Collections.synchronizedMap(new HashMap());

  上述案例 返回  SynchronizedMap,而其實現方法就是使用mutex鎖進行 synchronized鎖定。方法都要獲取鎖操作,效能不高。