1. 程式人生 > >Java 執行緒工具類

Java 執行緒工具類

1. CountDownLatch

CountDownLatch首先定義任務次數,並呼叫await()方法等待任務完成。呼叫countDown()方法表明已經完成一項任務,當任務全部完成後,繼續await()方法後的任務。

public class CountDownLatchThread extends Thread {

    CountDownLatch latch;

    public CountDownLatchThread(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            System.out.println(currentThread().getName() + " start");
            Thread.sleep(1000);
            latch.countDown();
            System.out.println(currentThread().getName() + " end");
        } catch (InterruptedException e) {
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        for (int i = 0; i < 2; i++) {
            new CountDownLatchThread(latch).start();
        }

        System.out.println(Thread.currentThread().getName() + " start");
        latch.await();
        System.out.println(Thread.currentThread().getName() + " end");
    }

}

輸出

main start
Thread-0 start
Thread-1 start
Thread-0 end
Thread-1 end
main end

2. CyclicBarrier

CyclicBarrier會等待指定執行緒的await()方法呼叫結束。如果指定了三個執行緒CyclicBarrier(3),但只有2個執行緒呼叫了await()方法,這兩個執行緒將一直等待下去。

public class CyclicBarrierThread extends Thread {

    CyclicBarrier barrier;

    public CyclicBarrierThread(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    public void run() {
        try {
            System.out.println(currentThread().getName() + " start");
            Thread.sleep(1000);
            barrier.await();
            System.out.println(currentThread().getName() + " end");
        } catch (Exception e) {
        }
    }

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            new CyclicBarrierThread(barrier).start();
        }
    }

}

輸出

Thread-0 start
Thread-2 start
Thread-1 start
Thread-2 end
Thread-1 end
Thread-0 end

可以呼叫CyclicBarrier的另一個建構函式CyclicBarrier(int, Runnable),在await()方法後最先呼叫Runnable.run()方法。

public static void main(String[] args) {
    CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {			
        @Override
        public void run() {
            System.out.println("In Runnable");
        }
    });
    for (int i = 0; i < 3; i++) {
        new CyclicBarrierThread(barrier).start();
    }
}

輸出

Thread-1 start
Thread-0 start
Thread-2 start
In Runnable
Thread-1 end
Thread-0 end
Thread-2 end

3. ThreadPoolExecutor執行緒池

執行緒池處理流程

  • 核心執行緒池裡執行緒是否都在執行任務。如果不是,則建立一個新的工作執行緒來執行任務。
  • 執行緒池工作佇列是否已滿。如果工作佇列沒有滿,則把新提交任務儲存在這個這個工作佇列。
  • 執行緒池的執行緒是否都處於工作狀態。如果沒有,建立一個新的工作執行緒來執行任務。
  • 飽和策略處理任務。

ThreadPoolExecutor建構函式

public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler)
  • corePoolSize(執行緒池基本大小),建立新的執行緒來執行任務,直到執行緒池中數量大於基本大小。

  • workQueue(任務佇列),用來儲存等待執行的阻塞佇列。

    • ArrayBlockingQueue,基於陣列結構的阻塞佇列
    • LinkedBlockingQueue,基於連結串列結構的阻塞佇列
    • SynchronousQueue,不儲存元素的阻塞佇列,每個插入操作必須等待另一個移除操作。
    • PriorityBlockingQueue,一個擁有優先順序的阻塞佇列。
  • maximumPoolSize(執行緒池最大數量),執行緒池允許建立的最大數量,如果佇列已滿,並且建立的執行緒小於最大執行緒數,則執行緒池建立新的執行緒執行任務。

  • keepAliveTime(執行緒活動保持時間),執行緒池空閒時,保持存活的時間。

  • TimeUnit(執行緒活動保持時間的單位),可選DAYS(天)、HOURS(小時)、MINUTES(分鐘)、SECONDS(秒)等。

  • threadFactory,設定建立執行緒的工廠

      public interface ThreadFactory {
          Thread newThread(Runnable r);
      }
    
  • handler(飽和策略),當佇列和執行緒池都滿了,必須採取策略處理新提交任務

    • AbortPolicy,直接丟擲異常。
    • CallerRunsPolicy,在呼叫者所線上程執行任務。
    • DiscardOldestPolicy,丟棄佇列裡最近的一個任務。
    • DiscardPolicy,不處理,丟棄掉。

向執行緒池提交任務

  • execute(Runnable command),提交後不需要返回值,無法判斷任務是否執行成功。
  • submit(Runnable task),提交後需要返回值,並通過Futureget()方法獲取返回值。

4. Executors框架

Executors框架生成執行緒池

  • FixedThreadPool,建立固定執行緒數的執行緒池

      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
      }
    
      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory);
      }
    
  • SingleThreadExecutor,建立單個執行緒的執行緒池

      public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()));
      }
    
      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory));
      }
    
  • CachedThreadPool,大小無界的執行緒池,適用於執行很多的短期非同步任務。

      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
      }
    
      public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>(),
                                    threadFactory);
      }
    
  • ScheduledThreadPool,適合於多個後臺執行緒執行週期任務

      public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
          return new ScheduledThreadPoolExecutor(corePoolSize);
      }
    
      public static ScheduledExecutorService newScheduledThreadPool(
              int corePoolSize, ThreadFactory threadFactory) {
          return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
      }