1. 程式人生 > >漫談並發編程(五):線程之間的協作

漫談並發編程(五):線程之間的協作

版本 處理 clas ext edr locking throws try-catch []

編寫多線程程序須要進行線程協作。前面介紹的利用相互排斥來防止線程競速是來解決線程協作的衍生危害的。編寫線程協作程序的關鍵是解決線程之間的協調問題,在這些任務中,某些能夠並行運行,可是某些步驟須要全部的任務都結束之後才幹開動。

wait()與notifyAll()

wait()使你能夠等待某個條件發生變化,wait()會在等待外部世界產生變化的時候將任務掛起,而且僅僅有在notify()或notifyAll()發生時,即表示發生了某些感興趣的事物,這個任務才會被喚醒並去檢查所產生的變化。

調用sleep()的時候鎖並沒有被釋放,調用yield()也屬於這樣的情況。理解這一點非常關鍵。還有一方面。當一個任務在方法裏遇到了對wait()的調用的時候。線程的運行被掛起,對象上的鎖被釋放。

因此wait()釋放鎖。這就意味著還有一個任務能夠獲得這個鎖。因此在該對象中的其它synchronized方法能夠在wait()期間被調用,而其它的方法通常將會產生改變,而這樣的改變正是使被掛起的任務又一次喚醒所感興趣的變化。

有兩種形式的wait()。第一種版本號接受毫秒數為參數,含義與sleep()方法裏的參數的意思同樣,都是指"在此期間暫停"。可是與sleep()不同的是,對於wait()而言:
  1. 在wait()期間對象鎖是釋放的
  2. 能夠通過notify()、notifyAll(),或者令時間到期。從wait()中恢復運行。
另外一種,也是更常見形式的wait()不接收不論什麽參數。

這樣的wait()將無限等待下去,直到線程接收到notify()或者notifyAll()消息。

能夠想象,wait()、notify()、notifyAll()一定是基於某個"東西",把自身狀態附加上去,來實現這樣的通知及狀態的變化。

考慮設計方式:1. 這樣的東西能夠單獨被定義出來。 2. 在Object中提供該"東西"的實現。 明顯另外一種方式要輕松方便很多。遷移性更強。其次。這樣的東西可能不是線程安全的,所以須要鎖來支持。

使用synchronized來進行同步的保護是理所應當,由於"東西"的實現就在Object中,其次使用synchronized的優點是一定程度能夠避免由於鎖不一致的情況下產生的wait()及notifyAll的不正確應。wait()在一把鎖中釋放了鎖,和notifyAll在還有一把鎖進行操作毫無相關。

java要求僅僅能在同步控制方法或同步控制塊裏調用wait()、notify()和notifyAll()。 以下演示一個樣例。一個是將蠟塗到Car上,一個是拋光它。拋光任務在塗蠟任務完畢之前,是不能運行其工作的。而塗蠟任務在塗還有一層蠟之前,必須等待拋光任務完畢。WaxOn和WaxOff都使用了Car對象,該對象在這些任務等待條件變化時候,使用wait()和notifyAll()來掛起和又一次啟動這些任務:
class Car {
     private boolean waxOn = false;
     public synchronized void waxed() {
          waxOn = true;
          notifyAll( );
     }
     public synchronized void buffed( ) {
          waxOn = false;
          notifyAll( );
     }
     public synchronized void waitForWaxing( )  throws InterruptedException{
          while(waxOn == false)
               wait( );
     }     
     public synchronized void waitForBuffing( ) throws InterruptedException {
          while(waxOn == true)
               wait( );
     }
}

class WaxOn implements Runnable {
     private Car car;
     public WaxOn(Car c) { car = c;}
     public void run() {
          try {
               while(!Thread.interrupted()) {
                    System.out.print(" Wax on!");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.waxed();
                    car.waitForBuffing();
               }
          } catch (InterruptedException e) {
               System.out.println("Exiting via interrupt");
          }
          System.out.println("Ending Wax On task");
     }
}

