1. 程式人生 > >Java實現生產者消費者問題與讀者寫者問題

Java實現生產者消費者問題與讀者寫者問題

1、生產者消費者問題

    生產者消費者問題是研究多執行緒程式時繞不開的經典問題之一,它描述是有一塊緩衝區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。解決生產者/消費者問題的方法可分為兩類:(1)採用某種機制保護生產者和消費者之間的同步;(2)在生產者和消費者之間建立一個管道。第一種方式有較高的效率,並且易於實現,程式碼的可控制性較好,屬於常用的模式。第二種管道緩衝區不易控制,被傳輸資料物件不易於封裝等,實用性不強。

    同步問題核心在於:如何保證同一資源被多個執行緒併發訪問時的完整性。常用的同步方法是採用訊號或加鎖機制,保證資源在任意時刻至多被一個執行緒訪問。Java語言在多執行緒程式設計上實現了完全物件化,提供了對同步機制的良好支援。在Java中一共有五種方法支援同步,其中前四個是同步方法,一個是管道方法。

  • wait() / notify()方法
  • await() / signal()方法
  • BlockingQueue阻塞佇列方法
  • Semaphore方法
  • PipedInputStream / PipedOutputStream

1.1 wait() / notify()方法

wait() / nofity()方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何物件實現同步機制。

wait()方法:當緩衝區已滿/空時,生產者/消費者執行緒停止自己的執行,放棄鎖,使自己處於等等狀態,讓其他執行緒執行。

notify()方法:當生產者/消費者向緩衝區放入/取出一個產品時,向其他等待的執行緒發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

各起了4個生產者,4個消費者 
package test;

public class Hosee {
    private static Integer count = 0;
    private final Integer  FULL  = 10;
    private static String  LOCK  = "LOCK";

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                
try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); LOCK.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count); LOCK.notifyAll(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }

(需要注意的是,用什麼加鎖就用什麼notify和wait,例項中使用的是LOCK)

部分列印結果:

由於生產者和消費者說明一致,所以最多都是在2左右,當減少一個消費者時,則會加到10。

1.2 await() / signal()方法

首先,我們先來看看await()/signal()與wait()/notify()的區別:

  1. wait()和notify()必須在synchronized的程式碼塊中使用 因為只有在獲取當前物件的鎖時才能進行這兩個操作 否則會報異常 而await()和signal()一般與Lock()配合使用。
  2. wait是Object的方法,而await只有部分類有,如Condition。
  3. await()/signal()和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。

那麼為什麼有了synchronized還要提出Lock呢?

1.2.1 對synchronized的改進

    synchronized並不完美,它有一些功能性的限制 —— 它無法中斷一個正在等候獲得鎖的執行緒,也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖。同步還要求鎖的釋放只能在與獲得鎖所在的堆疊幀相同的堆疊幀中進行,多數情況下,這沒問題(而且與異常處理互動得很好),但是,確實存在一些非塊結構的鎖定更合適的情況。

1.2.2 ReentrantLock 類

java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,它允許把鎖定的實現作為 Java 類,而不是作為語言的特性來實現(更加面向物件)。這就為 Lock 的多種實現留下了空間,各種實現可能有不同的排程演算法、效能特性或者鎖定語義。 ReentrantLock 類實現了 Lock ,它擁有與 synchronized 相同的併發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的效能。(換句話說,當許多執行緒都想訪問共享資源時,JVM 可以花更少的時候來排程執行緒,把更多時間用在執行執行緒上。)

reentrant 鎖意味著什麼呢?簡單來說,它有一個與鎖相關的獲取計數器,如果擁有鎖的某個執行緒再次得到鎖,那麼獲取計數器就加1,然後鎖需要被釋放兩次才能獲得真正釋放(重入鎖)。這模仿了 synchronized 的語義;如果執行緒進入由執行緒已經擁有的監控器保護的 synchronized 塊,就允許執行緒繼續進行,當執行緒退出第二個(或者後續) synchronized 塊的時候,不釋放鎖,只有執行緒退出它進入的監控器保護的第一個synchronized 塊時,才釋放鎖。

簡單解釋下重入鎖:

