1. 程式人生 > >多執行緒學習筆記--03執行緒間的通訊(wait/notify)

多執行緒學習筆記--03執行緒間的通訊(wait/notify)

1.執行緒間的通訊

     使用wait/notify來實現執行緒間的通訊

     生產者/消費者模式的實現

     方法join的使用

    ThreadLocal類的使用

 2.wait/notify:

       Wait()方法的作用是使當前執行執行緒的方法進行等待,該方法用來將當前執行緒置入“預執行佇列中”,並且在wait所在的程式碼處執行停止。直到接到通知或被中斷為止。呼叫wait()方法也是有限制的,就是執行緒必須獲得物件的物件鎖之後才能執行這個方法,言下之意就是wait()方法必須在同步方法或者同步程式碼塊中執行。執行wait()方法後,當前執行緒釋放鎖。在從wait()方法返回前,執行緒與其他執行緒競爭重新獲得鎖。

如果呼叫wait()時沒有持適當的鎖,則丟擲異常。

     Notify()該方法是用來通知那些可能等待該物件的物件鎖的其他執行緒。如果有多個執行緒處於等待狀態,則由執行緒規劃器隨機挑選出其中一個呈wait()狀態的執行緒,對其發出通知notify,並使他等待獲取該物件的物件鎖。和wait()方法一樣,notify()方法也是需要在同步方法同步塊中執行。即在呼叫前,執行緒也必須獲得該物件的物件級別的鎖。如果呼叫notify()時沒有獲得對應的鎖會報錯。需要注意的是:在執行notify()方法後,當前執行緒不會馬上釋放該物件鎖。呈wait狀態的執行緒也不會馬上獲得該物件鎖。要等到執行notify方法的執行緒將程式執行完,也就是退出synchronized程式碼塊後,當前執行緒才會釋放鎖。而呈wait狀態的所在的執行緒才能獲得物件鎖。當第一個獲得了該物件鎖的wait執行緒完畢以後,他會釋放掉該物件鎖。此時如果該物件沒有再次使用notify()語句,則即便該物件已經空閒,其他wait狀態等待的執行緒由於沒有接到該物件的通知,將會繼續阻塞在wait狀態,直到這個物件發出notify.

  體驗等待通知機制:

執行緒A:

package com.wx.concurrent1;