class WaxOff implements Runnable {
     private Car car;
     public WaxOff(Car c) {car = c;}
     public void run( ) {
          try {
               while(!Thread.interrupted()) {
                    car.waitForWaxing();
                    System.out.print("Wax Off");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.buffed();
               }
          } catch(InterruptedException e) {
               System.out.println("Exiting via interrupt");
          }
          System.out.println("Ending Wax Off task");
     }
}

public class WaxOMatic {
     public static void main(String[] args) throws Exception{
          Car car = new Car();
          ExecutorService exec = Executors.newCachedThreadPool();
          exec.execute(new WaxOff(car));
          exec.execute(new WaxOn(car));
          TimeUnit.SECONDS.sleep(5);
          exec.shutdownNow();
     }
}
前面的演示樣例強調你必須用一個檢查感興趣的條件的while循環包圍wait()。

這非常重要,由於:

  • 你可能有多個任務出於同樣的原因在等待一個鎖,而第一個喚醒任務可能已經改變這樣的狀況(即使你沒有這麽做,有人也會通過繼承你的類去這麽做)。假設屬於這樣的情況,那麽這個任務應該被再次掛起,直至其感興趣的條件發生變化。

  • 也有可能某些任務處於不同的原因在等待你的對象上鎖(在這樣的情況下必須使用(notifyAll))。在這樣的情況下,你須要檢查是否已經由正確的原因喚醒,假設不是,就再次調用wait()。

notify()與notifyAll() 由於在技術上,可能會有多個任務在單個Car對象上處於wait()狀態。因此調用notifyAll()比調用notify()要更安全。可是,上面程序的結構僅僅會有一個任務處於wait()狀態,因此你能夠使用notify()來取代notifyAll()。 使用notify()而不是notifyAll()是一種優化。

使用notify()時。在眾多等待同一個鎖的任務中僅僅有一個會被喚醒,因此假設你希望使用notify()就必須保證被喚醒的是恰當的任務。

另外,為了使用notify()。全部任務必須等待同樣的條件,由於假設你有多個任務在等待不同的條件。那麽你就不會知道是否喚醒的恰當的任務。假設使用notify(),當條件發生變化時,必須僅僅有一個任務能從中受益。最後,這些限制對全部可能存在的子類都必須總是起作用的。

假設這些規則中有不論什麽一條不滿足。那麽你就必須使用notifyAll()而不是notify()。


用wait()和notifyAll()實現生產者消費者問題

使用wait()和notifyAll()時一定要註意不能兩層嵌套synchronized,假設使用了兩層,則外層的sycnhronized加的鎖無法釋放。

並且須要註意的是不能使用Lock來限制資源的訪問。由於wait時無法釋放該鎖。假設還要限制在notifyAll時不能notifyAll到同類。那麽實現這個問題還是有難度的。

以下貼上一個自己一個粗陋的實現。各位朋友有美麗代碼的也能夠貼上來交流下。
class Meal {
}

class WaitPerson implements Runnable {
     private String name;
     private Restaurant restaurant;