public class Child extends Father implements Runnable{
    final static Child child = new Child();//為了保證鎖唯一
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(child).start();
        }
    }

    public synchronized void doSomething() {
        System.out.println("1child.doSomething()");
        doAnotherThing(); // 呼叫自己類中其他的synchronized方法
    }

    private synchronized void doAnotherThing() {
        super.doSomething(); // 呼叫父類的synchronized方法
        System.out.println("3child.doAnotherThing()");
    }

    @Override
    public void run() {
        child.doSomething();
    }
}
class Father {
    public synchronized void doSomething() {
        System.out.println("2father.doSomething()");
    }
}
上述程式碼的鎖都是child物件,當執行child.doSomething時,該執行緒獲得child物件的鎖,在doSomething方法內執行doAnotherThing時再次請求child物件的鎖,因為synchronized是重入鎖,所以可以得到該鎖,繼續在doAnotherThing裡執行父類的doSomething方法時第三次請求child物件的鎖,同理可得到,如果不是重入鎖的話,那這後面這兩次請求鎖將會被一直阻塞,從而導致死鎖。

在檢視下面程式碼示例時,可以看到 Lock 和 synchronized 有一點明顯的區別 —— lock 必須在 finally 塊中釋放。否則,如果受保護的程式碼將丟擲異常,鎖就有可能永遠得不到釋放!這一點區別看起來可能沒什麼,但是實際上,它極為重要。忘記在 finally 塊中釋放鎖,可能會在程式中留下一個定時炸彈,當有一天炸彈爆炸時,您要花費很大力氣才有找到源頭在哪。而使用同步,JVM 將確保鎖會獲得自動釋放。

Lock lock = new ReentrantLock();
lock.lock();
try { 
  // update object state
}
finally {
  lock.unlock(); 
}

除此之外,與目前的 synchronized 實現相比,爭用下的 ReentrantLock 實現更具可伸縮性。(在未來的 JVM 版本中,synchronized 的爭用效能很有可能會獲得提高。)這意味著當許多執行緒都在爭用同一個鎖時,使用 ReentrantLock 的總體開支通常要比 synchronized 少得多。

1.2.3 什麼時候選擇用 ReentrantLock 代替 synchronized

在 Java1.5 中,synchronized 是效能低效的。因為這是一個重量級操作,需要呼叫操作介面,導致有可能加鎖消耗的系統時間比加鎖以外的操作還多。相比之下使用 Java 提供的 Lock 物件,效能更高一些。但是到了 Java1.6,發生了變化。synchronized 在語義上很清晰,可以進行很多優化,有適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。導致在 Java1.6 上 synchronized 的效能並不比 Lock 差。官方也表示,他們也更支援 synchronized,在未來的版本中還有優化餘地。

所以在確實需要一些 synchronized 所沒有的特性的時候,比如時間鎖等候、可中斷鎖等候、無塊結構鎖、多個條件變數或者鎖投票使用ReentrantLock。ReentrantLock 還具有可伸縮性的好處,應當在高度爭用的情況下使用它,但是請記住,大多數 synchronized 塊幾乎從來沒有出現過爭用,所以可以把高度爭用放在一邊。我建議用 synchronized 開發,直到確實證明 synchronized 不合適,而不要僅僅是假設如果使用 ReentrantLock “效能會更好”。請記住,這些是供高階使用者使用的高階工具。(而且,真正的高階使用者喜歡選擇能夠找到的最簡單工具,直到他們認為簡單的工具不適用為止。)。一如既往,首先要把事情做好,然後再考慮是不是有必要做得更快。

1.2.4 接下來我們使用ReentrantLock來實現生產者消費者問題

package test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Hosee {
    private static Integer count = 0;
    private final Integer FULL = 10;
    final Lock lock = new ReentrantLock();
    final Condition NotFull = lock.newCondition();
    final Condition NotEmpty = lock.newCondition();

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == FULL) {
                        try {
                            NotFull.await();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生產者生產,目前總共有" + count);
                    NotEmpty.signal();
                } finally {
                    lock.unlock();
                }

            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            NotEmpty.await();
                        } catch (Exception e) {
                            // TODO: handle exception
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消費者消費,目前總共有" + count);
                    NotFull.signal();
                } finally {
                    lock.unlock();
                }

            }

        }

    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();

        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}
執行結果與第一個類似。上述程式碼用了兩個Condition,其實用一個也是可以的,只不過要signalall()。

