1. 程式人生 > >多線程編程-- part 5.2 JUC鎖之Condition條件

多線程編程-- part 5.2 JUC鎖之Condition條件

兩個 test extend 釋放 get timeout tin rac main

1.Condition介紹

  Condition的作用是對鎖進行更精確的控制。Condition中的await()方法相當於Object的wait()方法,Condition中的signal()方法相當於Object的notify()方法,Condition中的signalAll()相當於Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是需要與"互斥鎖"/"共享鎖"捆綁使用的。

2.Condition函數列表

// 造成當前線程在接到信號或被中斷之前一直處於等待狀態。
void await()
// 造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。
boolean await(long time, TimeUnit unit)
// 造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。
long awaitNanos(long nanosTimeout)
// 造成當前線程在接到信號之前一直處於等待狀態。
void awaitUninterruptibly()
// 造成當前線程在接到信號、被中斷或到達指定最後期限之前一直處於等待狀態。
boolean awaitUntil(Date deadline)
// 喚醒一個等待線程。 void signal() // 喚醒所有等待線程。 void signalAll()

3.Condition示例

(1)通過Object的wait(),notify()來演示線程休眠/喚醒

(2)是通過Condition的await(), signal()來演示線程的休眠/喚醒功能。
(3)是通過Condition的高級功能。

示例1:

public class testHello {

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        
synchronized(ta) { // 通過synchronized(ta)獲取“對象ta的同步鎖” try { System.out.println(Thread.currentThread().getName()+" start ta"); ta.start(); System.out.println(Thread.currentThread().getName()+" block"); ta.wait(); // 等待 System.out.println(Thread.currentThread().getName()+" continue"); } catch (InterruptedException e) { e.printStackTrace(); } } } static class ThreadA extends Thread{ public ThreadA(String name) { super(name); } public void run() { synchronized (this) { // 通過synchronized(this)獲取“當前對象的同步鎖” System.out.println(Thread.currentThread().getName()+" wakup others"); notify(); // 喚醒“當前對象上的等待線程” } } } }

技術分享

示例2:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest1 {
        
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        lock.lock(); // 獲取鎖
        try {
            System.out.println(Thread.currentThread().getName()+" start ta");
            ta.start();

            System.out.println(Thread.currentThread().getName()+" block");
            condition.await();    // 等待

            System.out.println(Thread.currentThread().getName()+" continue");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();    // 釋放鎖
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            lock.lock();    // 獲取鎖
            try {
                System.out.println(Thread.currentThread().getName()+" wakup others");
                condition.signal();    // 喚醒“condition所在鎖上的其它線程”
            } finally {
                lock.unlock();    // 釋放鎖
            }
        }
    }
}

技術分享

Condition:

  Condition除了支持上面的功能之外,它更強大的地方在於:能夠更加精細的控制多線程的休眠與喚醒。對於同一個鎖,我們可以創建多個Condition,在不同的情況下使用不同的Condition。
例如,假如多線程讀/寫同一個緩沖區:當向緩沖區中寫入數據之後,喚醒"讀線程";當從緩沖區讀出數據之後,喚醒"寫線程";並且當緩沖區滿的時候,"寫線程"需要等待;當緩沖區為空時,"讀線程"需要等待。 如果采用Object類中的wait(), notify(), notifyAll()實現該緩沖區,當向緩沖區寫入數據之後需要喚醒"讀線程"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀線程",而只能通過notifyAll喚醒所有線程(但是notifyAll無法區分喚醒的線程是讀線程,還是寫線程)。 但是,通過Condition,就能明確的指定喚醒讀線程。

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            // 如果“緩沖已滿”,則等待;直到“緩沖”不是滿的,才將x添加到緩沖中。
            while (count == items.length)
                notFull.await();
            // 將x添加到緩沖中
            items[putptr] = x;
            // 將“put統計數putptr+1”;如果“緩沖已滿”,則設putptr為0。
            if (++putptr == items.length) putptr = 0;
            // 將“緩沖”數量+1
            ++count;
            // 喚醒take線程,因為take線程通過notEmpty.await()等待
            notEmpty.signal();

            // 打印寫入的數據
            System.out.print(Thread.currentThread().getName() + " put  ");
            for(int i = 0; i < items.length; i++) {
                System.out.print(items[i] + "  ");
            }
            System.out.println("\n");
        } finally {
            lock.unlock();    // 釋放鎖
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            // 如果“緩沖為空”,則等待;直到“緩沖”不為空,才將x從緩沖中取出。
            while (count == 0)
                notEmpty.await();
            // 將x從緩沖中取出
            Object x = items[takeptr];
            items[takeptr] = null;
            // 將“take統計數takeptr+1”;如果“緩沖為空”,則設takeptr為0。
            if (++takeptr == items.length) takeptr = 0;
            // 將“緩沖”數量-1
            --count;
            // 喚醒put線程,因為put線程通過notFull.await()等待
            notFull.signal();

            // 打印取出的數據
            System.out.print(Thread.currentThread().getName() + " take ");
            for(int i = 0; i < items.length; i++) {
                System.out.print(items[i] + "  ");
            }
            System.out.println("\n");
            return x;
        } finally {
            lock.unlock();    // 釋放鎖
        }
    }
}

public class testHello {
    private static BoundedBuffer bb = new BoundedBuffer();

    static class PutThread extends Thread {
        private int num;
        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }
        public void run() {
            try {
                Thread.sleep(1);    // 線程休眠1ms
                bb.put(num);        // 向BoundedBuffer中寫入數據
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }
        public void run() {
            try {
                Thread.sleep(10);                    // 線程休眠1ms
                Integer num = (Integer)bb.take();    // 從BoundedBuffer中取出數據
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        // 啟動10個“寫線程”,向BoundedBuffer中不斷的寫數據(寫入0-9);
        // 啟動10個“讀線程”,從BoundedBuffer中不斷的讀數據。
        for (int i=0; i<10; i++) {
            new PutThread("p"+i, i).start();
            new TakeThread("t"+i).start();
        }
    }

}

技術分享

結果說明:

(1) BoundedBuffer 是容量為5的緩沖,緩沖中存儲的是Object對象,支持多線程的讀/寫緩沖。多個線程操作“一個BoundedBuffer對象”時,它們通過互斥鎖lock對緩沖區items進行互斥訪問;而且同一個BoundedBuffer對象下的全部線程共用“notFull”和“notEmpty”這兩個Condition。
notFull用於控制寫緩沖,notEmpty用於控制讀緩沖。當緩沖已滿的時候,調用put的線程會執行notFull.await()進行等待;當緩沖區不是滿的狀態時,就將對象添加到緩沖區並將緩沖區的容量count+1,最後,調用notEmpty.signal()緩沖notEmpty上的等待線程(調用notEmpty.await的線程)。 簡言之,notFull控制“緩沖區的寫入”,當往緩沖區寫入數據之後會喚醒notEmpty上的等待線程。
同理,notEmpty控制“緩沖區的讀取”,當讀取了緩沖區數據之後會喚醒notFull上的等待線程。
(2) 在ConditionTest2的main函數中,啟動10個“寫線程”,向BoundedBuffer中不斷的寫數據;同時,也啟動10個“讀線程”,從BoundedBuffer中不斷的讀數據。

多線程編程-- part 5.2 JUC鎖之Condition條件