     public WaitPerson(String name, Restaurant res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    synchronized (restaurant.waitPersons) {
                         while (restaurant.meals.size() < 1) {
                              restaurant.waitPersons.wait();
                         }
                    }
                    synchronized (restaurant.chefs) {
                         if (restaurant.meals.size() >= 1) {
                              restaurant.meals.poll();
                              restaurant.chefs.notifyAll();
                              System.out.println(name + " consumed a meal !");
                         }
                    }
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

class Chef implements Runnable {
     private String name;
     private Restaurant restaurant;

     public Chef(String name, Restaurant res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    synchronized (restaurant.chefs) {
                         while (restaurant.meals.size() > 10) {
                         restaurant.chefs.wait();
                    }
               }
                    synchronized (restaurant.waitPersons) {
                         if (restaurant.meals.size() <= 10) {
                              restaurant.meals.add(new Meal());
                              restaurant.waitPersons.notifyAll();
                              System.out.println(name + " produced a meal !");
                         }
                    }
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

public class Restaurant {
     public Queue<Meal> meals = new ConcurrentLinkedQueue<Meal>();
     public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>();
     public List<Chef> chefs = new ArrayList<Chef>();

     public static void main(String[] args) throws InterruptedException {
          Restaurant res = new Restaurant();
          ExecutorService exec = Executors.newCachedThreadPool();
          Chef chef1 = new Chef("chef1", res);
          Chef chef2 = new Chef("chef2", res);
          res.chefs.add(chef1);
          res.chefs.add(chef2);
          exec.execute(chef1);
          exec.execute(chef2);
          WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res);
          WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res);
          res.waitPersons.add(waitPerson1);
          res.waitPersons.add(waitPerson2);
          exec.execute(waitPerson1);
          exec.execute(waitPerson2);
          // TimeUnit.MILLISECONDS.sleep(3000);
          // exec.shutdownNow();
     }
}
上面這個程序能夠證明出來是線程安全的。

只是使用這樣的方式實在是太晦澀了。生產者消費者問題的機制須要我們去控制,實際上,java並發類庫為我們提供了這樣的模型的實現,我們待會會用堵塞隊列來重寫這個問題。


使用顯式的Lock和Condition對象

我們能夠顯式的使用Condition對象來替代我前面提到的"東西",使用這樣的方式將更加靈活,且有更清晰的辯識度,但會添加程序中對象的數量。你能夠通過在Condition上調用await()來掛起一個任務。當外部條件發生變化。意味著某個任務應該繼續運行時。你能夠通過調用signal()來通知這個任務,從而喚醒一個任務,或者調用signalAll()來喚醒全部在這個Condition上被其自身掛起的任務。 以下我們利用此工具重寫前面樣例中的Car類。

class Car {
     private boolean waxOn = false;
     private Lock lock = new ReentrantLock();
     private Condition condition = lock.newCondition();
     public  void waxed() {
          lock.lock();
          try {
               waxOn = true;
               condition.signalAll();
          } finally {
               lock.unlock();
          }
     }
     public void buffed( ) {
          lock.lock();
          try {
               waxOn = false;
               condition.signalAll();
          } finally {
               lock.unlock();
          }
     }
     public void waitForWaxing( )  throws InterruptedException{
          lock.lock();
          try{
               while(waxOn == false)
               condition.await();
          } finally {
               lock.unlock();
          }
     }
     public  void waitForBuffing( ) throws InterruptedException {
          lock.lock();
          try {
               while(waxOn == true)
               condition.await( );
          } finally {
               lock.unlock();
          }
     }
}

使用BlockingQueue來解決生產者消費者問題

java幫我們抽象了生產者消費者問題。我們能夠使用同步隊列來解決任務協作的問題。同步隊列在不論什麽時刻都僅僅同意一個任務插入或移除元素。

在java.util.concurrent.BlockingQueue接口中提供了這個隊列,這個接口有大量的實現。你通常能夠使用LinkedBlockingQueue。它是一個無界隊列,還能夠使用ArrayBlockingQueue,它具有固定的尺寸,因此你能夠在它被堵塞之前,向當中放置有限數量的元素。

假設消費者任務試圖從隊列中獲取對象,而該隊列此時為空,那麽這些隊列還能夠掛起消費者任務,而且當有很多其它的元素可用時恢復消費者任務。堵塞隊列能夠解決很大量的問題。而其方式與wait()和notifyAll()相比,則簡單並可靠太多。 以下利用堵塞隊列實現了上面的餐廳問題。

class Meal {
}

class WaitPerson implements Runnable {
     private String name;
     private RestaurantBlookingQueue restaurant;
     public WaitPerson(String name, RestaurantBlookingQueue res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    restaurant.meals.take();
                    System.out.println(name + "taked a Meal");
                    Thread.sleep(100);
               }
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

class Chef implements Runnable {
     private String name;
     private RestaurantBlookingQueue restaurant;

     public Chef(String name, RestaurantBlookingQueue res) {
          this.name = name;
          this.restaurant = res;
     }

     @Override
     public void run() {
          try {
               while (!Thread.interrupted()) {
                    restaurant.meals.put(new Meal());
                    System.out.println(this.name + "made a meal");
                    Thread.sleep(100);
               } 
          } catch (InterruptedException e) {
               System.out.println(name + " is ended via InterruptedException !");
               return;
          }
          System.out.println(name + " is ended via InterruptedException !");
     }
}

public class RestaurantBlookingQueue {
     public BlockingQueue<Meal> meals = new ArrayBlockingQueue<Meal>(10);
     public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>();
     public List<Chef> chefs = new ArrayList<Chef>();

     public static void main(String[] args) throws InterruptedException {
          RestaurantBlookingQueue res = new RestaurantBlookingQueue();
          ExecutorService exec = Executors.newCachedThreadPool();
          Chef chef1 = new Chef("chef1", res);
          Chef chef2 = new Chef("chef2", res);
          res.chefs.add(chef1);
          res.chefs.add(chef2);
          exec.execute(chef1);
          exec.execute(chef2);
          WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res);
          WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res);
          res.waitPersons.add(waitPerson1);
          res.waitPersons.add(waitPerson2);
          exec.execute(waitPerson1);
          exec.execute(waitPerson2);

     // TimeUnit.MILLISECONDS.sleep(3000);
     // exec.shutdownNow();
     }
}

任務間使用管道進行輸入/輸出

通過輸入/輸出在線程間進行通信通常非常實用。

提供線程功能的類庫以"管道"的形式對線程的輸入/輸出提供了支持。

它們在Java輸入/輸出類庫中的相應物就是PipedWriter類(同意任務向管道寫)和PipedReader類(同意不同任務從同一個管道讀)。這個模型能夠看成是"生產者-消費者"問題的變體。

管道基本是一個堵塞隊列,存在於多個引入BlookingQueue之前的Java版本號。

class Sender implements Runnable {
     private Random rand = new Random(47);
     private PipedWriter out = new PipedWriter();
     public PipedWriter getPipedWriter( ) {return out;}
     public void run( ) {
          try {
               while(true) {
                    for(char c = ‘A‘ ; c <= ‘z‘; c++) {
                         out.write(c);
                         TimeUnit.MILLISECONDS.sleep( rand.nextInt(500));
                    }
               }
          } catch (IOException e) {
               System.out.println(e + " Sender write exception");
          } catch (InterruptedException e) {
               System.out.println(e + " Sender sleep exception");
          }
     }
}

class Receiver implements Runnable {
     private PipedReader in;
     public Receiver(Sender sender) throws IOException {
          in = new PipedReader(sender.getPipedWriter());
     }
     public void run( ) {
          try {
               while(true) {
                    System.out.print("Read: "+(char)in.read() + ", ");
               }
          } catch (IOException e) {
               System.out.println(e + " Receiver read exception");
          }
     }
}

public class PipedIO {
     public static void main(String []args) throws Exception {
          Sender sender = new Sender( );
          Receiver receiver = new Receiver( sender );
          ExecutorService exec = Executors.newCachedThreadPool();
          exec.execute(sender);
          exec.execute(receiver);
          TimeUnit.SECONDS.sleep( 4 );
          exec.shutdownNow();
     }
}

死鎖

死鎖本是操作系統的中概念,由於操作系統中會遇到非常多可能發生死鎖的狀況。但我們在並發程序常常也須要預防死鎖。特別是多個線程在並發的訪問多個對象的時候。首先,我們須要從邏輯上避免死鎖發生的可能性,比如哲學家進餐問題。一般在程序中的解決方案是一次性將資源全然分配給它,為了提供並發度,須要我們進一步縮小並發鎖的範圍。

除了邏輯上預防並發。我們還須要處理意外情況,比如獲取到資源的線程中途掛掉。我們須要釋放資源。在程序中即釋放鎖。在程序中能夠通過try-catch實現。


漫談並發編程(五):線程之間的協作