1. 程式人生 > >多執行緒程式設計(1)

多執行緒程式設計(1)

1. 程序與執行緒

  通常,一個任務就是一個程序(Process),比如開啟一個瀏覽器就是啟動一個瀏覽器程序,開啟一個Word就啟動了一個Word程序。大多時候一個程序需要同時幹很多件事情,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)。即一個程式至少有一個程序,一個程序至少有一個執行緒。

  我們大學學作業系統的時候,都知道程序是資源分配的基本單位,執行緒是執行和排程的基本單位,執行緒本身不擁有資源,資源來自於它的程序。也就是說,程序在執行過程中有自己獨立的記憶體空間,與其他程序相互隔離,因此程序間的通訊需要另闢蹊徑,比如常見的有管道通訊、訊息佇列、訊號量以及套接字等方法,同時,一個程序中有三大塊——程序控制塊(PCB)、資料段、程式碼段,這會導致程序間的會產生很大的開銷。而執行緒與執行緒之間因為共享程序申請的記憶體區域,它們之間可以相互通訊,因為執行緒的粒度小,使得執行緒的切換速度比程序快很多,可以極大地提高程式的執行效率,如下,是執行緒和程序的關係(圖片摘自知乎)。

  執行緒分為兩種:使用者執行緒和守護執行緒。守護 執行緒,是指在程式執行的時候在後臺提供一種通用服務的執行緒,比如垃圾回收執行緒就是一個很稱職的守護者,並且這種執行緒並不屬於程式中不可或缺的部分。守護執行緒是用來服務使用者執行緒的,一旦使用者執行緒全部執行結束,程式會終止,守護執行緒也會隨之退出。

  在進入多執行緒之前,可以先看看執行緒的幾種狀態:

  • 1. 新建(NEW):新建立了一個執行緒物件。
  • 2. 就緒(RUNNABLE或READY) :執行緒正在參與競爭CPU的使用權。
  • 3. 執行(RUNNING):執行緒取到了CPU的使用權,正在執行。
  • 4. 阻塞(BLOCKED):阻塞狀態是執行緒因為某種原因放棄CPU使用權,暫時停止執行。直到滿足條件(比如超時等待、喚醒)時,該執行緒重新回到就緒態,參與競爭CPU使用權。
  • 5. 等待(WAITING):執行緒無限等待某個物件的鎖,或等待另一個執行緒結束的狀態。
  • 6. 計時等待(TIME_WAITING):執行緒在某一段時間內等待某個物件的“鎖”,或者主動休眠,亦或者等待一個執行緒結束,除非被中斷,時間一到,馬上回到就緒狀態,被中斷的方法則丟擲異常。
  • 7. 終止(Terminated):即執行緒終止(執行緒的的程式碼被執行完畢)和執行過程出現異常或者被外界強制中斷。

狀態的轉換的具體轉換如下圖所示:

2. Thread類和Runnable介面

  通過繼承Thread類,實run方法即可實現一個執行緒類,常用的API如下:

 方法描述
start() 從新建狀態轉化為就緒狀態,開始參與CPU使用權的競爭。
run() 直接呼叫該 Runnable 物件的 run 方法時直接取得CPU的使用權
interrupt() 中斷執行緒。在程式程式碼中搭配while (!Thread.interrupted()){..}使用。
isDaemon() 判斷當前執行緒是否是守護執行緒。
setDaemon(boolean true) 將當前執行緒設定為守護執行緒,必須在呼叫start()之後才有效。
setPriority(int priority) 更改執行緒的優先順序。
interrupt() 中斷執行緒。
isAlive() 測試執行緒是否處於活動狀態。
join(long millisec) 等待該執行緒終止的時間最長為 millis 毫秒。
Thread.yield() 暫停當前正在執行的執行緒物件(讓出當前執行緒的CPU,轉為就緒狀態),並執行其他執行緒。
Thread.currentThread() 返回對當前正在執行的執行緒物件的引用。
Thread.sleep(long millisec) 在指定的毫秒數內讓當前正在執行的執行緒休眠(暫停執行),此操作受到系統計時器和排程程式精度和準確性的影響。

  由於java中的類是單繼承的,而介面可以多繼承。一個類實現多個介面的情況,因為介面只有抽象方法,具體方法只能由實現介面的類實現,在呼叫的時候始終只會呼叫實現類的方法(不存在歧義),因此在開發中通常使用Runnable。

public class Thread1 implements Runnable {
    @Override
    public void run() {
        System.out.println("iii");
    }
​
    public static void main(String[] args) {
        Thread1 rt = new Thread1();
        Thread t = new Thread(rt);
        t.start();
    }
}

3. 執行緒池

  當執行緒的在某一時刻大量的建立與銷燬會消耗很多資源,我們可以提前建立好一些執行緒,將他們集中管理起來,形成一個執行緒池,需要使用的時候直接拿過來用,使用完後,放回執行緒池。

Executor框架

  在 java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化併發程式設計的操作,由Executors類的五個靜態工廠方法建立,其常用方法如下。

