1. 程式人生 > >java並發學習--線程池(一)

java並發學習--線程池(一)

第一個 down holding scheduled class 缺點 有關 maximum 設計

關於java中的線程池,我一開始覺得就是為了避免頻繁的創建和銷毀線程吧,先創建一定量的線程,然後再進行復用。但是要具體說一下如何做到的,自己又說不出一個一二三來了,這大概就是自己的學習習慣流於表面,不經常深入的結果吧。所以這裏決定系統的學習一下線程池的相關知識。

自己稍微總結了一下,學習一些新的知識或者技術的時候,大概都可以分為這麽幾個點:

1、為什麽會有這項技術,用原來的方法有什麽問題。

2、這項新技術具體是怎麽解決這個問題的(這時可能就要涉及到一些具體的知識點和編碼了)

3、是不是使用這項技術問題就可以得到完美解決了,有沒有什麽不同的方案?各自的優缺點是什麽?(這是對一些具體的技術來說的,但是線程池是一個比較大的概念,可能不涉及這一點,但相應的線程池中有許多不同的種類,來應對不同的場景)

下面的內容是自己讀過《實戰java 高並發程序設計》之後加上自己的理解寫的筆記,如果有錯漏之處,請大家在評論區指出。

為什麽要使用線程池?


1、正如前面的所說,頻繁的創建和銷毀時間消耗了太多的資源,占用了太多的時間

2、線程也是需要占用一定內存的,同時存在很多個線程的話,內存很快就溢出了,即使沒有溢出,大量的線程回收也會對GC造成很大的壓力,延長GC的停頓時間

這裏可以舉個例子來說明一下,比如你去銀行辦理業務時,首先得拿號排隊吧,然後叫你去哪個窗口你就得去哪個窗口,在我看來,這就是一個很典型的線程池的例子。

我們可以想象一下,如果不按這種模式,會是什麽樣子……

你來到了銀行的業務大廳,業務經理問你要辦理什麽業務,你說我想開個賬戶,於是經理拿起手機打通了職工大樓的電話,“讓負責開賬戶的的那個小組派個人過來”(new 了一個開賬戶的對象),業務員馬不停蹄的趕了過來然後幫你處理完了任務,只聽經理又說到“這裏沒你事兒了,回去吧”,於是你又回到了職工大樓。

然後又來了一個客戶……

於是你把上述的過程又執行了一遍,那麽業務員在路上的時間可能比處理業務的時間還要長了。

更糟的是,如果有200個線程同時存在,並且每一個客戶的業務處理時間都非常的長,那麽業務大廳就可能同時存在200個客戶和業務員了,大廳擠得都快趕上春運了。

ps : 上面這個小例子舉得並不是很好,所以大家不要跟實際的知識點對號入座。比如說這裏有10個業務員,那麽10個業務員實際上是不能同時進行服務的,因為你的電腦沒有10個cpu,只能是cpu不斷的在線程之間進行切換,只要它換的夠快,就可以給每一個客戶一種他一直都在為我服務的感覺。

認識線程池


線程池的輪子我們已經不用自己造了,在jdk5版本之後,引入了Executor框架,用於管理線程。

Executor 框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等

先放一張Executor框架的部分類圖(下面這個類圖就是用idea自帶的工具做的,非常方便,有時間再寫一下它的用法):

技術分享圖片

其中虛線箭頭指的是實現,實線箭頭指的是繼承

而本文中我們需要了解的就是這個ThreadPoolExecutor 和 下面這個Executors了。

ThreadPoolExecutor:

從網上找了一個小例子,就是給一個集合添加2000個元素,我們分成兩個測試,一個測試是添加一次元素就創建一個線程,另外一個測試是先創建好線程池,然後再添加。

不使用線程池版本:

//每一次添加操作都開一個線程
    public static void getTimeWithThread() {
        System.out.println("使用多線程測試 start");
        final List<Integer> list = new LinkedList<>();
        Random random = new Random();
        long start = System.currentTimeMillis();
Runnable target
= new Runnable() { @Override public void run() { list.add(random.nextInt()); } }; for (int i = 0; i < 20000 ; i++) { Thread thread = new Thread(target); thread.start(); try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } long end = System.currentTimeMillis(); long time = end - start; System.out.println("最終list的大小為:" + list.size()); System.out.println("使用多線程測試 end, 用時:" + time + "ms\n"); }

例子比較簡單,我們需要創建線程,這裏用的是實現Runnable接口的方式,然後為了保證子線程執行完成之後主線程(main線程)才執行,我們這裏使用了join方法。

那麽整個for循環的意思就是,我開啟一個線程,然後你的main線程得等我執行完之後才能開啟下一個線程繼續執行。

