1. 程式人生 > >35-多執行緒--多執行緒JDK1.5新特性(Lock+Condition)+使用JDK1.5新特性解決多生產者多消費者問題+總結+範例(Lock+Condition-多生產者多消費者問題實際開發程式碼)

35-多執行緒--多執行緒JDK1.5新特性(Lock+Condition)+使用JDK1.5新特性解決多生產者多消費者問題+總結+範例(Lock+Condition-多生產者多消費者問題實際開發程式碼)

java.util.concurrent.locks包中提供了幾個介面:Lock、Condition......

一、java.util.concurrent.locks.Lock

1、interface Lock:Lock實現提供了比使用synchronized方法(同步函式)和語句(同步程式碼塊)可獲得的更廣泛的鎖定操作

2、JDK1.5以後,將同步和鎖封裝成了物件,並將操作鎖的隱式方法定義到了該物件中,將隱式動作變成了顯式動作

注:Lock出現的目的:替代synchronized,包括:同步程式碼塊和同步函式。因為同步程式碼塊和同步函式只是一個封裝體,封裝體中自帶鎖(對鎖的操作是隱式的:獲取和釋放鎖都是底層自動完成的)。而Lock是一個鎖物件,具備著操作鎖的方法(對鎖的操作是顯式的:獲取和釋放鎖都是通過呼叫方法完成的)

3、Lock介面的實現允許鎖在不同的作用範圍內獲取和釋放,並允許以任何順序獲取和釋放多個鎖

(synchronized方法或語句的使用提供了對與每個物件相關的隱式監視器鎖的訪問,但卻強制所有鎖獲取和釋放均要出現在一個塊結構中:當獲取了多個鎖時,它們必須以相反的順序釋放,且必須在與所有鎖被獲取時相同的詞法範圍內釋放所有鎖)

4、鎖定和取消鎖定出現在不同作用範圍中時,必須謹慎地確保保持鎖定時所執行的所有程式碼用 try/finally 或 try/catch 加以保護,以確保在必要時釋放鎖

(使用結構塊鎖,就具備使用synchronized方法和語句時會出現的鎖自動釋放功能)

5、方法

(1)void lock():獲取鎖。如果鎖不可用,出於執行緒排程目的,將禁用當前執行緒,並且在獲得鎖之前,該執行緒將一直處於休眠狀態

(2)void unlock():釋放鎖。釋放鎖的動作一定要執行,所以,unlock()方法應該放在finally程式碼塊中

(3)Condition newCondition():返回繫結到此Lock例項的新Condition例項。在等待條件前,鎖必須由當前執行緒保持。呼叫Condition.await()將在等待前以原子方式釋放鎖,並在等待返回前重新獲取鎖

注:Condition例項的具體操作依賴於Lock實現,並且該實現必須對此加以記錄

(4)boolean tryLock():僅在呼叫時鎖為空閒狀態才獲取該鎖

(5)boolean tryLock(long time, TimeUnit unit) throws InterruptedException:如果鎖在給定的等待時間內空閒,並且當前執行緒未被中斷,則獲取鎖

(6)void lockInterruptibly() throws InterruptedException:如果當前執行緒未被中斷,則獲取鎖

6、程式碼演變

    //最開始的同步程式碼。Object obj是鎖
    Object obj = new Object();

    public void show() {
        synchronized (obj) {
            //code...
        }
    }
    //JDK1.5後,將同步和鎖封裝成了鎖物件Lock
    //ReentrantLock:一個可重入的互斥鎖Lock,是Lock介面的子類
    Lock lock = new ReentrantLock();

    //用Lock替代synchronized,所以,不用synchronized做同步了
    public void show() {
        //獲取鎖
        lock.lock();

        //code...

        //釋放鎖
        lock.unlock();
    }

問題:獲取鎖後,在執行code程式碼時發生異常,程式跳轉,釋放鎖的程式碼執行不到,導致其他執行緒都獲取不到鎖

解決方法:釋放鎖的動作一定要執行。將釋放鎖的 lock.unlock() 方法放在finally程式碼塊中

    //JDK1.5後,將同步和鎖封裝成了鎖物件Lock
    //ReentrantLock:一個可重入的互斥鎖Lock,是Lock介面的子類
    Lock lock = new ReentrantLock();

    //用Lock替代synchronized,所以,不用synchronized做同步了
    public void show() {
        //獲取鎖
        lock.lock();
        try {
            //將可能會發生異常的程式碼放在try程式碼塊中
            //code...

        } finally {
            //異常是否被處理catch不一定
            //但一定要將鎖釋放,寫在finally中
            //釋放鎖
            lock.unlock();
        }
    }

7、用Lock改寫多生產者多消費者問題的程式碼

/**
 * 此段程式碼有問題
 */

