1. 程式人生 > >並行,JUC,併發(賣票),執行緒通訊(生產者消費者問題))

並行,JUC,併發(賣票),執行緒通訊(生產者消費者問題))

併發和並行

併發: 多個執行緒搶一份資源。比如說12306 搶票。

並行:泡方便麵。正常追求效率的情況下,撕調料包的情況下,燒熱水。比如說一個人執行了多個任務,在聽歌的時候走路。

關於兩者的區別關注下面的這個連線:

The Differences

併發會引發的問題,(執行緒的安全問題)說個視窗賣票的問題:

package com.isea.mybatis;

class Tickets{
    private int sharData = 10;
    public  void sale(){
        while (true){
            if (sharData <=0){
                break;
            }else {
                try {
                    Thread.sleep(300);
                    sharData --;
                    if (sharData == 0){
                        System.out.println(Thread.currentThread().getName() + "賣出最後一張票");
                    }else {
                        System.out.println(Thread.currentThread().getName() + "賣出一張票,還剩" + sharData );
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

public class TestTickets{
    public static void main(String[] args) {
        Tickets tickets = new Tickets();

        for (int i = 0; i < 3 ; i ++){
            new Thread(() ->{tickets.sale();},"windows" + (i + 1)).start();
        }
    }
}

執行的結果如下:

windows1賣出一張票,還剩9
windows2賣出一張票,還剩9
windows3賣出一張票,還剩9
windows1賣出一張票,還剩7
windows2賣出一張票,還剩7
windows3賣出一張票,還剩7
windows3賣出一張票,還剩6
windows1賣出一張票,還剩6
windows2賣出一張票,還剩6
windows3賣出一張票,還剩4
windows1賣出一張票,還剩5
windows2賣出一張票,還剩5
windows1賣出一張票,還剩2
windows3賣出一張票,還剩1
windows2賣出一張票,還剩2
windows1賣出最後一張票
windows3賣出一張票,還剩-2
windows2賣出一張票,還剩-2

賣出重票和負票,為什麼會出現這樣的結果呢?我們想一下這樣的場景:

視窗1(執行緒1)先搶到CPU的使用權,執行緒1開始呼叫sale方法,判斷sharData是否小於等於0,不滿足,sleep300毫秒,讓出CPU的使用權,執行緒2搶到CPU的使用權, 判斷之後,也sleep300毫秒,讓出CPU的使用權,執行緒3搶到CPU資源,判斷之後sleep300毫秒,這三個執行緒都保持初始的時候sharData值10,此時執行緒1醒來,sharData --,列印9張,接著執行緒2醒來,sharData --,列印9張,接著執行緒3醒來,sharData --,列印9張,錯誤就是這樣產生的,所以我們希望有一種機制能夠將我們共享的資源加上鎖,當某一個執行緒操作這個共享的資源的時候是無人打擾的,就好像上衛生間的時候,我們要拉上門栓。

方法的同步能夠做到這點,方法同步也叫給資源加鎖,使用  synchronized  或者是 jus Lock ,下面是一個演示:

package com.isea.java;

class Tickets{
    private int sharData = 10;

    public synchronized void sale(){
        while (true){
            if (sharData == 0){
                break;
            }else {
                try {
                    Thread.sleep(300);
                    sharData --;
                    if (sharData == 0){
                        System.out.println(Thread.currentThread().getName() + "賣出最後一張票");
                    }else {
                        System.out.println(Thread.currentThread().getName() + "賣出一張票,還剩" + sharData );
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

public class TestTickets{
    public static void main(String[] args) {
        Tickets tickets = new Tickets();
        for (int i = 0; i < 3 ; i ++){
            new Thread(() ->{tickets.sale();},"windows" + (i + 1)).start();
        }

    }
}

接下來,我們使用JUC和工程化程式碼來解決這個問題。 

JUC:java併發包處理多執行緒(工程)

java.util.concurrent ,包,

執行緒操作資源類,高內聚低耦合。

對資源操作的全部方法永遠寫在自己身上,讓資源類緊緊地帶著這些方法,俗稱高內聚,這些對資源操作的方法不能寫在呼叫者的身上,即解耦合。

Interface Lock


public class ReentrantLock implements Lock, java.io.Serializable {}

// ReentrantLock  可重用鎖

//Lock implementations provide more extensive locking operations than can be obtained using synchronized methods and statements

 下面是工程化的程式碼:

package com.isea.java;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Tickets{
    private int sharData = 10;
    private Lock lock = new ReentrantLock();

    public  void sale() {
        lock.lock();
        try {
            if (sharData > 0){
                System.out.println(Thread.currentThread().getName() + "賣出第:\t" + sharData -- + "\t 還剩下:" + sharData);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class TestTickets{
    public static void main(String[] args) {
        Tickets tickets = new Tickets();
        new Thread(() -> {for (int i = 0; i < 40; i++) { tickets.sale();}},"A").start();
        new Thread(() -> {for (int i = 0; i < 40; i++) { tickets.sale();}},"B").start();
        new Thread(() -> {for (int i = 0; i < 40; i++) { tickets.sale();}},"C").start();
    }
}

結果:

A賣出第:	10	 還剩下:9
A賣出第:	9	 還剩下:8
A賣出第:	8	 還剩下:7
A賣出第:	7	 還剩下:6
A賣出第:	6	 還剩下:5
A賣出第:	5	 還剩下:4
A賣出第:	4	 還剩下:3
A賣出第:	3	 還剩下:2
A賣出第:	2	 還剩下:1
A賣出第:	1	 還剩下:0

剛開始,我們都是使用synchronize來加鎖的,但是現在我們使用了Lock來加鎖,為甚會出現Lock呢?

當一個執行緒獲取了對應的鎖,並執行該程式碼塊時,其他執行緒便只能一直等待,等待獲取鎖的執行緒釋放鎖,而這裡獲取鎖的執行緒釋放鎖只會有兩種情況:
  1)獲取鎖的執行緒執行完了該程式碼塊,然後執行緒釋放對鎖的佔有;
  2)執行緒執行發生異常,此時JVM會讓執行緒自動釋放鎖。
那麼如果這個獲取鎖的執行緒由於要等待IO或者其他原因(比如呼叫sleep方法)被阻塞了,但是又沒有釋放鎖,其他執行緒便只能乾巴巴地等待,試想一下,這多麼影響程式執行效率。因此就需要有一種機制可以不讓等待的執行緒一直無期限地等待下去(比如只等待一定的時間或者能夠響應中斷),通過Lock就可以辦到。

兩者之間的區別:

1. lock是一個介面,而synchronized是java的一個關鍵字,synchronized是內建的語言實現;
2. synchronized在發生異常時候會自動釋放佔有的鎖,因此不會出現死鎖;而lock發生異常時候,不會主動釋放佔有的鎖,必須手動unlock來釋放鎖,可能引起死鎖的發生。(所以最好將同步程式碼塊用try catch包起來,finally中寫入unlock,避免死鎖的發生。)
3. lock等待鎖過程中可以用interrupt來終端等待,而synchronized只能等待鎖的釋放,不能響應中斷;

關於併發的出現執行緒安全的問題,還可以看看ArrayList,HashSet , HashMap的執行緒不安全,和對應的解決辦法:

HashSet ,ArrayList , HashMap的執行緒不安全

賣票的時候,是多個執行緒對同一個資源縱向的搶奪,只需要加上一把鎖即可,但是對於生產者和消費者問題,不僅有縱向的爭奪,還有橫向的執行緒之間的交流和通訊,即生產者在生產的時候需要看生產的產品是都被消費了,而消費者在消費的時候需要看生產者有沒有把產品生產出來。

兩個執行緒,可以操作初始值為零的一個變數,實現一個執行緒對變數加1,一個執行緒對變數減1,交替10輪?

這裡面面臨的問題是:生產者怎麼知道該自己生產了,消費者怎麼知道該自己消費了

 

wait和sleep的區別?

他們兩個都能夠讓執行緒進入阻塞的狀態,但是wait會釋放鎖(對於資源類的鎖),即會釋放資源;sleep是睡,但是不會釋放鎖,會將鎖對應的資源牢牢地攥在自己的手中,只是會讓出CPU,讓別的執行緒獲得執行。更多區別在下面:

sleep和wait的區別

下面是程式碼:

package com.isea.java;

/**
 * 兩個執行緒,可以操作初始值為零的一個變數,實現一個執行緒對變數加1,一個執行緒對變數減1,交替10輪
 * 1,執行緒     操作      資源類     高內聚低耦合
 * 2,判斷     幹活      喚醒
 */

class ShareData{
    private int number = 0;

    public  synchronized void increment() throws InterruptedException {
        if (number != 0){
            this.wait();
        }
        number ++;
        System.out.println(Thread.currentThread().getName() + ":\t" + number);
        this.notifyAll();
    }

    public synchronized void decrement() throws InterruptedException {
        if (number == 0){
            this.wait();
        }
        number --;
        System.out.println(Thread.currentThread().getName() + ":\t" + number);
        this.notifyAll();
    }

}

public class ProdConsumerDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }


}

執行結果:

A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0
A:	1
B:	0

假設現在有兩個生產者,兩個消費者:


public class ProdConsumerDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }
}

結果列印:

A:	1
C:	2
D:	1
D:	0
C:	1
A:	2
C:	3
D:	2

結果發現出現了2,3這樣的資料,為什麼?這就是java多執行緒中的虛假喚醒。下飛機之後,上飛機必須再次過安檢。下面說一下虛假喚醒,

即只有滿足一定的條件之後,執行緒才能醒過來,但是在不滿足條件的情況下醒過來了,就叫做虛假喚醒。但是在只有一個生產者和只有一個消費者的情況下並不會存在虛假喚醒 ,這時候只能被對方喚醒,相當執行緒是序列的,下圖幫助理解:

 如果是多個生產者和消費者的話,情況是這樣的:

如上圖中的內容,某一個時刻,count的數量為0 ,執行緒1和執行緒2都進入等待的狀態,這事執行緒2生產一個,同時喚醒所有的執行緒,由於執行緒1和執行緒2是if判斷,只會進行一次,其中一個執行緒進行消費操作,但是另外一個執行緒並不會再次進行判斷,任然會繼續消費,虛假喚醒導致執行緒不安全。

面:請說出Object中五個常用的方法:toString() hashCode() wait() notify() notifyAll() ,equals()  , 正是因為wait()是Object中的方法,所以每一個物件都會有一把物件鎖,看一下wait的官方文件:

The current thread must own this object's monitor

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop: 

     synchronized (obj) {
         while (<condition does not hold>)
             obj.wait();
         ... // Perform action appropriate to condition
     }

即,

在進行多執行緒通訊判斷的時候,不能用if 只能用while

醒來之後,拉回來在進行一次安檢,這就是多執行緒通訊中虛假喚醒的解決辦法。

現在修改:

/**
 * 兩個執行緒,可以操作初始值為零的一個變數,實現一個執行緒對變數加1,一個執行緒對變數減1,交替10輪
 * 1,執行緒     操作      資源類     高內聚低耦合
 * 2,判斷     幹活      喚醒
 * 3,虛假喚醒
 */

class ShareData {
    private int number = 0;

    public synchronized void increment() throws InterruptedException {
        while (number != 0) {
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + ":\t" + number);
        this.notifyAll();
    }


    public synchronized void decrement() throws InterruptedException {
            while (number == 0) {
                this.wait();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + ":\t" + number);
            this.notifyAll();
    }
}

我們再次回過頭來,思考這樣的一個問題,為什麼會存在2,3這樣的情況呢?上文說到這是由於多執行緒通訊的時候虛假喚醒問題,這是一個什麼問題呢?

我們現在有A,B,C,D四個執行緒,A,C 生產,B,D消費。某時刻,B,C,D都在等待,number 為 0 ,A獲得CPU,並對資源加鎖,number 變成1,喚醒所有執行緒,此時,A,自己又搶到了CPU的執行權,判斷之後做了wait,釋放了鎖,此時C搶到了CPU時間片,對資源加鎖,number 變成了2,喚醒所有,也喚醒了A,A繼續搶到了CPU,number變成了3。

現在,我們用JUC的寫法來重現一下這個程式:

public interface ConditionCondition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects
 to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations.
 Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods. 

package com.isea.java;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 兩個執行緒,可以操作初始值為零的一個變數,實現一個執行緒對變數加1,一個執行緒對變數減1,交替10輪
 * 1,執行緒     操作      資源類     高內聚低耦合
 * 2,判斷     幹活      喚醒
 * 3,虛假喚醒
 */

class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public  void increment() throws InterruptedException {
        lock.lock();
        try {
            while (number != 0) {
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + ":\t" + number);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    public  void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + ":\t" + number);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ProdConsumerDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }
}

最後關於多執行緒操作的口訣:

* 兩個執行緒,可以操作初始值為零的一個變數,實現一個執行緒對變數加1,一個執行緒對變數減1,交替10輪
* 1,執行緒     操作      資源類     高內聚低耦合
* 2,判斷     幹活      喚醒
* 3,虛假喚醒