使用線程池版本:

    //使用線程池進行集合添加元素的操作
    public static void getTimeWithThreadPool() {
        System.out.println("使用線程池測試 start");
        final List<Integer> list = new LinkedList<>();
        Random random = new Random();
        long start = System.currentTimeMillis();
Runnable target
= new Runnable() { @Override public void run() { list.add(random.nextInt()); } }; ThreadPoolExecutor tp = new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000)); for (int i = 0; i < 20000 ; i++) { tp.execute(target); } tp.shutdown(); try { tp.awaitTermination(1,TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); long time = end - start; System.out.println("最終list的大小為:" + list.size()); System.out.println("使用線程池測試 end, 用時:" + time + "ms\n"); }

使用線程池就比較簡單了,我們只要先創建好線程池,然後向它提交任務就行了,具體的線程該怎麽操作,怎麽管理都不用我們來操心。

execute() : 它是頂層接口Executor的一個方法(也是唯一一個),其實跟普通的創建線程執行run方法沒有太大的區別

shutdown(): 顧名思義是關閉線程池,它會將已經提交但還沒有執行的任務執行完成之後再關閉線程池(什麽是提交?我們後面再說)

至於ThreadPoolExecutor裏面那一大堆參數,我們慢慢再來看。

最後的測試結果:

不用線程池的話,我的機器跑出來大概要8000ms左右(人家網上的例子測出來只要2000ms左右,這差距,是我電腦垃圾,還是jvm沒設置好啊,之後再來看這個問題),使用線程池的話是180ms左右。

可以看出來線程池相對於單純的使用線程來說的話作用是相當大的。

ps:這裏自己另外測試了一組,不使用線程直接添加,發現時間會快很多,這個問題其實還不是非常明白,多個線程一起執行難道不是執行的更快嗎?暫時還沒有得出結論,等更進一步的理解之後再寫一篇文章來進行分析。

ThreadPoolExecutor裏面那些參數都是幹嘛用的?

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize : 指定了線程池的線程數量

maximumPoolSize : 指定了線程池中的最大線程數量

keepAliveTime : 超過了corePoolSize時多余線程的存活時間

unit : KeepAliveTime的時間單位

workQueue : 任務隊列,被提交但尚未被執行的任務

threadFactory: 線程工廠,用於創建線程

handler : 拒絕策略,當任務太多來不及處理的時候,如何拒絕任務

corePoolSize 和 maximumPoolSize(這裏假設corePoolSize是5,maximumPoolSize是10 )

線程池的工作原理是你來一個線程,我就幫你在線程池中新創建一個線程,創建了5個線程之後,再來一個線程,我就不是在第一時間去創建一個新的線程,而是把它加入到一個等待隊列中去,等線程池中有了空余的線程再從隊列中拿一個出來進行 處理,等待隊列的容量是我們一開始設置好的,如果等待隊列也滿了的話再去創建新的線程。

當線程池也滿了,等待隊列也滿了(線程池數量達到了maximumPoolSize)的時候就拒絕執行線程的任務,這就涉及到了拒絕的策略。

而經過一段時間之後發現,業務沒有那麽繁忙了,就不需要一直維持著10個線程,可以清除掉一部分,以免占據多余的空間。

keepAliveTime 和 unit

有了上面的結束,這個參數就比較好理解了,上面說線程滿了再經過一段時間之後就會被清除掉一部分線程,這個經過的時間就是有keepAliveTime 和 unit決定的

比如 keepAliveTime = 1 ,unit = TimeUnit.Days ,那麽就是經過一天之後再去清理線程池。

workQueue :

我們前面也提到了當線程的數量超過了coreSize之後會添加到一個等待隊列中去,這個隊列就是workQueue。workQueue 采用的是一個實現了BlockingQueue的接口的對象

work也分為不同的幾種,采取不同的策略

  • ArrayBlockingQueue(有界的任務隊列) :
pubic ArrayBlockingQueue( int capacity )

首先是最容易想到的,就是給等待隊列設置一個容量,超過這個容量之後再創建新的線程。

  • SynchronousQueue該隊列沒有容量,每插入一個元素都要等待一個刪除操作,使用這個隊列的話,任務不會實際保存到隊列中去,會直接提交到線程池中,如果線程池還沒有滿(還沒達到maximumPoolSize),則分配線程,否則執行拒絕策略。

  • LinkedBlockingQueue(無界的任務隊列) : 顧名思義,這個隊列是沒有界限的,就是說你可以一直往隊列裏添加元素,直到內存資源被耗盡。

  • PriorityBlockingQueue(優先任務隊列,同時也是一個無界的任務隊列):可以控制任務的優先級(優先級是通過實現Comparable接口實現的,具體可百度)

handler(拒絕策略):

當線程池和等待隊列都滿了之後,線程池就會拒絕執行新的任務了,那麽該怎麽拒絕呢,直接就說你走吧,哥們兒hold不住了嗎?顯然沒這麽簡單。。

