1. 程式人生 > >《java程式設計思想》第二十一章 併發

《java程式設計思想》第二十一章 併發

Thread.yield():對執行緒排程器(Java執行緒機制的一部分,可以將CPU從一個執行緒轉移給另一個執行緒)的一種建議,它是在說“我已經執行完生命期中最重要的部分了,此刻正是切換給其他任務執行一段時間的大好時機”,即對執行緒的一種讓步,暫停當前正在執行的執行緒,並執行其他執行緒。

 

>>>使用Executor<<<

Java.util.concurrent包中的執行器(Executor)將為你管理Thread物件,從而簡化了併發程式設計。

 

Executors:Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable類的工廠和實用方法。

 

Executors.newCachedThreadPool():建立一個可根據需要建立新執行緒的執行緒池,但是在以前構造的執行緒可用時將重用它們。對於執行很多短期非同步任務的程式而言,這些執行緒池通常可提高程式效能。呼叫 execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新執行緒並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。因此,長時間保持空閒的執行緒池不會使用任何資源。注意,可以使用 ThreadPoolExecutor構造方法建立具有類似屬性但細節不同(例如超時引數)的執行緒池。

 

public class LiftOff implements Runnable {

  protected int countDown = 10; // Default

  private static int taskCount = 0;

  private final int id = taskCount++;

  public LiftOff() {}

  public LiftOff(int countDown) {

    this.countDown = countDown;

  }

  public String status() {

    return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";

  }

  public void run() {

    while(countDown-- > 0) {

      System.out.print(status());

      Thread.yield();

    }

  }

}

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CachedThreadPool {

  public static void main(String[] args) {

    ExecutorService exec = Executors.newCachedThreadPool();

    for(int i = 0; i < 5; i++)

      exec.execute(new LiftOff());

    exec.shutdown();

  }

}

/*

#2(9), #0(9), #4(9), #1(9), #3(9), #2(8), #0(8), #4(8), #1(8), #3(8), #2(7), #0(7), #4(7), #1(7), #3(7), #2(6), #0(6), #4(6), #1(6), #3(6), #2(5), #0(5), #4(5), #1(5), #3(5), #2(4), #0(4), #4(4), #1(4), #3(4), #2(3), #0(3), #4(3), #1(3), #3(3), #2(2), #4(2), #1(2), #3(2), #2(1), #4(1), #1(1), #3(1), #2(Liftoff!), #4(Liftoff!), #1(Liftoff!), #3(Liftoff!), #0(2), #0(1), #0(Liftoff!),

*/

 

 

Executors. newFixedThreadPool (int nThreads):可一次性預先執行代價高昂的執行緒分配,因而也就可以限制執行緒數量。不用為每個任務都固定地付出建立執行緒的開銷。

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class FixedThreadPool {

  public static void main(String[] args) {

    // Constructor argument is number of threads:

    ExecutorService exec = Executors.newFixedThreadPool(5);

    for(int i = 0; i < 5; i++)

      exec.execute(new LiftOff());

    exec.shutdown();

  }

}

/*

#3(9), #4(9), #0(9), #2(9), #1(9), #3(8), #4(8), #0(8), #2(8), #1(8), #3(7), #4(7), #0(7), #2(7), #3(6), #4(6), #0(6), #2(6), #3(5), #4(5), #0(5), #2(5), #3(4), #4(4), #0(4), #2(4), #3(3), #4(3), #0(3), #2(3), #3(2), #4(2), #0(2), #2(2), #3(1), #4(1), #0(1), #2(1), #3(Liftoff!), #4(Liftoff!), #0(Liftoff!), #2(Liftoff!), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!),

*/

 

Executors. newSingleThreadExecutor (int nThreads):確保任意時刻在任何都只有唯一的任務在執行,你不需要在共享資源上處理同步,可以讓你省去只是為了維持某些事物的原型而進行的各種協調努力。

public class SingleThreadExecutor {

  public static void main(String[] args) {

    ExecutorService exec =

      Executors.newSingleThreadExecutor();

    for(int i = 0; i < 5; i++)

      exec.execute(new LiftOff());

    exec.shutdown();

  }

}

/*

#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),

*/

 

 

 

 

Thread.sleep(100);等同於TimeUnit.MILLISECONDS.sleep(100);

 

Daemom執行緒:後面執行緒不屬於程式中不可缺少的部分,因此,當所有非後臺執行緒結束時,程式也就終止了,同時會殺死程序中的所有後臺執行緒。返過來說,只要有任何非後臺執行緒還在執行,程式就不會終止。

 