public class ThreadA extends Thread {
    private String lock;
    public ThreadA(String _lock)
    {
        this.lock=_lock;
    }
    @Override
    public void run() {
        try{
            synchronized (lock)
            {
                System.out.println("wait()前面");
                lock.wait();
                System.out.println("wait()後面");
            }
        }catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

執行緒B:

package com.wx.concurrent1;

public class ThreadB extends Thread {
    private String lock;
    public ThreadB(String _locak)
    {
        this.lock=_locak;
    }
    @Override
    public void run() {
        synchronized (lock)
        {
            System.out.println("notify方法前面");
            lock.notify();
            System.out.println("notify方法後面");
        }
    }
}

測試:

package com.wx.concurrent1;

public class Test2 {
    public static void main(String[] args)
    {
        try{
            String lock=new String();
            ThreadA threadA=new ThreadA(lock);
            ThreadB threadB=new ThreadB(lock);
            threadA.start();
            Thread.sleep(3000);
            threadB.start();
        }catch (Exception e)
        {
            e.printStackTrace();
        }

    }
}

  

 

synchronized可以將任何一個Object物件作為一個同步物件來看待,而java為每個Object物件都實現了wait和notify方法。他們必須用在被synchronized同步的Object臨界區內。通過呼叫wait方法可以使臨界區進入內的執行緒等待狀態。同時釋放被同步的物件鎖。而notify可以喚醒一個因呼叫了wait操作而處於阻塞狀態的執行緒,使其進入就緒狀態。被重新喚醒的執行緒會試圖重新獲得臨界區的控制權,也就是鎖。並繼續執行臨界區內wait之後的程式碼。如果發出notify操作沒有處於阻塞狀態中的執行緒,那麼該命令將會被忽略。

Wait方法可使呼叫該方法的執行緒釋放共享資源的鎖,從執行狀態退出,進入等待佇列,直到再次被喚醒。

Notify方法可以喚醒等待佇列中等待同一共享資源的一個執行緒,並使該執行緒退出等待佇列,進入就緒狀態。

NitifyAll方法可以使所有正在等待佇列中等待同一共享資源的全部執行緒從等待狀態退出。進入可執行狀態。同時優先順序最高的哪個執行緒最優先執行,但也有可能隨機執行,看JVM的實現。

                                        

Runnable(可執行)狀態和Running(執行)狀態:

  一般執行緒呼叫start()方法以後系統就會為執行緒分配CPU資源,這時執行緒就是可執行狀態,如果執行緒搶到Cpu資源,執行緒就會進入執行狀態。

  執行緒進入可執行狀態的5種姿勢:

  1. 呼叫sleep()方法後經過的時間超過了指定的休眠時間。
  2. 執行緒呼叫的阻塞IO已經返回,阻塞方法方法執行完畢
  3. 執行緒成功的獲得了試圖同步的監視器
  4. 執行緒正在等待某個通知,其他執行緒發出了通知
  5. 處於掛起狀態的執行緒呼叫了resume方法

  處於阻塞狀態的5種姿勢:

  1. 執行緒呼叫sleep方法,主動放棄佔用的處理器資源。
  2. 執行緒呼叫了阻塞式IO的方法,在該方法返回前,該執行緒被阻塞。
  3. 執行緒試圖獲得一個同步監視器,但該同步監視器正被其他執行緒所持有。
  4. 執行緒等待某個通知。
  5. 程式帶哦用了suspend方法將該執行緒掛起,此方法容易導致死鎖,儘量避免使用該方法。

   每個鎖物件都有兩個佇列,一個是就緒佇列,一個是阻塞佇列。就緒佇列儲存了將要獲得鎖的執行緒。阻塞佇列儲存了被阻塞的執行緒。一個執行緒被喚醒後才會進入就緒佇列。等待CPU的排程,反之,一個執行緒被wait後,就會進入阻塞佇列等待下一次被喚醒。

如果當執行緒處於wait狀態的時候,呼叫執行緒中斷的方法interrupt()會出現異常。

鎖被釋放的幾種情況:

執行完同步程式碼塊後就會釋放物件的鎖,

執行同步程式碼塊遇到異常使執行緒終止,也會釋放物件鎖,

執行同步程式碼塊的時候,如果執行了鎖所屬物件的wait()方法,這個執行緒會釋放物件鎖。而此執行緒物件會進入執行緒等待池中,等待被喚醒。

   

Wait(long):帶一個引數的Wait(long)方法的功能是等待某一時間內是否有執行緒對鎖進行喚醒,如果沒有則自動喚醒。

如果notify通知過早則會打亂程式的正常執行。

基於等待通知模型的生產者消費者模型:

 1.操作值

建立一個生產者,生產之前判斷一下值是否為空,如果不為空就呼叫wait()釋放當前的鎖,讓消費者去消費,消費完了再賦值,賦值完喚醒消費者執行緒:

package com.wx.concurrent3;

public class P {
    private String lock;
    public P(String _locak)
    {
        this.lock=_locak;
    }
    public void setValue()
    {
        try {
            synchronized (lock)
            {
                if(!ValueObject.value.equals(""))
                {
                    lock.wait();
                }
                String value=   System.currentTimeMillis()+"_"+System.nanoTime();
                System.out.println("set的值是:"+value);
                ValueObject.value=value;
                lock.notify();
            }
        }catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

 消費者執行緒,如果消費的資料為空就釋放當前鎖給生產者,如果不為空就消費,消費完畢喚醒生產者執行緒:

package com.wx.concurrent3;

public class C {
    private String lock;
    public C (String _locak)
    {
        this.lock=_locak;
    }
    public void getValue()
    {
        try {
            synchronized (lock)
            {
                if(ValueObject.value.equals(""))
                {
                    lock.wait();
                }
                String value= System.currentTimeMillis()+"_"+System.nanoTime();
                System.out.println("get的值是:"+value);
                ValueObject.value="";
                lock.notify();
            }
        }catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
package com.wx.concurrent3;

public class ValueObject {
    public static String value="";
}

  建立兩個執行緒,一個消費者執行緒,一個生產者執行緒:

package com.wx.concurrent3;

public class ThreadA extends Thread {
    private  P p;
    public  ThreadA(P _p)
    {
        this.p=_p;
    }
    @Override
    public void run() {
           while(true)
           {
               p.setValue();
           }
    }
}


package com.wx.concurrent3;

public class ThreadB extends Thread {
    private  C c ;
    public  ThreadB(C _c)
    {
        this.c=_c;
    }
    @Override
    public void run() {
        while(true)
        {
            c.getValue();
        }
    }
}

 測試:
 

package com.wx.concurrent3;

public class Test1 {
    public static void main(String[] args)
    {
        String lock=new String("");
        P p=new P(lock);
        C c=new C(lock);
        ThreadA threadA=new ThreadA(p);
        ThreadB threadB=new ThreadB(c);
        threadA.start();
        threadB.start();
    }
}

 

如果有多個生產者和多個消費者,則可能會出現假死的現象,即所有的執行緒都進入了等待狀態,程式永遠不可能執行完了。為什麼會出現這樣的情況呢?因為可能是生產者喚醒了生產者,消費者喚醒了消費者,最後大家都會進入wait等待狀態了。

那麼如何解決假死呢?將notify方法換成notifyAll方法,這樣就就可以通知到所有處於等待的狀態的執行緒。

2.操作棧:生產者向list集合中新增資料,然後消費者取,保證容器的最大容量為1

    第一種情況,一個生產者,一個消費者

     建立一個棧類:

package com.wx.concurrent4;

import java.util.ArrayList;
import java.util.List;

public class MyStack {
    private List list=new ArrayList();
    //壓入棧的方法
    synchronized public void push()
    {
        try {
            if(list.size()==1)
            {
                this.wait();
            }
            list.add("anyString"+Math.random());
            this.notify();
            System.out.println("push="+list.size());
        }catch (Exception e)
        {
            e.printStackTrace();
        }
    }
    //彈出棧的方法
    synchronized public void pop()
    {
        try{
            if (list.size()==0)
            {
                System.out.println("pop的操作中執行緒"+Thread.currentThread().getName()+"呈wait()狀態");
                this.wait();
            }
            list.remove(0);
            this.notify();
            System.out.println(" pop="+ list.size());
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

  生產者和消費者類:

package com.wx.concurrent4;
public class P {
    private MyStack myStack;
    public P(MyStack myStack)
    {
        this.myStack=myStack;
    }
    public void push()
    {
        myStack.push();
    }
}


package com.wx.concurrent4;
public class C {
    private MyStack myStack;
    public C(MyStack myStack)
    {
        this.myStack=myStack;
    }
    public void pop()
    {
        myStack.pop();
    }
}

 生產者和消費者執行緒:

package com.wx.concurrent4;
public class ThreadA extends Thread {
    private  P p;
    public  ThreadA(P _p)
    {
        this.p=_p;
    }
    @Override
    public void run() {
           while(true)
           {
               p.push();
           }
    }
}


package com.wx.concurrent4;

public class ThreadB extends Thread {
    private  C c ;
    public  ThreadB(C _c)
    {
        this.c=_c;
    }
    @Override
    public void run() {
        while(true)
        {
            c.pop();
        }
    }
}

 測試(最初的目的實現了,容器的大小始終在0和1之間交替):

package com.wx.concurrent4;

public class Test1 {
    public static void main(String[] args)
    {
        MyStack myStack=new MyStack();
        P p=new P(myStack);
        C c=new C(myStack);
        ThreadA threadA=new ThreadA(p);
        ThreadB threadB=new ThreadB(c);
        threadA.start();
        threadB.start();
    }
}

 

第二種情況,生產者還是一個,但是消費者有多個,還是保證容器最大容量為1

執行報錯:

從執行的結果分析原因,首先生產者執行緒啟動,消費者執行緒也啟動,生產者生產完了以後,通知消費者消費,消費完了以後,通知生產者,然後在生產者被喚醒試圖爭奪臨界區控制權的時候,其他阻塞在同步方法外的消費者已經進入同步方法,挨著呼叫了wait()方法進入等待狀態,之後鎖被釋放,然後生產者得到鎖,生產並喚醒消費者。隨機一個消費者被喚醒後進行消費。消費完了以後通知處於wait()狀態的執行緒,重點來了,通知的一定是處於wait的生產者嗎?當然不是,可能通知的是處於wait()狀態的消費者,並且機率很大,另一個消費者再次被喚醒接著執行wait()之後的語句list.remove(0);這就是報錯的原因。

 得知原因後如何解決這個問題呢?顯然會想到上面用過的使用notifyAll()方法通知所有處於等待狀態的執行緒,但是他依然會喚醒消費者執行緒,依然會執行list.remove(0)操作,依然會報錯。那麼怎麼辦呢?如果處於等待狀態的執行緒被喚醒後再次執行一下判斷條件就好了。這裡就需要知道if條件和while條件的區別了?他們的區別在這裡起了很大的作用,if條件中執行緒被喚醒後直接執行wait()後面的語句,不再執行if的判斷語句,但是如果使用while的話,從wait下面繼續執行,還會返回執行while的條件判斷.所以if判斷改為while判斷問題解決。為了避免假死需要使用notifyAll()。

第三種情況,還是操作值棧,多生產與一個消費者,上面的問題解決了這個就不會有問題了。

第四種情況:多生產多消費,也沒有問題了。