1. 程式人生 > >Java 多執行緒(七)——執行緒組與執行緒池

Java 多執行緒(七)——執行緒組與執行緒池

1 執行緒組

1.1 概述

  • Java中使用ThreadGroup來表示執行緒組,它可以對一批執行緒進行分類管理。對執行緒組的控管理,即同時控制執行緒組裡面的這一批執行緒。
  • 使用者建立的所有執行緒都屬於指定執行緒組,如果沒有顯示指定屬於哪個執行緒組,那麼該執行緒就屬於預設執行緒組(即main執行緒組)。預設情況下,子執行緒和父執行緒處於同一個執行緒組。
  • 只有在建立執行緒時才能指定其所在的執行緒組,執行緒執行中途不能改變它所屬的執行緒組,也就是說執行緒一旦指定所在的執行緒組,就直到該執行緒結束。
  • 執行緒組與執行緒之間結構類似於樹形的結構:
    這裡寫圖片描述

1.2 API介紹

1.2.1 Thread類

  • Thread(ThreadGroup group,Runnable target):group屬於的執行緒組,target為新執行緒
  • Thread(ThreadGroup group,Runnable target,String name):group屬於的執行緒組,target為新執行緒,name:執行緒名
  • Thread(ThreadGroup group,String name):新執行緒名為name,屬於group執行緒組

1.2.2 ThreadGroup類

(1)構造方法

  • ThreadGroup(String name):以指定執行緒組名字來建立新執行緒組
  • ThreadGroup(ThreadGroup parent,String name)
    :以指定的名字、指定的父執行緒組來建立一個新執行緒組。

(2)常用操作方法

  • int activeCount():獲取執行緒組中活動執行緒的數量
  • interrupt():中斷執行緒組中所有執行緒
  • isDaemon():是否為後臺執行緒組
  • setDaemon(boolean daemon):設定為後臺執行緒組
  • setMaxPriority(int pri):設定執行緒組的最高優先順序

1.3 簡單示例

package com.jtzen9;

public class Main {
    public static void main(String[] args) {

        // 獲取主執行緒所在的執行緒組,這是所有執行緒預設的執行緒組
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup(); System.out.println("主執行緒組的名字:" + mainGroup.getName()); System.out.println("主執行緒組是否是後臺執行緒組:" + mainGroup.isDaemon()); new MyThread("主執行緒組的執行緒").start(); ThreadGroup tg = new ThreadGroup("新執行緒組"); tg.setDaemon(true); System.out.println("tg執行緒組是否是後臺執行緒組:" + tg.isDaemon()); MyThread tt = new MyThread(tg , "tg組的執行緒甲"); tt.start(); new MyThread(tg , "tg組的執行緒乙").start(); } } class MyThread extends Thread { // 提供指定執行緒名的構造器 public MyThread(String name) { super(name); } // 提供指定執行緒名、執行緒組的構造器 public MyThread(ThreadGroup group , String name) { super(group, name); } public void run() { for (int i = 0; i < 20 ; i++ ) { System.out.println(getName() + " 執行緒的i變數" + i); } } }

輸出結果:

主執行緒組的名字:main
主執行緒組是否是後臺執行緒組:false
tg執行緒組是否是後臺執行緒組:true
主執行緒組的執行緒 執行緒的i變數0
主執行緒組的執行緒 執行緒的i變數1
主執行緒組的執行緒 執行緒的i變數2
主執行緒組的執行緒 執行緒的i變數3
主執行緒組的執行緒 執行緒的i變數4
tg組的執行緒甲 執行緒的i變數0
tg組的執行緒甲 執行緒的i變數1
tg組的執行緒甲 執行緒的i變數2
tg組的執行緒甲 執行緒的i變數3
tg組的執行緒甲 執行緒的i變數4
tg組的執行緒乙 執行緒的i變數0
tg組的執行緒乙 執行緒的i變數1
tg組的執行緒乙 執行緒的i變數2
tg組的執行緒乙 執行緒的i變數3
tg組的執行緒乙 執行緒的i變數4

1.4 未處理的異常

  ThreadGroup內定義了一個方法:void uncaughtException(Thread t,Throwable e),該方法可以處理該執行緒組內的任意執行緒所丟擲的未處理異常。
  ThreadGroup類實現了Thread.UncaughtExceptionHandler介面,所以每個執行緒所屬的執行緒組將會作為預設的異常處理器。當一個執行緒丟擲未處理異常時,JVM會首先查詢該異常對應的異常處理器處理該異常;否則,JVM將會呼叫該執行緒所屬的執行緒組物件的uncaughtException()方法來處理該異常。
例子:

package com.jtzen9;

public class Main {
    public static void main(String[] args) {
        // 設定主執行緒的異常處理器
        Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
        int a = 5 / 0;
        System.out.println("程式正常結束!");
    }
}
// 定義自己的異常處理器
class MyExHandler implements Thread.UncaughtExceptionHandler {
    // 實現uncaughtException方法,該方法將處理執行緒的未處理異常
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println(t + " 執行緒出現了異常:" + e);
    }
}