必須線上程啟動之前呼叫setDaemon()方法,才能把它設定為後臺執行緒。

 

如果一個執行緒是後臺執行緒,那麼它建立的任何執行緒將被自動設定成後臺執行緒。

 

後臺執行緒在不執行finally子句的情況下就會終止其run()方法,即後臺執行緒的finally子句不一定執行。

 

在構造器中啟動執行緒可能會有問題,因為執行緒可能會在構造器結束之前開始執行,這意味著該執行緒能夠訪問處於不穩定狀態的物件。

 

異常不能跨執行緒傳播給main(),所以你必須在了本地處理所有線上程內部產生的異常。

public class ExceptionThread implements Runnable {

  public void run() {

    throw new RuntimeException();

  }

  public static void main(String[] args) {

        try {

          ExecutorService exec =

            Executors.newCachedThreadPool();

          exec.execute(new ExceptionThread());

        } catch(RuntimeException ue) {

          // 這句將不會被執行,因為執行緒的異常是不會傳遞到呼叫它的執行緒的

          System.out.println("Exception has been handled!");

        }

      }

}

 

Thread.UncaughtExceptionHandler是Java SE5中的新介面,它允許你在每個Thread物件上都附著一個異常處理器。Thread.UncaughtExceptionHandler.uncaughtException()會在因未捕獲的異常而臨近死亡時被呼叫。為了使用它,我們建立了一個新型別的ThreadFactory,它將在每個新建立的Thread物件上附著一個Thread.UncaughtExceptionHandler,並將這個工廠傳遞給Exceutors建立新的ExcecutorService方法:

// 執行緒

class ExceptionThread2 implements Runnable {

  public void run() {

    Thread t = Thread.currentThread();

    System.out.println("run() by " + t);

    System.out.println("1.eh = " + t.getUncaughtExceptionHandler());

    throw new RuntimeException();//執行緒執行時一定會丟擲執行異常

  }

}

 

// 執行緒異常處理器

class MyUncaughtExceptionHandler implements

Thread.UncaughtExceptionHandler {

  // 異常處理方法

  public void uncaughtException(Thread t, Throwable e) {

    System.out.println("caught " + e);

  }

}

 

// 執行緒工廠,建立執行緒時會呼叫該工廠

class HandlerThreadFactory implements ThreadFactory {

  public Thread newThread(Runnable r) {//執行緒建立工廠方法

    System.out.println(this + " creating new Thread");

    Thread t = new Thread(r);

    System.out.println("created " + t);

    //設定異常處理器

    t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());

    System.out.println("2.eh = " + t.getUncaughtExceptionHandler());

    return t;

  }

}

 

public class CaptureUncaughtException {

  public static void main(String[] args) {

    ExecutorService exec = Executors.newCachedThreadPool(

      new HandlerThreadFactory());

    exec.execute(new ExceptionThread2());

  }

} /*

[email protected] creating new Thread

created Thread[Thread-0,5,main]

2.eh = [email protected]

run() by Thread[Thread-0,5,main]

1.eh = [email protected]

caught java.lang.RuntimeException

*/

 

 

如果你知道將要在程式碼中處處使用相同的異常處理器,那麼更簡單的方式是在Thread類中設定一個靜態域,並將這個處理器設定為預設的未捕獲異常處理器:Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());

 

>>>使用Lock物件<<<

Lock物件必須被顯式地建立、鎖定和釋放。因此,它與內建的鎖形式相比,程式碼缺乏優雅性,但對於解決某些型別的問題來說,它更加靈活。

  private Lock lock = new ReentrantLock();

  public int next() {

    lock.lock();

    try {

      //…

    } finally {

      lock.unlock();

    }

  }

使用lock()和unlock()方法在next()內部建立了臨界資源。還可以嘗試獲取鎖:

  private ReentrantLock lock = new ReentrantLock();

  public void untimed() {

    boolean captured = lock.tryLock();

    try {

      //…

    } finally {

      if(captured)

        lock.unlock();

    }

  }

 

>>>使用volatile物件<<<

原子操作是不能被執行緒除錯機制中斷的操作,一旦開始操作,那麼它一定會在切換到其他執行緒前執行完畢。

 

