1. 程式人生 > >java執行緒之間的通訊方式

java執行緒之間的通訊方式

引言

當多個執行緒需要協作來完成一件事情的時候,如何去等待其他執行緒執行,又如何當執行緒執行完去通知其他執行緒結束等待。

執行緒與程序的區別

程序可以獨立執行,它是系統進行資源分配和排程的獨立單位。
執行緒是程序的一個實體,是CPU排程和分派的基本單位,比程序更小的獨立單位,它基本上不擁有系統資源。
他們之間的聯絡:
一個執行緒屬於一個程序,而一個程序有多個執行緒,多個執行緒共享該程序的所有資源。
區別:
1.排程:執行緒作為CPU排程和分派的基本單位,程序擁有系統資源的獨立單位。
2.併發性:程序之間併發執行,同一個程序的多個執行緒也能併發執行。
3.擁有資源,程序是擁有系統資源的基本單位,執行緒不擁有資源,但能訪問程序的資源。

執行緒之間的通訊方式有哪些

1.join
2.共享變數(volatile、AtomicInteger)
3.notify/wait
4.lock/condition
5.管道

join

首先,開啟兩個執行緒,分別列印123,執行緒A和執行緒B會不按套路列印。

如果必須要執行緒A線上程B之前列印123呢?
需要使用thread1.join();//我會等待執行緒1執行完成後再進行執行

join的原理

實際上join方法內部是通過wait實現的。

上一段jdk原始碼

public final synchronized void join(long millis)
    throws
InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else
{ while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }

這個join的原理很簡單,前面那些if條件不管,主要看while迴圈裡面的,while迴圈就是不斷去判斷this.isAlive的結果,用上面的例子,這個this就是joinThread。然後關鍵的程式碼就是wait(delay);一個定時的wait。這個wait的物件也是this,就是joinThread。上面我們已經講了wait一定要在同步方法或者同步程式碼塊中,原始碼中join方法的修飾符就是一個synchronized,表明這是一個同步的方法。
不要看呼叫wait是joinThread,是一個執行緒。但是真正因為wait進入阻塞狀態的,是持有物件監視器的執行緒,這裡的物件監視器是joinThread,持有他的是main執行緒,因為在main執行緒中執行了join這個同步方法。
所以main執行緒不斷的wait,直到呼叫join方法那個執行緒物件銷燬,才繼續向下執行。
但是原始碼中只有wait的方法,沒有notify的方法。因為notify這個操作是JVM通過檢測執行緒物件銷燬而呼叫的native方法,是C++實現的,在原始碼中是找不到對應這個wait方法而存在的notify方法的。

也就是說。
利用Thread.join()方法來實現,join()方法的作用是等待呼叫執行緒執行完之後再執行任務。
這個是必須執行緒A全部執行完,再去執行B.

wait/notify

如果是交替列印呢?
必須使用wait()和notify()方法。

一道面試題。
編寫兩個執行緒,一個執行緒列印1~52,另一個執行緒列印字母A~Z,列印順序為12A34B56C……5152Z,要求使用執行緒間的通訊。
程式碼如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Edison Xu on 2017/3/2.
 */
public enum Helper {

    instance;

    private static final ExecutorService tPool = Executors.newFixedThreadPool(2);

    public static String[] buildNoArr(int max) {
        String[] noArr = new String[max];
        for(int i=0;i<max;i++){
            noArr[i] = Integer.toString(i+1);
        }
        return noArr;
    }

    public static String[] buildCharArr(int max) {
        String[] charArr = new String[max];
        int tmp = 65;
        for(int i=0;i<max;i++){
            charArr[i] = String.valueOf((char)(tmp+i));
        }
        return charArr;
    }

    public static void print(String... input){
        if(input==null)
            return;
        for(String each:input){
            System.out.print(each);
        }
    }

    public void run(Runnable r){
        tPool.submit(r);
    }

    public void shutdown(){
        tPool.shutdown();
    }

}

使用wait和notify