輸出:

Thread[main,5,main] 執行緒出現了異常:java.lang.ArithmeticException: / by zero

2 執行緒池

  與資料庫連線池類似,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個Runnable物件或Callable物件傳給執行緒池,執行緒池就會啟動一個執行緒來執行它們的run()或call()方法,當執行完畢後,該執行緒並不會死亡,而是再次返回執行緒池中成為空閒狀態,等待執行下一個任務。

  • Executors工廠類:該工廠類包含多個靜態工廠方法來建立執行緒池。
    ① newCachedThreadPool():建立一個具有快取功能的執行緒池,系統根據需要建立執行緒,這些執行緒將會被快取線上程池中。
    ② newFixedThreadPool(int nThreads):建立一個可重用的、具有固定執行緒數的執行緒池
    ③ newScheduledThreadPool(int corePoolSize):建立具有指定執行緒數的執行緒池,它可以在指定延遲後執行執行緒任務。
    ④ newWorkStealingPool(int parallelism):建立持有足夠的執行緒的執行緒池來支援給定的並行級別,該方法還會使用多個佇列來減少競爭。
  • ExecutorService類:代表盡執行執行緒的執行緒池,即只要執行緒池中有空閒執行緒,就立即執行執行緒任務
  • ScheduledExecutorService類:代表可在指定延遲後或週期性地執行執行緒任務的執行緒池。

使用執行緒池來執行執行緒任務的步驟如下:

  1. 呼叫Executors類的靜態工廠方法建立一個ExecutorService物件,該物件代表一個執行緒池
  2. 建立Runnable實現類或Callable實現類的例項,作為執行緒執行任務
  3. 呼叫ExecutorService物件的submit()方法來提交Runnable例項或Callable例項
  4. 當不想提交任何任務時,呼叫ExecutorService物件的shutdown()方法來關閉執行緒池

例子:

package com.jtzen9;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) throws Exception{
        // 建立一個具有固定執行緒數的執行緒池
        ExecutorService pool = Executors.newFixedThreadPool(6);
        // 使用Lambda表示式建立Runnable物件
        Runnable target = () ->{
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + "的i值為:" + i);
            }
        };
        //  向執行緒池中提交兩個執行緒
        pool.submit(target);
        pool.submit(target);
        // 關閉執行緒池
        pool.shutdown();
    }
}

輸出:

pool-1-thread-1的i值為:0
pool-1-thread-2的i值為:0
pool-1-thread-1的i值為:1
pool-1-thread-2的i值為:1
pool-1-thread-1的i值為:2
pool-1-thread-2的i值為:2
pool-1-thread-1的i值為:3
pool-1-thread-2的i值為:3
pool-1-thread-1的i值為:4
pool-1-thread-2的i值為:4
pool-1-thread-1的i值為:5
pool-1-thread-2的i值為:5
pool-1-thread-1的i值為:6
pool-1-thread-1的i值為:7
pool-1-thread-1的i值為:8
pool-1-thread-2的i值為:6
pool-1-thread-1的i值為:9
pool-1-thread-2的i值為:7
pool-1-thread-2的i值為:8
pool-1-thread-2的i值為:9

3 執行緒組和執行緒池的區別

  • 執行緒組是為了方便執行緒的管理
  • 執行緒池是為了管理執行緒的生命週期,複用執行緒,減少建立銷燬執行緒的開銷。

4 ForkJoinPool類

  為了充分利用多CPU、多核CPU的優勢,可以考慮把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之後,再將這些執行結果合併起來即可
  ForkJoinPool類支援將一個任務拆分成多個“小任務”平行計算,再把多個“小任務”的結果合併成總的計算結果。
  ForkJoinPool是ExecutorService的實現類,因此是一種特殊的執行緒池。建立了ForkJoinPool類例項,就可以呼叫ForkJoinPool的submit(ForkJoinTask task)invoke(ForkJoinTask task)方法來執行指定任務。其中ForkJoinTask代表一個可以並行、合併的任務。
  ForkJoinTask是一個抽象類,有兩個抽象子類:RecursiveAction(代表沒有返回值的任務)和RecursiveTask(代表有返回值的任務)。

4.1 有返回值的任務