原子操作可以應用於除long和double之外的所有基本型別之上的“簡單操作”,對於讀取和寫入除long和double之外的基本型別變數這種的操作,可以保證它們會被當作原子操作來操作記憶體。但是JVM可以將64位(long和double變數)的讀取和寫入當作兩個分離的32位操作來執行,這就可能會產生了在一個讀取和寫入操作中間切換執行緒,從而導致不同的執行緒看到不正確結果的可能性。但是,當你定義long或double變數時,如果使用volatile關鍵字,就會獲得(簡單的賦值與返回操作)原子性,注:在Java SE5之前,volatile一直未能正確的工作。

 

volatile關鍵字還確保了應用中的可視性,如果你將一個域宣告為volatile的,那麼只要對這個域產生了寫操作,那麼所有的讀操作就都可以看到這個修改,即便使用了本地快取,情況確實如此,volatile域會立即被寫入到主存中。

 

在非volatile域上的操作沒有重新整理到主存中去,因此其他讀取該域的執行緒將不能必看到這個新值。因此,如果多個執行緒同時訪問了某個域,那麼這個域就應該是volatile的,否則,這個域應該只能由同步來訪問,同步也會導致向主存中重新整理,因此如果一個域完全由synchronized方法或語句塊來保護,那就不必將其設定為volatile了。

 

什麼才屬於原子操作呢?對域中的值做賦值和返回操作都是原子性的。但i++; i+=2; 這樣的操作肯定不是原子性的,即執行緒有可能從語句的中間切換。下面來證明i++在java裡不是原子性操作的:

class SerialNumberGenerator {

       private static volatile int serialNumber = 0;

 

       public static /* synchronized */int nextSerialNumber() {

              // 不是執行緒安全,因為i++在Java裡不是原子操作,

              // 即使將serialNumber設定成了volatile

              return serialNumber++;

       }

}

 

class CircularSet {

       private int[] array;

       private int len;

       private int index = 0;

 

       public CircularSet(int size) {

              array = new int[size];

              len = size;

              // 初始化為-1

              for (int i = 0; i < size; i++) {

                     array[i] = -1;

              }

       }

 

       public synchronized void add(int i) {

              array[index] = i;

              // 如果陣列滿後從頭開始填充,好比迴圈陣列:

              index = ++index % len;

       }

 

       public synchronized boolean contains(int val) {

              for (int i = 0; i < len; i++) {

                     if (array[i] == val) {

                            return true;

                     }

              }

              return false;

       }

}

 

public class SerialNumberChecker {

       private static final int SIZE = 10;

       private static CircularSet serials = new CircularSet(1000);

       private static ExecutorService exec = Executors.newCachedThreadPool();

 

       static class SerialChecker implements Runnable {

              public void run() {

                     while (true) {

                            int serial = SerialNumberGenerator.nextSerialNumber();

                            if (serials.contains(serial)) {// 如果陣列中存在則退出

                                   System.out.println("Duplicate: " + serial);

                                   System.exit(0);

                            }

                            serials.add(serial);// 如果不存在,則放入

                     }

              }

       }

 

       public static void main(String[] args) throws Exception {

              SerialChecker sc = new SerialChecker();

              // 啟動10執行緒

              for (int i = 0; i < SIZE; i++) {

                     exec.execute(sc);

              }

       }

}

 

 

 

 

public class Increament extends Thread {

       public static volatile int x = 0;

 

       public void run() {

//            synchronized (Increament.class) {

                     // x++與 x = x + 1都不是原子操作

                     x++;

                     // x = x + 1;

//            }

       }

 

       public static void main(String[] args) throws Exception {

             

              Thread threads[] = new Thread[10000];

              for (int i = 0; i < threads.length; i++) {

                     threads[i] = new Increament();

              }

 

              for (int i = 0; i < threads.length; i++) {

                     threads[i].start();

              }

              for (int i = 0; i < threads.length; i++) {

                     // 等待計算執行緒執行完

                     threads[i].join();

              }

              System.out.println("n=" + Increament.x);

       }

}

如果對x的操作是原子級別的,最後輸出的結果應該為x=10000,而在執行上面積程式碼時,很多時侯輸出的x都小於10000,這說明x++ 不是原子級別的操作。原因是宣告為volatile的簡單變數如果當前值由該變數以前的值相關,那麼volatile關鍵字不起作用。

 

 