AbortPlicy 策略 : 直接拋出異常,阻止系統正常工作

CallerRunsPolicy策略 : 只要線程池沒有關閉,就在調用者線程之中執行這個任務。比如說是主線程提交的這個任務,那我就直接在主線程之中執行這個任務。

DiscardOledestPolicy策略:該策略會丟掉最老的一個請求,也就是即將被執行的那個請求。並嘗試再次發起請求。

DiscardPolicy策略:直接丟,不做任何處理

以上策略都是通過實現RejectedExecutionHandler接口實現的,如果上述策略還無法滿足你的話,那麽你也可以自己實現這個接口。

Executors:


介紹了基本的線程池之後就可以介紹一些jdk為我們寫好的一些線程池了。

它由Executors類生成的。有以下幾種:

  • newFIxedThreadPool :固定大小線程池,大小固定,所以它不存在corePoolSize 和 maximumPoolSize ,並且使用無界隊列作為等待隊列

  • newSingleThreadExecutor : 與newFixedThreadPool基本沒有什麽區別,但是數量只有1個線程

  • newCachedThreadPool :一個corePoolSize,maximumPoolSize無限大的線程池,也就是說沒有任務時,線程池中就沒有線程,任務被提交時會看線程池有有沒有空閑的線程,如果有的話,就交給它執行,如果沒有的話,就交給SynchronousQueue, 也就是說會直接交給線程池,而由於maximumPoolSize是無限大的,所以它會再添加一個線程

  • newScheduledThreadPool :定時執行任務的線程池(可以是延時執行,也可以是周期性的執行任務)

  • newSingleThreadScheduledExecutor :與上面的線程池差不多,只不過線程池的大小為1。

以newFixedThreadPool為例展示一下它的使用方法。

package thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 展示Executors的簡單用法
 */
public class Lesson15_ThreadPool02 {
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() +
                        ": Thread Id : " + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService ex = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10 ; i++) {
            ex.submit(task);
        }
        ex.shutdown();
    }
}
1541580732981: Thread Id : 13
1541580732981: Thread Id : 14
1541580732981: Thread Id : 11
1541580732981: Thread Id : 12
1541580732981: Thread Id : 15
1541580733981: Thread Id : 13
1541580733981: Thread Id : 15
1541580733981: Thread Id : 12
1541580733981: Thread Id : 11
1541580733981: Thread Id : 14

關於定時線程池的兩個方法的區別:

  • scheduleAtFixedRate以給定的的周期執行任務,任務開始於給定的初始延時,經過period之後開始下一個任務

舉個例子,比如說,初始延時是1秒,period是5秒,任務實際的執行時間是2秒,那麽第一個任務開始執行的時間是1秒,再第二個任務執行的時間是6秒,你看跟任務的實際執行時間並沒有什麽關系。

但是這裏會有一個顯而易見的問題,按照上面的說法,如果我的任務執行時間是10秒怎麽辦,遠比period要大,那麽此時會等待上一個任務執行完成之後立即執行下一個任務,

你也可以理解成period變成了8秒

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

  • scheduleWithFixedDelay這個方法則規定了上一個任務結束到下一個任務開始這之間的時間,還是上面那個例子,只不過將period改成delay還是5秒,那麽第一個任務在第1秒開始執行,第2個任務在(1 + 2 + 5) = 8 時開始執行,也就是第一個任務執行完成之後再等5秒開始執行下一個任務。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

以schedeleAtFixedRate為例,簡單寫一下代碼的用法:

package thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 這裏演示定時線程池的功能
 */
public class Lesson15_ThreadPool03 {
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()/1000 +
                        ": Thread Id : " + Thread.currentThread().getId());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        System.out.println(System.currentTimeMillis()/1000);
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);

        ses.scheduleAtFixedRate(task,1,3,TimeUnit.SECONDS);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ses.shutdown();

    }

}

1541584928
1541584929: Thread Id : 11
1541584932: Thread Id : 11
1541584935: Thread Id : 12

Process finished with exit code 0

從28開始執行定時線程池的任務,1秒鐘(初始延時)之後開始執行第一個任務,之後每過三秒鐘執行下一個任務

這裏如果不關閉線程池的話,任務會一直執行下去。

線程池的分析暫時先到這裏,還有一部分內容,例如擴展線程池,如何決定線程池的線程數量,fork/join框架等。等認真讀過下一部分之後再繼續把線程池部分的筆記湊齊。

package thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 這裏演示定時線程池的功能
*/
public class Lesson15_ThreadPool03 {
public static void main(String[] args) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000 +
": Thread Id : " + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
System.out.println(System.currentTimeMillis()/1000);
ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);

ses.scheduleAtFixedRate(task,1,5,TimeUnit.SECONDS);

}

}

java並發學習--線程池(一)