package com.jtzen9;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class Main {
    public static void main(String[] args) throws Exception{
        int[] arr = new int[100];
        Random rand = new Random();
        int total = 0;
        // 初始化100個數字元素
        for (int i = 0 , len = arr.length; i < len ; i++ )
        {
            int tmp = rand.nextInt(20);
            // 對陣列元素賦值,並將陣列元素的值新增到sum總和中。
            total += (arr[i] = tmp);
        }
        System.out.println(total);
        // 建立一個通用池
        ForkJoinPool pool = ForkJoinPool.commonPool();
        // 提交可分解的CalTask任務
        Future<Integer> future = pool.submit(new CalTask(arr , 0 , arr.length));
        System.out.println(future.get());
        // 關閉執行緒池
        pool.shutdown();
    }
}
// 繼承RecursiveTask來實現"可分解"的任務
class CalTask extends RecursiveTask<Integer> {
    // 每個“小任務”只最多隻累加20個數
    private static final int THRESHOLD = 20;
    private int arr[];
    private int start;
    private int end;
    // 累加從start到end的陣列元素
    public CalTask(int[] arr , int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        // 當end與start之間的差小於THRESHOLD時,開始進行實際累加
        if(end - start < THRESHOLD) {
            for (int i = start ; i < end ; i++ ) {
                sum += arr[i];
            }
            return sum;
        }
        else {
            // 如果當end與start之間的差大於THRESHOLD時,即要累加的數超過20個時
            // 將大任務分解成兩個小任務。
            int middle = (start + end) / 2;
            CalTask left = new CalTask(arr , start, middle);
            CalTask right = new CalTask(arr , middle, end);
            // 並行執行兩個“小任務”
            left.fork();
            right.fork();
            // 把兩個“小任務”累加的結果合併起來
            return left.join() + right.join();    // ①
        }
    }
}
942
942

4.2 沒有返回值的任務

package com.jtzen9;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception{
        ForkJoinPool pool = new ForkJoinPool();
        // 提交可分解的PrintTask任務
        pool.submit(new PrintTask(0 , 300));
        pool.awaitTermination(2, TimeUnit.SECONDS);
        // 關閉執行緒池
        pool.shutdown();
    }
}
// 繼承RecursiveAction來實現"可分解"的任務
class PrintTask extends RecursiveAction {
    // 每個“小任務”只最多隻列印50個數
    private static final int THRESHOLD = 50;
    private int start;
    private int end;
    // 列印從start到end的任務
    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        // 當end與start之間的差小於THRESHOLD時,開始列印
        if(end - start < THRESHOLD) {
            for (int i = start ; i < end ; i++ ) {
                System.out.println(Thread.currentThread().getName()
                        + "的i值:" + i);
            }
        }
        else {
            // 如果當end與start之間的差大於THRESHOLD時,即要列印的數超過50個
            // 將大任務分解成兩個小任務。
            int middle = (start + end) / 2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            // 並行執行兩個“小任務”
            left.fork();
            right.fork();
        }
    }
}

部分輸出:

ForkJoinPool-1-worker-1的i值:262
ForkJoinPool-1-worker-1的i值:263
ForkJoinPool-1-worker-1的i值:264
ForkJoinPool-1-worker-1的i值:265
ForkJoinPool-1-worker-1的i值:266
ForkJoinPool-1-worker-2的i值:112
ForkJoinPool-1-worker-2的i值:113
ForkJoinPool-1-worker-2的i值:114
ForkJoinPool-1-worker-3的i值:37
ForkJoinPool-1-worker-1的i值:267
ForkJoinPool-1-worker-3的i值:42
ForkJoinPool-1-worker-2的i值:120
ForkJoinPool-1-worker-3的i值:43
ForkJoinPool-1-worker-0的i值:75
ForkJoinPool-1-worker-2的i值:121
ForkJoinPool-1-worker-0的i值:76
ForkJoinPool-1-worker-1的i值:268
ForkJoinPool-1-worker-3的i值:44
ForkJoinPool-1-worker-1的i值:269
ForkJoinPool-1-worker-0的i值:77
ForkJoinPool-1-worker-2的i值:122
ForkJoinPool-1-worker-0的i值:78
ForkJoinPool-1-worker-0的i值:79
ForkJoinPool-1-worker-0的i值:80
ForkJoinPool-1-worker-0的i值:81
ForkJoinPool-1-worker-0的i值:82
ForkJoinPool-1-worker-0的i值:83
ForkJoinPool-1-worker-0的i值:84
ForkJoinPool-1-worker-0的i值:85
ForkJoinPool-1-worker-0的i值:86
ForkJoinPool-1-worker-0的i值:87
ForkJoinPool-1-worker-0的i值:88
ForkJoinPool-1-worker-1的i值:270
ForkJoinPool-1-worker-3的i值:45
ForkJoinPool-1-worker-1的i值:271
ForkJoinPool-1-worker-0的i值:89
ForkJoinPool-1-worker-2的i值:123
ForkJoinPool-1-worker-0的i值:90
ForkJoinPool-1-worker-1的i值:272
ForkJoinPool-1-worker-3的i值:46
ForkJoinPool-1-worker-1的i值:273
ForkJoinPool-1-worker-0的i值:91
ForkJoinPool-1-worker-2的i值:124
ForkJoinPool-1-worker-0的i值:92