同一時刻只有一個執行緒能訪問synchronized塊,synchronized塊並不是一下子要執行完畢,CPU除錯可能從synchronized塊中的某個語句切換到其它的執行緒,再其它執行緒執行完畢後再繼續執行該同步塊。切換到其他執行緒時是否釋放synchronized塊上的鎖,這要看切換所採用的方式:如果是CPU自動或呼叫Thread.yeild切換,則不會釋放;如果是呼叫wait,則會釋放;如果是呼叫的Thread.sleep,則不會;如果是呼叫的thread.join,則要看synchronized塊上的鎖是否是thread執行緒物件,如果不是,則不會釋放,如果是,則會釋放。

 

只能在同步控制方法或同步控制塊裡呼叫wait()、notify()和notifyAll(),並且釋放操作鎖,但sleep()可以在非同步控制方法裡呼叫,不會釋放鎖。

 

sleep、yield都是Thread的靜態方法,join屬於Thread的非靜態方式,如果將它們放入在同步塊中呼叫時都不會釋放鎖。但wait屬於Object類的方法,在wait()期間物件鎖是釋放的。

 

在執行同步程式碼塊的過程中,遇到異常而導致執行緒終止,鎖會釋放。

 

執行執行緒的suspend()方法會導致執行緒被暫停,並使用resume()可喚醒,但不會釋放鎖。

 

當執行緒在執行中執行了Thread類的yield()靜態方法,如果此時具有相同優先順序的其他執行緒處於就緒狀態,那麼yield()方法將把當前執行的執行緒放到可執行池中並使用中另一執行緒執行。如果沒有相同優先順序的可執行程序,則該方法什麼也不做。

 

sleep方法與yield方法都是Thread類的靜態方法,都會使當前處於執行的執行緒放棄CPU,把執行機會讓給另的執行緒。兩都的區別:

1.         sleep方法會給其他執行緒執行的機會以,而不考慮其他執行緒的優先順序,因此會給較低優先順序執行緒一個執行的機會;yield方法只會給相同或更高優先順序的執行緒一個執行的機會。

2.         當執行緒執行了sleep方法後,將轉到阻塞狀態。當執行緒執行了yield方法後,將轉入就緒狀態。

3.         Sleep方法比yield方法具有更好的可移植性。不能依靠yield方法來提高程式的併發效能。對於大多數程式設計師來說,yield方法的唯一用途是在測試期間人為地提高程式的併發效能,以幫助發現一些隱藏的錯誤。

 

thread.join():當前執行緒呼叫另一執行緒thread.join()時,則當前執行的執行緒將轉到阻塞狀態,並且等待thread執行緒執行結束後,當前執行緒程才會恢復執行(從阻塞狀態到就緒狀態)。比如有3個執行緒在執行計算任務,必須等三個執行緒都執行完才能彙總,那麼這時候在主執行緒裡面讓三個執行緒join,最後計算結果既可:

public class JoinTest {

       public static void main(String[] args) {

              Rt[] ct = new Rt[3];

              for (int i = 0; i < ct.length; i++) {

                     ct[i] = new Rt();

                     ct[i].start();

                     try {

                            //主線等待三個執行緒終止後再繼續執行

                            ct[i].join();

                     } catch (InterruptedException e) {

                            e.printStackTrace();

                     }

              }

              int total = 0;

              for (int j = 0; j < ct.length; j++) {

                     total += ct[j].getResult();

              }

              System.out.println("total = " + total);

       }

}

 

class Rt extends Thread {

       private int result;

       public int getResult() {

              return result;

       }

       public void run() {

              try {

                     Thread.sleep(1000);

                     result = (int) (Math.random() * 100);

                     System.out.println(this.getName() + " result=" + result);

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

 

       }

}

 

join()只能由執行緒例項呼叫,如果thread.join()在同步塊中呼叫,並且同步鎖物件也是thread物件,由於thread.join()是呼叫thread.wait()來實現的,wait會釋放thread物件鎖,則thread.join()與在同步塊的鎖也會一併釋放;如果thread.join()在同步塊的鎖物件不是thread物件,則thread執行緒阻塞時不會釋放鎖:

public class JoinTest {

       public static void main(String[] args) throws InterruptedException {

              JThread t = new JThread();

              start(t, t);

              System.out.println("--------");

              t = new JThread();

              start(t, JThread.class);

       }

 

       static void start(JThread t, Object lock) {

              t.setLock(lock);

              t.start();

              try {

                     Thread.sleep(100);

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

              //如果鎖物件是JThread.class時,則主執行緒會一直阻塞

              t.f();

       }

}

 

class JThread extends Thread {