1.3 BlockingQueue阻塞佇列方法

    BlockingQueue是JDK5.0的新增內容,它是一個已經在內部實現了同步的佇列,實現方式採用的是我們第2種await() / signal()方法。它可以在生成物件時指定容量大小。它用於阻塞操作的是put()和take()方法。

put()方法:類似於我們上面的生產者執行緒,容量達到最大時,自動阻塞。

take()方法:類似於我們上面的消費者執行緒,容量為0時,自動阻塞。

package test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Hosee {
    private static Integer count = 0;
    final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    bq.put(1);
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生產者生產,目前總共有" + count);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    bq.take();
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消費者消費,目前總共有" + count);
                } catch (Exception e) {
                    // TODO: handle exception
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();

        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}
其實這個BlockingQueue比較難用程式碼來演示,因為put()與take()方法無法與輸出語句保證同步,當然你可以自己去實現 BlockingQueue(BlockingQueue是用await()/signal() 實現的)。所以在輸出結果上你會發現不匹配。

例如:當緩衝區已滿,生產者在put()操作時,put()內部呼叫了await()方法,放棄了執行緒的執行,然後消費者執行緒執行,呼叫take()方法,take()內部呼叫了signal()方法,通知生產者執行緒可以執行,致使在消費者的println()還沒執行的情況下生產者的println()先被執行,所以有了輸出不匹配的情況。

對於BlockingQueue大家可以放心使用,這可不是它的問題,只是在它和別的物件之間的同步有問題。

1.4 Semaphore方法

Semaphore 訊號量,就是一個允許實現設定好的令牌。也許有1個,也許有10個或更多。  
誰拿到令牌(acquire)就可以去執行了,如果沒有令牌則需要等待。  
執行完畢,一定要歸還(release)令牌,否則令牌會被很快用光,別的執行緒就無法獲得令牌而執行下去了。
package test;

import java.util.concurrent.Semaphore;