public class MethodOne {
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                try {
                    for (int i = 0; i < arr.length; i=i+2) {
                        synchronized (threadToGo) {
                            while (threadToGo.value == 2)
                                threadToGo.wait();
                            Helper.print(arr[i], arr[i + 1]);
                            threadToGo.value = 2;
                            threadToGo.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Oops...");
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                try {
                    for (int i = 0; i < arr.length; i++) {
                        synchronized (threadToGo) {
                            while (threadToGo.value == 1)
                                threadToGo.wait();
                            Helper.print(arr[i]);
                            threadToGo.value = 1;
                            threadToGo.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Oops...");
                }
            }
        };
    }
    class ThreadToGo {
        int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
        MethodOne one = new MethodOne();
        Helper.instance.run(one.newThreadOne());
        Helper.instance.run(one.newThreadTwo());
        Helper.instance.shutdown();
    }
}

注意:
wait和notify方法必須放在同步塊裡面,因為要對應同一個物件監視器。而sleep沒有

原理詳解:

wait方法會使執行該wait方法的執行緒停止,直到等到了notify的通知。細說一下,執行了wait方法的那個執行緒會因為wait方法而進入等待狀態,該執行緒也會進入阻塞佇列中。而執行了notify那個執行緒在執行完同步程式碼之後會通知在阻塞佇列中的執行緒,使其進入就緒狀態。被重新喚醒的執行緒會試圖重新獲得臨界區的控制權,也就是物件鎖,然後繼續執行臨界區也就是同步語句塊中wait之後的程式碼。
上面這個描述,可以看出一些細節。
1.wait方法進入了阻塞佇列,而上文講過執行notify操作的執行緒與執行wait的執行緒是擁有同一個物件監視器,也就說wait方法執行之後,立刻釋放掉鎖,這樣,另一個執行緒才能執行同步程式碼塊,才能執行notify。
2.notify執行緒會在執行完同步程式碼之後通知在阻塞佇列中的執行緒,也就是說notify的那個執行緒並不是立即釋放鎖,而是在同步方法執行完,釋放鎖以後,wait方法的那個執行緒才會繼續執行。
3.被重新喚醒的執行緒會試圖重新獲得鎖,也就說,在notify方法的執行緒釋放掉鎖以後,其通知的執行緒是不確定的,看具體是哪一個阻塞佇列中的執行緒獲取到物件鎖。

這裡詳細說一下,這個結果。wait使執行緒進入了阻塞狀態,阻塞狀態可以細分為3種:
● 等待阻塞:執行的執行緒執行wait方法,JVM會把該執行緒放入等待佇列中。
● 同步阻塞:執行的執行緒在獲取物件的同步鎖時,若該同步鎖被別的執行緒佔用,則JVM會把該執行緒放入鎖池當中。
● 其他阻塞: 執行的執行緒執行了Thread.sleep或者join方法,或者發出I/O請求時,JVM會把該執行緒置為阻塞狀態。當sleep()狀態超時、join()等待執行緒終止,或者超時、或者I/O處理完畢時,執行緒重新轉入可執行狀態。
可執行狀態就是執行緒執行start時,就是可執行狀態,一旦CPU切換到這個執行緒就開始執行裡面的run方法就進入了執行狀態。
上面會出現這個結果,就是因為notify僅僅讓一個執行緒進入了可執行狀態,而另一個執行緒則還在阻塞中。而notifyAll則使所有的執行緒都從等待佇列中出來,而因為同步程式碼的關係,獲得鎖的執行緒進入可執行態,沒有得到鎖的則進入鎖池,也是阻塞狀態,但是會因為鎖的釋放而重新進入可執行態。所以notifyAll會讓所有wait的執行緒都會繼續執行。

Lock和Condition

如何程式不使用synchronized關鍵字來保持同步,而是直接適用Lock對像來保持同步,則系統中不存在隱式的同步監視器物件,也就不能使用wait()、notify()、notifyAll()來協調執行緒的執行.

當使用LOCK物件保持同步時,JAVA為我們提供了Condition類來協調執行緒的執行。關於Condition類,JDK文件裡進行了詳細的解釋.,再次就不囉嗦了。

程式碼如下:

public class MethodTwo {
    private Lock lock = new ReentrantLock(true);
    private Condition condition = lock.newCondition();
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    try {
                        lock.lock();
                        while(threadToGo.value == 2)
                            condition.await();
                        Helper.print(arr[i], arr[i + 1]);
                        threadToGo.value = 2;
                        condition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    try {
                        lock.lock();
                        while(threadToGo.value == 1)
                            condition.await();
                        Helper.print(arr[i]);
                        threadToGo.value = 1;
                        condition.signal();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
    }
    class ThreadToGo {
        int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
        MethodTwo two = new MethodTwo();
        Helper.instance.run(two.newThreadOne());
        Helper.instance.run(two.newThreadTwo());
        Helper.instance.shutdown();
    }
}

輸出結果和上面是一樣的! 只不過這裡 顯示的使用Lock對像來充當同步監視器,使用Condition物件來暫停指定執行緒,喚醒指定執行緒!

共享變數(volatile、AtomicInteger)

volatile修飾的變數值直接存在main memory裡面,子執行緒對該變數的讀寫直接寫入main memory,而不是像其它變數一樣在local thread裡面產生一份copy。volatile能保證所修飾的變數對於多個執行緒可見性,即只要被修改,其它執行緒讀到的一定是最新的值。

程式碼如下:

public class MethodThree {
    private volatile ThreadToGo threadToGo = new ThreadToGo();
    class ThreadToGo {
        int value = 1;
    }
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    while(threadToGo.value==2){}
                    Helper.print(arr[i], arr[i + 1]);
                    threadToGo.value=2;
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    while(threadToGo.value==1){}
                    Helper.print(arr[i]);
                    threadToGo.value=1;
                }
            }
        };
    }
    public static void main(String args[]) throws InterruptedException {
        MethodThree three = new MethodThree();
        Helper.instance.run(three.newThreadOne());
        Helper.instance.run(three.newThreadTwo());
        Helper.instance.shutdown();
    }
}