       private Object lock;

 

       void setLock(Object lock) {

              this.lock = lock;

       }

 

       public void run() {

              synchronized (lock) {

                     try {

                            System.out.println(Thread.currentThread().getName() + " - join before");

                            /*

                             * 當前執行緒阻塞,又要等待自己執行完,這是矛盾的,所以其實該執行緒永遠不會恢復執

                             * 行,除非使用 join(long millis)方式。實際上我們看this.join()原始碼就會

                             * 看出,this.join()就是呼叫了this.wait()方法,因為了this.wait()會釋放

                             * this物件上的鎖,所以當lock物件是自身時,主執行緒不會被鎖住,所以第一個執行緒

                             * 會列印 "main - f()"。第二個執行緒的鎖物件是JThread的Class物件,由於join

                             * 時不會釋放JThread.class物件上的鎖, 第二個執行緒會一直阻塞,所以第二個執行緒

                             * 不會列印 "main - f()",

                             *

                             */

                            this.join();

                            /*

                             * 這樣可以正常結束整個程式,因為this執行緒一直會阻塞直到對方(也是this的執行緒)執行完

                             * 或者是對方沒有執行完等 1 毫秒後thsi執行緒繼續執行,所以以這樣的方式一定不會出現死鎖

                             * 現象

                             */

                            //this.join(1);

                            System.out.println(Thread.currentThread().getName() + " - join after");

                     } catch (InterruptedException e) {

                            e.printStackTrace();

                     }

              }

       }

 

       public void f() {

              synchronized (lock) {

                     System.out.println(Thread.currentThread().getName() + " - f()");

              }

       }

}

 

sleep與join都會使當前執行緒處於阻塞狀態,而yield則是進行就緒狀態。

 

同步的靜態方法的鎖物件是方法所屬類的Class物件,而同步的非靜態方法的鎖物件是所呼叫方法例項所對應的this物件。

 

繼承Runnable與Thread的區別:Thread類本身也實現了Runnable介面。 因此除了構造Runnable物件並把它作為建構函式的引數傳遞給Thread類之外,你也可以生成Thread類的一個子類,通過覆蓋這個run方法來執行相應的操作。不過,通常最好的策略是把Runnable介面當作一個單獨的類來實現,並把它作為引數傳遞給個Thread的建構函式。通過將程式碼隔離在單獨的類中可以使你不必擔心Runnable類中使用的同步方法和同步塊與在相應執行緒類中所使用的其他任何方法之間的潛在操行所帶來的影響。更一般地說,這種分離允許獨立控制相關的操作和執行這些操作的上下文,同一個Runnable物件既可以傳遞給多個使用不同方式初擡化的Thread物件,也可以傳遞給其他的輕量級執行者(executor)。同樣需要注意的是,繼承了Thread類的劉象不能再同時繼承其他類了。

 

如果執行緒被啟動並且沒有終止,呼叫方法isAlive將返回true。如果執行緒僅僅是因為某個原因阻塞,該方法也會返回true。

 

通過呼叫執行緒t的join方法將呼叫者掛起,直到目標執行緒t結束執行:t.join方法會在當t.isAlive方法的結果為false時返回。

 

有一些Thread類的方法只能應用於當前正在執行的那個執行緒中(也就是,呼叫Thread靜態方法的執行緒),為了強制實施,這些方法都被宣告為static:Thread.currentThread、Thread.interrupted、Thread.sleep、Thread.yield。

 

Thread.yield:僅僅是一個建議——放棄當前執行緒去執行其他的執行緒,JVM可以使用自己的方式理解這個建議。儘管缺少保證,但yield方法仍舊可以在一些單CPU的JVM實現上起到相應的效果,只要這些實現不使用分時搶佔式的呼叫機制,在這種機制下,只有當一個執行緒阻塞時,CPU才會切換到其他執行緒上執行。如果在系統中執行緒執行了耗時的非阻塞計算任務的會佔有更多的CPU時間,因而降低了應用程式的響應,為了安全起見,當執行非阻塞的計算任務的方法時,則可以在執行過程中插入yield方法(甚至是sleep方法)。為了減少不必要的影響,可以只在偶爾的情況下呼叫yield方法,比如一個包含如下語句的迴圈:

if(Math.random() < 0.01) Thread.yield();

使用搶佔式排程機制的JVM實現,特別是在多處理器的情況下,yield才可能顯得沒有什麼意義。