1. 程式人生 > >五、JAVA多執行緒:執行緒間通訊(wait、notify、notifyAll、wait set、自定義鎖 BooleanLock )

五、JAVA多執行緒:執行緒間通訊(wait、notify、notifyAll、wait set、自定義鎖 BooleanLock )

       我們在開發多執行緒程式的時候,往往不會只存在一個獨立的執行緒,相反大多數情況下是需要多個執行緒之間進行協同工作的,如何在多個執行緒之間進行通訊,是本章學習的重點。另外,本章的最後部分將會分析synchronized關鍵字的缺陷,我們手動實現了一個顯式鎖(BooleanLock)可以解決synchronized所不具備的功能,其中也需要用到執行緒間通訊的知識。

 

同步阻塞與非同步非阻塞

 

非同步非阻塞訊息處理

單執行緒間通訊

 

初識wait和notify

程式碼樣例:

import java.util.LinkedList ;
import static java.lang.Thread.currentThread ;


public class EventQueue {
    private final int max ;

    static  class Event {
        int i ;

        public Event(int i){
            this.i = i ;
        }


    }

    private final LinkedList<Event> eventQueue = new LinkedList<>();

    private final static int DEFAULT_MAX_EVENT = 10 ;


    public EventQueue(){
        this(DEFAULT_MAX_EVENT);
    }

    public EventQueue (int max){
        this.max = max;
    }

    public void offer(Event event){
        synchronized (eventQueue){
          if(eventQueue.size() >= max ){
              try{
                  console("the queue is full ...");
                  eventQueue.wait();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }

          }

          console("the new event is submitted ");
          eventQueue.addLast(event);
          eventQueue.notify();

        }
    }