public class Hosee
{
    int count = 0;
    final Semaphore notFull = new Semaphore(10);
    final Semaphore notEmpty = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1);

    class Producer implements Runnable
    {
        @Override
        public void run()
        {
            for (int i = 0; i < 10; i++)
            {
                try
                {
                    Thread.sleep(3000);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
                try
                {
                    notFull.acquire();//順序不能顛倒,否則會造成死鎖。
                    mutex.acquire();
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生產者生產,目前總共有" + count);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    mutex.release();
                    notEmpty.release();
                }

            }
        }
    }

    class Consumer implements Runnable
    {

        @Override
        public void run()
        {
            for (int i = 0; i < 10; i++)
            {
                try
                {
                    Thread.sleep(3000);
                }
                catch (InterruptedException e1)
                {
                    e1.printStackTrace();
                }
                try
                {
                    notEmpty.acquire();//順序不能顛倒,否則會造成死鎖。
                    mutex.acquire();
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消費者消費,目前總共有" + count);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    mutex.release();
                    notFull.release();
                }

            }

        }

    }

    public static void main(String[] args) throws Exception
    {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();

        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}
注意notFull.acquire()與mutex.acquire()的位置不能互換,如果先得到互斥鎖再發生等待,會造成死鎖。

1.5 PipedInputStream / PipedOutputStream

這個類位於java.io包中,是解決同步問題的最簡單的辦法,一個執行緒將資料寫入管道,另一個執行緒從管道讀取資料,這樣便構成了一種生產者/消費者的緩衝區程式設計模式。PipedInputStream/PipedOutputStream只能用於多執行緒模式,用於單執行緒下可能會引發死鎖。

package test;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Hosee {
    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();
    {
        try {
            pis.connect(pos);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            try{
                while(true){
                    int b = (int) (Math.random() * 255);
                    System.out.println("Producer: a byte, the value is " + b);
                    pos.write(b);
                    pos.flush();
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                try{
                    pos.close();
                    pis.close();
                }catch(IOException e){
                    System.out.println(e);
                }
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            try{
                while(true){
                    int b = pis.read();
                    System.out.println("Consumer: a byte, the value is " + String.valueOf(b));
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                try{
                    pos.close();
                    pis.close();
                }catch(IOException e){
                    System.out.println(e);
                }
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}

與阻塞佇列一樣,由於read()/write()方法與輸出方法不一定同步,輸出結果方面會發生不匹配現象,為了使結果更加明顯,這裡只有1個消費者和1個生產者。

2、讀者寫者問題

讀者—寫者問題(Readers-Writers problem)也是一個經典的併發程式設計問題,是經常出現的一種同步問題。計算機系統中的資料(檔案、記錄)常被多個程序共享,但其中某些程序可能只要求讀資料(稱為讀者Reader);另一些程序則要求修改資料(稱為寫者Writer)。就共享資料而言,Reader和Writer是兩組併發程序共享一組資料區,要求:

(1)允許多個讀者同時執行讀操作;

(2)不允許讀者、寫者同時操作;

(3)不允許多個寫者同時操作。

Reader和Writer的同步問題分為讀者優先、弱寫者優先(公平競爭)和強寫者優先三種情況,它們的處理方式不同。

首先我們都只考慮公平競爭的情況下,看看Java有哪些方法可以實現讀者寫者問題

2.1 讀寫鎖

ReentrantReadWriteLock會使用兩把鎖來解決問題,一個讀鎖,一個寫鎖
執行緒進入讀鎖的前提條件:
    沒有其他執行緒的寫鎖,
    沒有寫請求或者有寫請求,但呼叫執行緒和持有鎖的執行緒是同一個
執行緒進入寫鎖的前提條件:
    沒有其他執行緒的讀鎖
    沒有其他執行緒的寫鎖

到ReentrantReadWriteLock,首先要做的是與ReentrantLock劃清界限。它和後者都是單獨的實現,彼此之間沒有繼承或實現的關係。然後就是總結這個鎖機制的特性了: 

  1. 重入(在上文ReentrantLock處已經介紹了)方面其內部的WriteLock可以獲取ReadLock,但是反過來ReadLock想要獲得WriteLock則永遠都不要想。 
  2. WriteLock可以降級為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然後釋放WriteLock,這時候執行緒將保持Readlock的持有。反過來ReadLock想要升級為WriteLock則不可能,為什麼?參看(1),呵呵. 
  3. ReadLock可以被多個執行緒持有並且在作用時排斥任何的WriteLock,而WriteLock則是完全的互斥。這一特性最為重要,因為對於高讀取頻率而相對較低寫入的資料結構,使用此類鎖同步機制則可以提高併發量。 
  4. 不管是ReadLock還是WriteLock都支援Interrupt,語義與ReentrantLock一致。 
  5. WriteLock支援Condition並且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則丟擲UnsupportedOperationException異常。 

看下ReentrantReadWriteLock這個類的兩個建構函式

public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

fair這個引數表示是否是建立一個公平的讀寫鎖,還是非公平的讀寫鎖。也就是搶佔式還是非搶佔式。

公平和非公平:公平表示獲取的鎖的順序是按照執行緒加鎖的順序來分配獲取到鎖的執行緒時最先加鎖的執行緒,是按照FIFO的順序來分配鎖的;非公平表示獲取鎖的順序是無需的,後來加鎖的執行緒可能先獲得鎖,這種情況就導致某些執行緒可能一直沒獲取到鎖。

公平鎖為啥會影響效能,從code上來看看公平鎖僅僅是多了一項檢查是否在隊首會影響效能,如不是,那麼又是在什麼地方影響的?假如是闖入的執行緒,會排在隊尾並睡覺(parking)等待前任節點喚醒,這樣勢必會比非公平鎖新增很多paking和unparking的操作

一般的應用場景是: 如果有多個讀執行緒,一個寫執行緒,而且寫執行緒在操作的時候需要阻塞讀執行緒,那麼此時就需要使用公平鎖,要不然可能寫執行緒一直獲取不到鎖,導致執行緒餓死。

再簡單說下鎖降級

重入還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。
rwl.readLock().lock();
      if (!cacheValid) {
         // Must release read lock before acquiring write lock
         rwl.readLock().unlock();
         rwl.writeLock().lock();
        
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
       
         rwl.readLock().lock();
         rwl.writeLock().unlock(); // 降級:先獲取讀鎖再釋放寫鎖
      }

下面我們用讀寫鎖來實現讀者寫者問題

import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Queue3 q3 = new Queue3();
        for (int i = 0; i < 3; i++) {
            new Thread() {
                public void run() {
                    while (true) {
                        q3.get();
                    }
                }
            }.start();
        }
        for (int i = 0; i < 3; i++) {
            new Thread() {
                public void run() {
                    while (true) {
                        q3.put(new Random().nextInt(10000));
                    }
                }
            }.start();
        }
    }
}

class Queue3 {
    private Object data = null;// 共享資料,只能有一個執行緒能寫該資料,但可以有多個執行緒同時讀該資料。
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public void get() {
        rwl.readLock().lock();// 上讀鎖,其他執行緒只能讀不能寫
        System.out.println(Thread.currentThread().getName()
                + " be ready to read data!");
        try {
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()
                + "have read data :" + data);
        rwl.readLock().unlock(); // 釋放讀鎖,最好放在finnaly裡面
    }

    public void put(Object data) {
        rwl.writeLock().lock();// 上寫鎖,不允許其他執行緒讀也不允許寫
        System.out.println(Thread.currentThread().getName()
                + " be ready to write data!");
        try {
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.data = data;
        System.out.println(Thread.currentThread().getName()
                + " have write data: " + data);
        rwl.writeLock().unlock();// 釋放寫鎖
    }
}

執行結果:

Thread-0 be ready to read data!
Thread-1 be ready to read data!
Thread-2 be ready to read data!
Thread-0have read data :null
Thread-2have read data :null
Thread-1have read data :null
Thread-5 be ready to write data!
Thread-5 have write data: 6934
Thread-5 be ready to write data!
Thread-5 have write data: 8987
Thread-5 be ready to write data!
Thread-5 have write data: 8496

2.2 Semaphore訊號量

在1.4中已經介紹了用訊號量來實現生產者消費者問題,現在我們將用訊號量來實現讀者寫者問題,訊號量的相關知識不再重複,直接看程式碼

package test;

import java.util.Random;
import java.util.concurrent.Semaphore;

public class ReadWrite
{
    public static void main(String[] args)
    {
        final Queue3 q3 = new Queue3();
        for (int i = 0; i < 3; i++)
        {
            new Thread()
            {
                public void run()
                {
                    while (true)
                    {
                        try
                        {
                            Thread.sleep((long) (Math.random() * 1000));
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        q3.get();
                    }
                }
            }.start();
            
        }
        for (int i = 0; i < 3; i++)
        {
            new Thread()
            {
                public void run()
                {
                    while (true)
                    {
                        try
                        {
                            Thread.sleep((long) (Math.random() * 1000));
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        q3.put(new Random().nextInt(10000));
                    }
                }
            }.start();
        }
    }
}

class Queue3
{
    private Object data = null;// 共享資料,只能有一個執行緒能寫該資料,但可以有多個執行緒同時讀該資料。
    private Semaphore wmutex = new Semaphore(1);
    private Semaphore rmutex = new Semaphore(2);
    private int count = 0;

    public void get()
    {
        try
        {
            rmutex.acquire();
            if (count == 0)
                wmutex.acquire();// 當第一讀程序欲讀資料庫時,阻止寫程序寫
            count++;
            System.out.println(Thread.currentThread().getName()
                    + " be ready to read data!");
            try
            {
                Thread.sleep((long) (Math.random() * 1000));
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()
                    + "have read data :" + data);
            count--;
            if (count == 0)
                wmutex.release();
            rmutex.release();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public void put(Object data)
    {
        try
        {
            wmutex.acquire();
            System.out.println(Thread.currentThread().getName()
                    + " be ready to write data!");
            try
            {
                Thread.sleep((long) (Math.random() * 1000));
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            this.data = data;
            System.out.println(Thread.currentThread().getName()
                    + " have write data: " + data);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            wmutex.release();
        }
    }
}
單純使用訊號量不能解決讀者與寫者問題,必須引入計數器count(可以用CountDownLatch代替 )對讀程序計數; count與wmutex結合使用,使讀讀能同時進行,讀寫排斥。count為0時表示讀程序開始,此時寫程序阻塞(wmutex被讀程序獲取),當count不為0時,表示有多個讀程序,就不用操作 wmutex了,因為第一個讀程序已經獲得了 wmutex。count表示有多少個讀程序在讀,每次有一個就+1,讀完了-1,當count==0時,表示讀程序都結束了。此時 wmutex釋放,寫程序才有機會獲得wmutex。為了使讀程序不要一直佔有 wmutex,最好讓讀程序sleep一下,讓寫程序有機會獲得wmutex,使效果更明顯。

總結:

就此用Java實現生產者消費者問題(5種)和讀者寫者問題(2種)已經闡述完了,歡迎大家討論以及給出不同的解決方案