管道

管道流是JAVA中執行緒通訊的常用方式之一,基本流程如下:

1)建立管道輸出流PipedOutputStream pos和管道輸入流PipedInputStream pis

2)將pos和pis匹配,pos.connect(pis);

3)將pos賦給資訊輸入執行緒,pis賦給資訊獲取執行緒,就可以實現執行緒間的通訊了

程式碼如下:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class testPipeConnection {

    public static void main(String[] args) {
        /**
         * 建立管道輸出流
         */
        PipedOutputStream pos = new PipedOutputStream();
        /**
         * 建立管道輸入流
         */
        PipedInputStream pis = new PipedInputStream();
        try {
            /**
             * 將管道輸入流與輸出流連線 此過程也可通過過載的建構函式來實現
             */
            pos.connect(pis);
        } catch (IOException e) {
            e.printStackTrace();
        }
        /**
         * 建立生產者執行緒
         */
        Producer p = new Producer(pos);
        /**
         * 建立消費者執行緒
         */
        Consumer1 c1 = new Consumer1(pis);
        /**
         * 啟動執行緒
         */
        p.start();
        c1.start();
    }
}

/**
 * 生產者執行緒(與一個管道輸入流相關聯)
 * 
 */
class Producer extends Thread {
    private PipedOutputStream pos;

    public Producer(PipedOutputStream pos) {
        this.pos = pos;
    }

    public void run() {
        int i = 0;
        try {
            while(true)
            {
            this.sleep(3000);
            pos.write(i);
            i++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消費者執行緒(與一個管道輸入流相關聯)
 * 
 */
class Consumer1 extends Thread {
    private PipedInputStream pis;

    public Consumer1(PipedInputStream pis) {
        this.pis = pis;
    }

    public void run() {
        try {
            while(true)
            {
            System.out.println("consumer1:"+pis.read());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

程式啟動後,就可以看到producer執行緒往consumer1執行緒傳送資料。

consumer1:0
consumer1:1
consumer1:2
consumer1:3
......

管道流雖然使用起來方便,但是也有一些缺點

1)管道流只能在兩個執行緒之間傳遞資料

執行緒consumer1和consumer2同時從pis中read資料,當執行緒producer往管道流中寫入一段資料後,每一個時刻只有一個執行緒能獲取到資料,並不是兩個執行緒都能獲取到producer傳送來的資料,因此一個管道流只能用於兩個執行緒間的通訊。不僅僅是管道流,其他IO方式都是一對一傳輸。

2)管道流只能實現單向傳送,如果要兩個執行緒之間互通訊,則需要兩個管道流

可以看到上面的例子中,執行緒producer通過管道流向執行緒consumer傳送資料,如果執行緒consumer想給執行緒producer傳送資料,則需要新建另一個管道流pos1和pis1,將pos1賦給consumer1,將pis1賦給producer,具體例子本文不再多說。