    public Event take(){
        synchronized (eventQueue){
            if(eventQueue.isEmpty()){

                try {
                    console("the queue is empty . ");
                    eventQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

            Event event = eventQueue.removeFirst();
            this.eventQueue.notify();
            console("the event " + event.i + "  is handled . ");

            return event ;
        }

    }


    private void console(String message){
        System.out.printf("%s:%s \n", currentThread().getName() , message);
    }
}

 

import java.util.concurrent.TimeUnit;

public class EventClient {
    public static void main(String[] args) {

        final EventQueue eventQueue = new EventQueue() ;
        new Thread(() -> {
            for (int i = 0 ; i < 100000; i++){

                eventQueue.offer(new EventQueue.Event(i));
            }


        },"Producer"  ).start();


        new Thread(() -> {
            for (;;){
                eventQueue.take();

                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }

        },"Consumer " ).start(); ;


    }


}

 

執行截圖:

 

wait和notify方法詳解

wait和notify方法並不是Thread特有的方法,而是Object中的方法。也就是說JDK中每一個類都擁有這兩個方法。

 

先看wait的三個過載方法:

public final void wait() throws InterruptedException
public final void wait(long timeout) throws InterruptedException
public final void wait(long timeout,  int nanos) throws InterruptedException

說明:

1.wait方法這三個過載方法都將呼叫wait(long timeout)方法 , wait()方法 等價於 wait(0), 0 代表永不超時。

2.Object的wait(long timeout)方法會導致當前執行緒進入阻塞,知道有其他執行緒呼叫了Object的nofify或者notifyAll方法才能將其喚醒,或者阻塞時間到達了timeout時間而自動喚醒。

3.wait方法必須擁有該物件的monitor,也就是waite方法必須在同步方法中使用。

4.當前執行緒執行了該物件的wai方法之後,將會放棄對該monitor的所有權,並進進入到與該物件關聯的wait set中,也就是說一旦執行緒執行了某個object的wait方法之後,他就會釋放對該物件的monitor的所有權, 其他執行緒也會有機會繼續爭搶該monitor的所有權。

 

notify解析

 

public final native void notify();

喚醒單個正在執行該物件的wait方法的執行緒

如果有某個執行緒由於執行該物件的wait方法而進入阻塞則會被喚醒,如果沒有則會忽略。

被喚醒的執行緒需要重新獲取對該物件所關聯monitor的lock才能繼續執行

關於wait和notify的注意事項

1.wait方法是可中斷方法,一旦呼叫了wait方法案進入了阻塞狀態, 其他執行緒可以使用interrupt方法將其打斷。

可中斷方法被打斷後會受到中斷異常InterruptedException,同時interrupt標識也會被擦除。

2.執行緒執行了某個物件的wait方法之後,會加入與之對應的wait set中, 每一個物件的monitor都會有一個與之關聯的wait set

3.當前程進入wait set之後, notify方法可以將其進行喚醒, 也就是從wait set中彈出,同時,中斷wait中的執行緒,也就將其喚醒。

4.必須在同步方法中使用wait和notify方法,因為執行wait和notify的前提條件是必須持有同步方法monitor的所有權。

 

wait和sleep

1.wait和sleep方法都可以使執行緒進入阻塞狀態。

2.wait和sleep方法都是可中斷方法,被中斷之後,都會受到中斷異常

3.wait是Object的方法, sleep是Thread中的特有方法

4.wait方法的執行必須要同步方法中進行,則sleep不需要

5.執行緒在同步方法中執行sleep方法是,並不釋放monitor鎖,而wait方法則會釋放

6.sleep方法短暫休眠後會主動退出阻塞,而wait方法(沒有指定wait時間)則需要被其他執行緒中斷之後才能退出阻塞

多執行緒間通訊

生產者和消費者

1.notifyAll方法

多執行緒間通訊需要用到Object的notifyAll方法,該方法與notify比較相似,都可以喚醒由於呼叫了wait方法而阻塞的執行緒,但是notify方法只能喚醒其中的一個執行緒,而notifyAll方法則可以同時喚醒全部的阻塞執行緒,同時被喚醒的執行緒仍然需要繼續爭搶monitor的鎖。

 

程式碼樣例:

package com.zl.step5.eventQueue2;

import java.util.LinkedList;

import static java.lang.Thread.currentThread;


public class EventQueue {
    private final int max ;

    static  class Event {


    }

    private final LinkedList<Event> eventQueue = new LinkedList<>();

    private final static int DEFAULT_MAX_EVENT = 10 ;


    public EventQueue(){
        this(DEFAULT_MAX_EVENT);
    }

    public EventQueue (int max){
        this.max = max;
    }

    public void offer(Event event){
        synchronized (eventQueue){
          while(eventQueue.size() >= max ){
              try{
                  console("the queue is full ...");
                  eventQueue.wait();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }

          }

          console("the new event is submitted :" +event);
          eventQueue.addLast(event);
          eventQueue.notifyAll();

        }
    }

    public Event take(){
        synchronized (eventQueue){
            while(eventQueue.isEmpty()){

                try {
                    console("the queue is empty . ");
                    eventQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

            Event event = eventQueue.removeFirst();
            this.eventQueue.notifyAll();
            console("the event " + event + "  is handled . ");

            return event ;
        }

    }


    private void console(String message){
        System.out.printf("%s:%s \n", currentThread().getName() , message);
    }
}

 

 

package com.zl.step5.eventQueue2;

import java.util.concurrent.TimeUnit;

public class EventClient {
    public static void main(String[] args) {

        final EventQueue eventQueue = new EventQueue() ;

        new Thread(() -> {
            for (;;){

                eventQueue.offer(new EventQueue.Event());
            }


        },"Producer-1"  ).start();


        new Thread(() -> {
            for (;;){

                eventQueue.offer(new EventQueue.Event());
            }


        },"Producer-2"  ).start();

        new Thread(() -> {
            for (;;){
                eventQueue.take();

                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }

        },"Consumer " ).start(); ;


    }


}

 

執行緒休息室wait set

執行緒呼叫了某個物件的wait方法之後,都會被加入該物件的monitor關聯的wait set中, 並且釋放monitor的所有權

nofify方法:

notifyAll方法

 

自定義顯示鎖BooleanLock

synchronized關鍵字的缺陷

synchronized關鍵字給提供了一種排他式的資料同步機制,某個執行緒在獲取monitor lock的時候可能會被阻塞,

有兩個明顯的缺陷:

1.無法控制阻塞時長

2.阻塞不可被中斷

 

測試程式碼:

package com.zl.step5;

import java.util.concurrent.TimeUnit;

public class SynchronizedDefect {

    public synchronized void syncMethod(){

        try {
            System.out.printf("Thread [ %s ] is running .....\n ", Thread.currentThread().getName());
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDefect defect = new SynchronizedDefect();

        Thread t1 = new Thread(defect::syncMethod,"T1");

        t1.start();

        TimeUnit.MILLISECONDS.sleep(2);


        Thread t2 = new Thread(defect::syncMethod,"T2");
        t2.start();


    }



}

 

 

package com.zl.step5;

import java.util.concurrent.TimeUnit;

public class SynchronizedDefect2 {

    public synchronized void syncMethod(){

        try {
            System.out.printf("Thread [ %s ] is running .....\n ", Thread.currentThread().getName());
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDefect2 defect = new SynchronizedDefect2();

        Thread t1 = new Thread(defect::syncMethod,"T1");

        t1.start();

        TimeUnit.MILLISECONDS.sleep(2);


        Thread t2 = new Thread(defect::syncMethod,"T2");
        t2.start();

        TimeUnit.MILLISECONDS.sleep(2);

        t2.interrupt();


        System.out.printf("t2 interrupted : %s  \n ", t2.isInterrupted());

        System.out.printf("t2 state : %s  \n ", t2.getState());

    }



}

t2 在競爭資源的時候,處於阻塞狀態  ,是無法被打斷的。

 

顯示鎖BooleanLock

 

使其具備synchronized關鍵字所有的功能,同時又具備可中斷和lock超時的功能

1.定義Lock介面

import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 * 1.lock()方法永遠阻塞,除非獲取到了鎖,這一點和synchronized非常類似
 *   但是該方法是可以被中斷的,中斷的時候,丟擲InterruptedException
 * 2.lock(long mills)  方法除了可以被中斷以外,還增加了對應的超時功能
 * 3.unlock() 方法可用來釋放鎖
 * 4.getBlockedThreads() 用於獲取當前有哪些執行緒被阻塞
 * 
 * 
 */
public interface Lock {

    void lock() throws InterruptedException;

    void lock(long mills) throws  InterruptedException, TimeoutException ;

    void unlock();

    List<Thread> getBlockedThreads();

}

 

* 1.lock()方法永遠阻塞,除非獲取到了鎖,這一點和synchronized非常類似
* 但是該方法是可以被中斷的,中斷的時候,丟擲InterruptedException
* 2.lock(long mills) 方法除了可以被中斷以外,還增加了對應的超時功能
* 3.unlock() 方法可用來釋放鎖
* 4.getBlockedThreads() 用於獲取當前有哪些執行緒被阻塞

2.實現BooleanLock

BooleanLock是Lock的一個boolean實現,通過控制一個Boolean變數的開關來決定是否允許當前的執行緒獲取該鎖。

程式碼:

 

package com.zl.step5.booleanlock;

import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 * 1.lock()方法永遠阻塞,除非獲取到了鎖,這一點和synchronized非常類似
 *   但是該方法是可以被中斷的,中斷的時候,丟擲InterruptedException
 * 2.lock(long mills)  方法除了可以被中斷以外,還增加了對應的超時功能
 * 3.unlock() 方法可用來釋放鎖
 * 4.getBlockedThreads() 用於獲取當前有哪些執行緒被阻塞
 *
 *
 */
public interface Lock {

    void lock() ;

    void lock(long mills) throws  InterruptedException, TimeoutException ;

    void unlock();

    List<Thread> getBlockedThreads();

}

 

 

package com.zl.step5.booleanlock;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;

public class BooleanLock implements Lock {


    // 當前擁有鎖的執行緒
    private Thread lockThread ;

    // ture 該鎖已經被某個執行緒所獲得
    // false代表該鎖沒有被任何執行緒獲得或者已經被釋放
    private boolean locked  = false ;

    // 用來儲存哪些執行緒在獲取當前執行緒時計入阻塞狀態
    private final List<Thread> blockedList = new ArrayList<>();

    @Override
    public synchronized void lock()  {
//        System.out.println("lock 當前執行緒:"+currentThread()  );
//        System.out.println("lock 持有鎖執行緒:"+lockThread );

        // 使用lock方法使用同步程式碼塊的方式進行方法同步
        synchronized (this){
            // 如果當前鎖已經被某個執行緒所獲得,則該執行緒加入阻塞佇列
            // 並且使當前執行緒wait釋放對this monitor的所有權
            while (locked){
                final Thread tempThread = currentThread();
                try{

                    if(!blockedList.contains(currentThread())){
                        blockedList.add(currentThread());
                    }
                    this.wait();

                }catch (InterruptedException e){

                    blockedList.remove(tempThread) ;

                }

            }
            // 如果當前鎖沒有被其他執行緒所獲得,則該執行緒嘗試從阻塞佇列中刪除自己
            blockedList.remove(currentThread());
            // locked 開關指定為true
            this.locked = true ;
            //  記錄獲取鎖的執行緒
            this.lockThread = currentThread() ;

        }

    }

    @Override
    public void lock(long mills) throws InterruptedException, TimeoutException {
        synchronized (this){

            // 如果mills 不合法, 則預設呼叫lock方法 ,當然也可以丟擲引數非法異常
            if(mills <= 0){
                this.lock();
            }else{
                long remainingMills = mills ;
                long endMills = currentTimeMillis() + remainingMills ;
                while(locked){
                    //如果remainingMills 小於等於0 ,
                    // 則意味著當前執行緒被其他執行緒喚醒或者在指定的wait時間到了之後,
                    // 還沒有獲得鎖,則丟擲超時異常
                    if(remainingMills <= 0){
                        throw new TimeoutException("can not get  the lock during " + mills) ;
                    }

                    if(!blockedList.contains(currentThread())){
                        blockedList.add(currentThread());
                    }
                    // 呼叫wait
                    this.wait(remainingMills);

                    // 重新計算remainingMills 時間,因為該執行緒有可能會被喚醒
                    remainingMills = endMills - currentTimeMillis() ;
                }
                blockedList.remove(currentThread()) ;
                this.locked = true ;
                this.lockThread = currentThread() ;
            }
        }
    }

    @Override
    public  void unlock() {

//        System.out.println("unlock當前執行緒:"+currentThread()  );
//        System.out.println("unlock持有鎖執行緒:"+lockThread );
        
        synchronized (this){
            //解鎖, 那個執行緒加的鎖,只能由該執行緒進行解鎖
            if (lockThread == currentThread()){
                this.locked = false;
                Optional.of(currentThread().getName()+" release the lock." ).ifPresent(System.out::println);
                this.notifyAll();
            }else{
                System.out.println(currentThread() + " unlock fail ... ");
            }
        }

    }

    @Override
    public List<Thread> getBlockedThreads() {
        return Collections.unmodifiableList(blockedList);
    }
}

 

 

package com.zl.step5.booleanlock;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;

public class BooleanLockTest   {


    private final Lock lock = new BooleanLock();

    public  void syncMethod()  {

        try {
            lock.lock(10 * 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        try {

            int randomInt = current().nextInt(3) ;
            System.out.println(currentThread() + " get the lock , is running .... ");
            TimeUnit.SECONDS.sleep(randomInt);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }


    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("mian Thread : " + Thread.currentThread());

        BooleanLockTest blt = new BooleanLockTest();

        IntStream.range(0,10)
                .mapToObj(i -> new Thread(blt::syncMethod))
                .forEach(Thread::start);


//        new Thread(blt::syncMethod,"T1").start();
//
//        TimeUnit.MILLISECONDS.sleep(2);
//
//        Thread t2 =  new Thread(blt::syncMethod,"T2") ;
//
//        t2.start();;
//        TimeUnit.MILLISECONDS.sleep(2);
//        t2.interrupt();



    }



}

 

 

 

 

 

 

本文整理來源於:

《Java高併發程式設計詳解:多執行緒與架構設計》 --汪文君