//導包
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Resource {
    private String name;
    private int count = 1;
    private boolean flag = false;

    //建立一個鎖物件
    Lock lock = new ReentrantLock();

    //用Lock替代synchronized,所以,不用synchronized做同步了
//    public synchronized void set(String name) {
    public void set(String name) {

        //獲取鎖
        lock.lock();

        try {
            //使用while替換if
            while (flag) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            this.name = name + count;
            count++;
            System.out.println(Thread.currentThread().getName() + "...生產者..." + this.name);

            this.flag = true;
            //使用notifyAll()全喚醒
            this.notifyAll();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    //用Lock替代synchronized,所以,不用synchronized做同步了
//    public synchronized void out() {
    public void out() {

        //獲取鎖
        lock.lock();

        try {
            //使用while替換if
            while (!flag) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println(Thread.currentThread().getName() + "......消費者......" + this.name);

            this.flag = false;
            //使用notifyAll()全喚醒
            this.notifyAll();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }
}

二、java.util.concurrent.locks.Condition

1、之前同步中的鎖是自定義的this、obj等,鎖操作鎖上的執行緒this.wait()等。現在的鎖是自己建立的,沒有this鎖,不能使用this.wait()了,要使用 new Lock() 鎖上的方法

2、interface Condition:將Object監視器方法(wait()、notify()、notifyAll())分解成截然不同的物件,以便通過將這些物件與任意Lock實現組合使用,為每個物件提供多個等待set(wait-set)。其中,Lock替代了synchronized方法和語句的使用,Condition替代了Object監視器方法的使用

3、之前的同步程式碼,一個鎖上只能有一組監視器方法wait()、notify()、notifyAll()。因為每個鎖都是Object的子類,只能拿到一組方法。而Condition將Object中的方法進行了單獨的封裝,封裝成一個Condition物件。一個Lock鎖上可以掛多個Condition,每個Condition都有一組監視器

(可以將監視器方法封裝成Condition物件,一個Condition物件中封裝一組監視器方法。有多個Condition物件,它們都可以屬於同一個鎖Lock,這些Condition的方法都可以作用到Lock鎖的執行緒上)

4、獲取Condition

(1)Condition newCondition():返回繫結到此Lock例項的新Condition例項(返回新的監視器方法例項)

注:Condition例項實質上被繫結到一個鎖上(因為監視器必須得監視鎖上的執行緒)。要為特定Lock例項獲得Condition例項,請使用其newCondition()方法

    //建立一個鎖物件
    Lock lock = new ReentrantLock();
    
    //通過已有的鎖,獲取該鎖上的監視器物件
    //一個Lock鎖上可以有多個Condition監視器物件
    Condition con1 = lock.newCondition();
    Condition con2 = lock.newCondition();

5、方法

(1)void await() throws InterruptedException:造成當前執行緒在接到訊號或被中斷之前一直處於等待狀態

(2)boolean await(long time, TimeUnit unit) throws InterruptedException:造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態。此方法在行為上等效於:awaitNanos(unit.toNanos(time)) > 0

(3)long awaitNanos(long nanosTimeout) throws InterruptedException:造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態

(4)void awaitUninterruptibly():造成當前執行緒在接到訊號之前一直處於等待狀態

(5)boolean awaitUntil(Date deadline) throws InterruptedException:造成當前執行緒在接到訊號、被中斷或到達指定最後期限之前一直處於等待狀態

(6)void signal():喚醒一個等待執行緒。如果所有的執行緒都在等待此條件,則選擇其中的一個喚醒。在從await()返回之前,該執行緒必須重新獲取鎖

(7)void signalAll():喚醒所有等待執行緒。如果所有的執行緒都在等待此條件,則喚醒所有執行緒。在從await()返回之前,每個執行緒都必須重新獲取鎖

注:Object中的監視器方法和Condition中的監視器方法之間的對應關係

(1)wait() --> await()

(2)notify() --> signal()

(3)notifyAll() --> signalAll()

6、一個Lock鎖上可以有多組Condition監視器(監視器所屬於鎖),每個Condition監視器上都有await()、signal()、signalAll()方法。所以,一個Lock鎖上可以有多組Condition監視器方法

三、使用JDK1.5新特性解決多生產者多消費者問題

1、以前一個鎖上只有一組監視器,這組監視器既監視著生產者,又監視著消費者。意味著這組監視器能將生產者和消費者全都wait()或喚醒(notify()、notifyAll())。而現線上程有了分類,一組負責生產,一組負責消費。希望生產者/消費者可以喚醒消費者/生產者。使用兩個監視器,一組監視生產者,一組監視消費者(四個執行緒共用同一把鎖,但是監視器不同)

2、鎖的替換

(1)Lock <-- 同步synchronized

(2)Condition <-- Object中的監視器方法

3、監視器方法替換

(1)await() <-- wait()

(2)signal() <-- notifyAll()

注:生產者執行緒和消費者執行緒共用一個鎖Lock,但可以根據Lock鎖獲取兩組Condition監視器物件,一組監視生產者執行緒,一組監視消費者執行緒。這樣,await()和signal()時,可以分別指定操作哪組執行緒。使用signal()替代notifyAll(),而不用signalAll()

//導包
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Resource {
    private String name;
    private int count = 1;
    private boolean flag = false;

    //建立一個鎖物件
    //ReentrantLock:是Lock介面的一個子類
    Lock lock = new ReentrantLock();

    //通過已有的鎖,獲取該鎖上的兩組監視器物件。一組監視生產者,一組監視消費者
    //一個Lock鎖上可以有多個Condition監視器物件
    Condition producer_condition = lock.newCondition();
    Condition consumer_condition = lock.newCondition();

    //有Lock就不使用synchronized
    public void set(String name) {
        //獲取鎖
        lock.lock();

        try {
            //使用while替換if
            while (flag) {
                try {
                    //生產者等待
                    producer_condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            this.name = name + count;
            count++;
            System.out.println(Thread.currentThread().getName() + "...生產者 5.0..." + this.name);

            this.flag = true;
            //喚醒消費者
            //因為喚醒的是消費者執行緒中的內容,所以,使用signal()喚醒任意一個即可
            //不用全喚醒,全喚醒也只能有一個執行
            consumer_condition.signal();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    //有Lock就不使用synchronized
    public void out() {
        //獲取鎖
        lock.lock();

        try {
            //使用while替換if
            while (!flag) {
                try {
                    //消費者等待
                    consumer_condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println(Thread.currentThread().getName() + "......消費者 5.0......" + this.name);

            this.flag = false;
            //喚醒生產者
            //因為喚醒的是生產者執行緒中的內容,所以,使用signal()喚醒任意一個即可
            //不用全喚醒,全喚醒也只能有一個執行
            producer_condition.signal();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }
}

class Producer implements Runnable {
    private Resource r;

    Producer(Resource r) {
        this.r = r;
    }

    @Override
    public void run() {
        while (true) {
            r.set("烤鴨");
        }
    }
}

class Consumer implements Runnable {
    private Resource r;

    Consumer(Resource r) {
        this.r = r;
    }

    @Override
    public void run() {
        while (true) {
            r.out();
        }
    }
}

public class Test {
    public static void main(String[] args) {
        Resource r = new Resource();

        Producer producer = new Producer(r);
        Consumer consumer = new Consumer(r);

        Thread t0 = new Thread(producer);
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(consumer);
        Thread t3 = new Thread(consumer);

        t0.start();
        t1.start();
        t2.start();
        t3.start();
    }
}

四、總結

1、Lock介面:Lock介面的出現替代了同步程式碼塊或同步函式,將同步的隱式鎖操作變成了顯式鎖操作。同時更為靈活,可以一個鎖上加多組監視器

(1)lock():獲取鎖

(2)unlock():釋放鎖。通常需要定義在finally程式碼塊中

2、Condition介面:Condition介面的出現替代了Object中的wait()、notify()、notifyAll()方法,將這些監視器方法單獨進行了封裝,變成了Condition監視器物件,可以和任意的鎖進行組合

(1)await():wait()

(2)signal():notify()

(3)signalAll():notifyAll()

五、範例

1、Lock

        Lock l = ...;
        l.lock();
        try {
            // access the resource protected by this lock
        } finally {
            l.unlock();
        }

2、Condition -- 多生產者多消費者問題實際開發程式碼

class BoundedBuffer {
    //鎖
    final Lock lock = new ReentrantLock();
    //監視器
    //非滿
    final Condition notFull = lock.newCondition();
    //非空
    final Condition notEmpty = lock.newCondition();

    //陣列容器,存的是物件
    final Object[] items = new Object[100];
    //用來運算元組的變數:存,取,計數器
    //有陣列就得有指標,存和取的指標得分別定義putptr、takeptr,同時還得記錄陣列中元素的個數
    int putptr, takeptr, count;

    //await()方法丟擲異常,因為方法內沒有做catch處理,想讓呼叫者處理。也可以在方法內處理
    public void put(Object x) throws InterruptedException {
        //獲取鎖。此時,take()不能取,因為用的是同一個鎖,互斥
        lock.lock();
        try {
            //判斷標記時一定要用while。因為每次醒來都先判斷標記,安全
            //while是必須的
            while (count == items.length) {
                //存滿了,生產者等待
                notFull.await();
            }
            //生產一個存一個
            items[putptr] = x;
            if (++putptr == items.length) {
                //生產到最後一個後,繼續從0開始存
                putptr = 0;
            }
            ++count;
            //signal()、signalAll()用哪個不一定。如果signal()能實現喚醒對方,就不需要signalAll()
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            //判斷標記時一定要用while。因為每次醒來都先判斷標記,安全
            //while是必須的
            while (count == 0) {
                //取完了,消費者等待
                notEmpty.await();
            }
            //從預設的0角標開始取
            Object x = items[takeptr];
            if (++takeptr == items.length) {
                //取到最後一個後,繼續從0開始取
                takeptr = 0;
            }
            --count;
            //signal()、signalAll()用哪個不一定。如果signal()能實現喚醒對方,就不需要signalAll()
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}