3.1 執行緒池的建立

  1. newFixedThreadPool:建立固定大小的執行緒池。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

    public class Executors {
        /*
        函式功能:建立一個固定長度的的執行緒池,用於儲存任務的阻塞佇列為無限制長度的LinkedBlockingQueue。 執行緒池中的執行緒將會一直存在除非執行緒池shutdown,即執行緒池中的執行緒沒有受到存活時間的限制。
        */
        public static ExecutorService newFixedThreadPool(int nThreads) {
           /* 引數一 核心執行緒數大小(最小執行緒數),當執行緒數 < 引數一 ,會建立執行緒執行 runnable
            * 引數二 最大執行緒數, 當執行緒數 >= 引數二,會把runnable放入workQueue(引數5)中
            * 引數三 保持存活時間,空閒執行緒能保持的最大時間。
            * 引數四 時間單位
            * 引數五 儲存任務的阻塞佇列
            *public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueu
            */
                return new ThreadPoolExecutor(nThreads1, nThreads2,
                                              0L, TimeUnit.MILLISECONDS,
                                              new LinkedBlockingQueue<Runnable>());
        }
        //...
    }
    ​
    ExecutorService es = Executors.newFixedThreadPool(20);
    //如果執行緒池中執行緒數過大或過小,都會影響效能
  2. newCachedThreadPool:建立一個可快取空閒執行緒60秒的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。

    public class Executors {    
        public static ExecutorService newCachedThreadPool() {
                return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                              60L, TimeUnit.SECONDS,
                                              new SynchronousQueue<Runnable>());
         }
        //...   
    }
    ​
    ExecutorService es = Executors.newCachedThreadPool();
    //缺點是在訪問量突然很大的時候,會建立大量執行緒
  3. 建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。

    ExecutorService es = Executors.newSingleThreadExecutor();
    //等同於 ExecutorService es = Executors.newFixedThreadPool(1);
  4. newScheduledThreadPool:建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

     public static void main(String[] args) {
            ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
            ses.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(new Date());
                }
            }, 1000/*第一個週期開始的時間*/, 2000/*每個週期間隔的時間*/, TimeUnit.MILLISECONDS);
        }
  5. newSingleThreadScheduledExecutor:建立一個單執行緒的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

3.2 執行緒池中執行緒的使用

  通過Executors類去獲得的執行緒池都實現了ExecutorService這個介面。可以呼叫execute()或者submit()方法把相應的任務提交到執行緒池中去。

 1. execute(Runnable): 這個方法接收一個Runnable例項,並且非同步的執行。

public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
//        Future future =
        es.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("run the thread.");
            }
        });
        System.out.println("over");
 }
/* output:
 * over
 * run the thread.
 */

2. submit(Runnable): submit(Runnable)execute(Runnable)區別是前者可以返回一個Future物件,通過返回的Future物件,我們可以檢查提交的任務是否執行完畢。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        Future future = es.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("run the thread.");
            }
        });
        future.get();  //future.get()方法會產生阻塞,直到上面的執行緒完成,即等待一秒鐘
        System.out.println("over");
}
/* output:
 * run the thread.
 * over
 */

3. submit(Callable): submit(Callable)submit(Runnable)類似,也會返回一個Future物件,但是引數Callable類中的call方法可以返回一個值,而Runable不行。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        Future future = es.submit(new Callable() {
            @Override
            public Object call() throws Exception {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "run the thread.";
            }
        });
        String rs = (String)future.get();  //future.get()方法會產生阻塞
        System.out.println("over and " + rs);
 }
/* output:
 * over and run the thread.
 */

4. invokeAny(Collection<? extends Callable<T>> tasks>): 方法輸入接受一個Callable集合型別的引數,啟動多個執行緒相互獨立的去執行對應執行緒的任務,一旦有一個執行緒執行完畢,則返回,同時其他執行緒終止。

public static void main(String[] args) throws ExecutionException, InterruptedException {
​
        ExecutorService es = Executors.newFixedThreadPool(3);
​
        Set<Callable<String>> callables = new HashSet<Callable<String>>();
​
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return " first task";
            }
        });
​
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return " second task";
            }
        });
​
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return " third task";
            }
        });
​
        String rs = es.invokeAny(callables);
        System.out.println(rs);
}
​
/* output
 * second task
 */

5. invokeAll(Collection<? extends Callable<T>> tasks>): 該方法則會並行的執行Callable集合型別的所有方法。

public static void main(String[] args) throws ExecutionException, InterruptedException {
​
        ExecutorService es = Executors.newFixedThreadPool(3);
​
        Set<Callable<String>> callables = new HashSet<Callable<String>>();
​
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return " first task";
            }
        });
​
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return " second task";
            }
        });
​
        callables.add(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return " third task";
            }
        });
​
        List<Future<String>> list= es.invokeAll(callables);
        for (Future<String> future:list) {
            String s = future.get();
            System.out.println(s);
        }
}

6. shotdown(): 呼叫該方法後,關閉執行緒池,已提交的方法會繼續執行,執行結束後,執行緒池全部關閉,該方法是一個非同步方法,一旦呼叫,立即返回。

7.shotdownNow(): 呼叫該方法後,關閉執行緒池,已提交的方法也會被取消,執行緒池立即全部關閉,該方法是一個非同步方法,一旦呼叫,立即返回。

8. awaitTermination(timeout,unit): 呼叫該方法阻塞當前執行緒,使得執行緒池中的執行緒執行完畢,最長等待時間為timeout,此方法需要在呼叫shotdown/shotdownNow後才有效。

public class ThreadSafe implements Runnable {
    private static int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            count++;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new ThreadSafe());
        }
        es.shutdown();  //不允許新增執行緒,非同步關閉連線池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待連線池的執行緒任務完成
        System.out.println(count);
    }
}
/* output
*  200
*/

參考文獻

  1. 龐永華. Java多執行緒與Socket:實戰微服務框架[M].電子工業出版社.2019.3

  2. Executors類中建立執行緒池的幾種方法的分析

 

&n