1. 程式人生 > >Java Thread系列(四)線程通信

Java Thread系列(四)線程通信

ice 集合 就是 int name 發出 max 執行 生產

Java Thread系列(四)線程通信

一、傳統通信

public static void main(String[] args) {
    //volatile實現兩個線程間數據可見性
    private volatile static List list = new ArrayList();

    Thread t1 = new Thread(new Runnable() { // (1)
        public void run() {
            try {
                for(int i = 0; i <10; i++){
                    list.add
(i); System.out.println(Thread.currentThread().getName() + "線程添加第" + (i + 1) + "個元素.."); Thread.sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1"); Thread t2 = new
Thread(new Runnable() { // (2) public void run() { while(true){ if(list.size() == 5){ //do something throw new RuntimeException(Thread.currentThread().getName() + "線程接到通知 size = " + list.size() + " 線程停止.."
); } } } }, "t2"); t1.start(); t2.start(); }
  1. t1 線程不斷將生產的數據放入 list 集合中

  2. t2 線程開啟 while 循環監聽 t1 線程,雖然可以實現 list.size()==5 時實時通知 t2 線程,但太浪費性能,考慮用 await/notify 提高性能,程序執行結果如下:

t1線程添加第1個元素..
t1線程添加第2個元素..
t1線程添加第3個元素..
t1線程添加第4個元素..
t1線程添加第5個元素..
Exception in thread "t2" java.lang.RuntimeException: t2線程接到通知 size = 5 線程停止..
    at com.github.binarylei.thread._2_1conn.ListAdvice1$2.run(ListAdvice1.java:35)
    at java.lang.Thread.run(Thread.java:745)
t1線程添加第6個元素..
t1線程添加第7個元素..
t1線程添加第8個元素..
t1線程添加第9個元素..
t1線程添加第10個元素..

二、wait/notify 實現通信

/**
 * 使用wait/notify方法實現線程單挑通信(註意這兩個方法是Object類的方法)
 *   1. wait和notity必須配合synchronized關鍵字使用
 *   2. wait方法(關閉線程)釋放鎖,notify(喚醒線程)方法不釋放鎖
 * 缺點:通知不實時,使用CountDownLatch實現實時通知
 */
public static void main(String[] args) {
    private volatile static List list = new ArrayList();
    final Object lock = new Object();

    Thread t1 = new Thread(new Runnable() { // (1)
        public void run() {
            try {
                synchronized (lock) {
                    System.out.println("t1啟動..");
                    for(int i = 0; i <10; i++){
                        list.add(i);
                        System.out.println(Thread.currentThread().getName() + "線程添加第" + (i + 1) + "個元素..");
                        Thread.sleep(500);
                        if(list.size() == 5){
                            System.out.println("已經發出通知..");
                            lock.notify();
                        }
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "t1");

    Thread t2 = new Thread(new Runnable() { // (2)
        public void run() {
            synchronized (lock) {
                System.out.println("t2啟動..");
                if(list.size() != 5){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //do something
                throw new RuntimeException(Thread.currentThread().getName() + 
                    "線程接到通知 size = " + list.size() + " 線程停止..");
            }
        }
    }, "t2");
}
  1. t1 線程當 list.size()==5lock.notify() 喚醒 t2 線程,註意 wait/notify 必須配合 synchronized 使用

  2. t2 線程調用 lock.wait() 後處於一直阻塞狀態,直到 t1 線程調用 lock.notify() 喚醒該線程,倘若沒有線程喚醒 t2 線程,那麽 t2 線程就一直處於阻塞狀態。本例中若 t1 線程先啟動,那麽 t2 線程調用 lock.wait() 就永遠阻塞無法執行。程序執行結果如下:。

t2啟動..
t1啟動..
t1線程添加第1個元素..
t1線程添加第2個元素..
t1線程添加第3個元素..
t1線程添加第4個元素..
t1線程添加第5個元素..
已經發出通知..
t1線程添加第6個元素..
t1線程添加第7個元素..
t1線程添加第8個元素..
t1線程添加第9個元素..
t1線程添加第10個元素..
Exception in thread "t2" java.lang.RuntimeException: t2線程接到通知 size = 10 線程停止..
    at com.github.binarylei.thread._2_1conn.ListAdd2$2.run(ListAdd2.java:51)
    at java.lang.Thread.run(Thread.java:745)
  1. 由於 t1 線程 lock.notify() 後不會釋放鎖,t2 線程雖然被喚醒但不能獲取鎖,所以通知就不那麽實時,只有等 t1 線程執行完成釋放鎖後 t2 線程才能獲得鎖執行相應操作,解決方案:使用 CountDownLatch

三、CountDownLatch 實現實時通信


public static void main(String[] args) {
    private volatile static List list = new ArrayList();
    final CountDownLatch countDownLatch = new CountDownLatch(1); // (1)

    Thread t1 = new Thread(new Runnable() {
        public void run() {
            try {
                System.out.println("t1啟動..");
                for(int i = 0; i <10; i++){
                    list.add(i);
                    System.out.println(Thread.currentThread().getName() + "線程添加第" + (i + 1) + "個元素..");
                    Thread.sleep(500);
                    if(list.size() == 5){
                        System.out.println("已經發出通知..");
                        countDownLatch.countDown(); // (2)
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }, "t1");

    Thread t2 = new Thread(new Runnable() {
        public void run() {
            System.out.println("t2啟動..");
            if(list.size() != 5){
                try {
                    countDownLatch.await(); // (3)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //do something
            throw new RuntimeException(Thread.currentThread().getName() + 
                "線程接到通知 size = " + list.size() + " 線程停止..");
        }
    }, "t2");

    t1.start();
    t2.start();
}
  1. CountDownLatch 同步工具類,允許一個或多個線程一直等待,直到其他線程的操作執行完後再執行,參數 1 表示需要等待的線程數量,具體來說就是參數為幾就必須調用幾次 countDownLatch.countDown()

  2. countDownLatch.countDown() 喚醒線程

  3. countDownLatch.await() 阻塞線程,程序執行結果如下:

t1啟動..
t1線程添加第1個元素..
t2啟動..
t1線程添加第2個元素..
t1線程添加第3個元素..
t1線程添加第4個元素..
t1線程添加第5個元素..
已經發出通知..
Exception in thread "t2" java.lang.RuntimeException: t2線程接到通知 size = 5 線程停止..
t1線程添加第6個元素..
    at com.github.binarylei.thread._2_1conn.ListAdd3$2.run(ListAdd3.java:47)
    at java.lang.Thread.run(Thread.java:745)
t1線程添加第7個元素..
t1線程添加第8個元素..
t1線程添加第9個元素..
t1線程添加第10個元素..

四、ThreadLocal

ThreadLocal 是線程局部變量,是一種多線程間並發訪問變量的無鎖解決方案。

ThreadLocal 和 synchronized 比較?

  1. 與 synchronized 等加鎖的方式不同,ThreadLocal 完全不提供鎖,而使用以空間換時間的手段,為每個線程提供變量的獨立副本,以保障線程安全。

  2. 從性能上說,ThreadLocal 不具有絕對的優勢,在並發不是很高的時候,加鎖的性能會更好,但作為一套無鎖的解決方案,在高並發量或者競爭激烈的場景,使用 ThreadLocal 可以在一定程度上減少鎖競爭。

public static void main(String[] args) throws InterruptedException {
    final ThreadLocal<String> th = new ThreadLocal<String>();

    Thread t1 = new Thread(new Runnable() {
        public void run() {
            th.set("張三");
            System.out.println(th.get()); // => "張三"
        }
    }, "t1");
    
    Thread t2 = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
                th.set("李四");
                System.out.println(th.get()); // => "李四"
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "t2");
    
    t1.start(); //t1:張三
    t2.start(); //t2:李四
}

五、自定義同步類窗口-Queue

Java 提供了一些同步類容器,它們是 線程安全 的,如 Vector、HashTable 等。這些同步類容器是由 Collections.synchronizedMap 等工廠方法去創建實現的,底層使用 synchronized 關鍵字,每次只有一個線程訪問容器。下面實現一個自己的同步類窗口。

import java.util.LinkedList;

public class MyQueue {   
    private LinkedList list = new LinkedList();
    private int max = 5;
    private int min = 1;
    private Object lock = new Object();

    public void put(Object obj) {  // (1)
        synchronized (lock) {
            while (list.size() == max) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    ;
                }
            }
            list.add(obj);
            lock.notify();
            System.out.println("put元素:" + obj);
        }
    }

    public Object take() {  // (2)
        Object obj;
        synchronized (lock) {
            while (list.size() == min) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    ;
                }
            }
            obj = list.removeFirst();
            lock.notify();
            System.out.println("take元素:" + obj);
        }
        return obj;
    }
}

測試

public static void main(String[] args) {
    final MyQueue myQueue = new MyQueue();
    myQueue.put("a");
    myQueue.put("b");
    myQueue.put("c");
    myQueue.put("d");
    myQueue.put("e");

    new Thread(new Runnable() {
        @Override
        public void run() {
            myQueue.put("f");
            myQueue.put("g");
            myQueue.put("h");
            myQueue.put("i");
        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {
            myQueue.take();
            myQueue.take();
            myQueue.take();
        }
    }).start();
}

每天用心記錄一點點。內容也許不重要,但習慣很重要!

Java Thread系列(四)線程通信