1. 程式人生 > >JUC(二、執行緒間通訊)

JUC(二、執行緒間通訊)

兩個執行緒,一個執行緒列印1-52,另一個列印字母A-Z列印順序為12A34B...5152Z,
要求用執行緒間通訊

1.synchronized實現

package com.liuyuanyuan.thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
import org.omg.IOP.Codec;
 
 
class ShareDataOne//資源類
{
  private int number = 0;//初始值為零的一個變數
 
  public synchronized void increment() throws InterruptedException 
  {
     //1判斷
     if(number !=0 ) {
       this.wait();
     }
     //2幹活
     ++number;
     System.out.println(Thread.currentThread().getName()+"\t"+number);
     //3通知
     this.notifyAll();
  }
  
  public synchronized void decrement() throws InterruptedException 
  {
     // 1判斷
     if (number == 0) {
       this.wait();
     }
     // 2幹活
     --number;
     System.out.println(Thread.currentThread().getName() + "\t" + number);
     // 3通知
     this.notifyAll();
  }
}
 
/**
 * 
 * @Description:
 *現在兩個執行緒,
 * 可以操作初始值為零的一個變數,
 * 實現一個執行緒對該變數加1,一個執行緒對該變數減1,
 * 交替,來10輪。 
 * @author liuyuan
 *
 *  * 筆記:Java裡面如何進行工程級別的多執行緒編寫
 * 1 多執行緒變成模板(套路)-----上
 *     1.1  執行緒    操作    資源類  
 *     1.2  高內聚  低耦合
 * 2 多執行緒變成模板(套路)-----下
 *     2.1  判斷
 *     2.2  幹活
 *     2.3  通知
 
 */
public class NotifyWaitDemoOne
{
  public static void main(String[] args)
  {
     ShareDataOne sd = new ShareDataOne();
     new Thread(() -> {
       for (int i = 1; i < 10; i++) {
          try {
            sd.increment();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "A").start();
     new Thread(() -> {
       for (int i = 1; i < 10; i++) {
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "B").start();
  }
}
/*
 * * 
 * 2 多執行緒變成模板(套路)-----下
 *     2.1  判斷
 *     2.2  幹活
 *     2.3  通知
 * 3 防止虛假喚醒用while
 * 
 * 
 * */
 
 
 

換成4個執行緒會導致錯誤,虛假喚醒
 
原因:在java多執行緒判斷時,不能用if,程式出事出在了判斷上面,
突然有一新增的執行緒進到if了,突然中斷了交出控制權,
沒有進行驗證,而是直接走下去了,加了兩次,甚至多次

舉個生活中的例子:在飛機場等候飛機,如遇到一些問題不能起飛,被機場安排到賓館臨時休息,問題解決後,再次回到機場時依舊需要重新安檢。

解決虛假喚醒:檢視API,java.lang.Object

 
中斷和虛假喚醒是可能產生的,所以要用loop迴圈,if只判斷一次,while是隻要喚醒就要拉回來再判斷一次。if換成while

以上是用的synchronized來實現的

下面我們用jdk1.8的更好的實現方式

思路很簡單

接下來是具體的程式碼實現

Condition:檢視API,java.util.concurrent

 
class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 
 
   final Object[] items = new Object[100];
   int putptr, takeptr, count;
 
   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }
 

2、執行緒間定製化呼叫通訊

1、有順序通知,需要有標識位
 
2、有一個鎖Lock,3把鑰匙Condition
 
3、判斷標誌位
 
4、輸出執行緒名+第幾次+第幾輪
 
5、修改標誌位,通知下一個
 
 以上可能比較晦澀,接下來看具體的程式碼實現

package com.liuyuanyuan.thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
 
class ShareResource
{
  private int number = 1;//1:A 2:B 3:C 
  private Lock lock = new ReentrantLock();
  private Condition c1 = lock.newCondition();
  private Condition c2 = lock.newCondition();
  private Condition c3 = lock.newCondition();
 
  public void print5(int totalLoopNumber)
  {
     lock.lock();
     try 
     {
       //1 判斷
       while(number != 1)
       {
          //A 就要停止
          c1.await();
       }
       //2 幹活
       for (int i = 1; i <=5; i++) 
       {
          System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
       }
       //3 通知
       number = 2;
       c2.signal();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
  }
  public void print10(int totalLoopNumber)
  {
     lock.lock();
     try 
     {
       //1 判斷
       while(number != 2)
       {
          //A 就要停止
          c2.await();
       }
       //2 幹活
       for (int i = 1; i <=10; i++) 
       {
          System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
       }
       //3 通知
       number = 3;
       c3.signal();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
  }  
  
  public void print15(int totalLoopNumber)
  {
     lock.lock();
     try 
     {
       //1 判斷
       while(number != 3)
       {
          //A 就要停止
          c3.await();
       }
       //2 幹活
       for (int i = 1; i <=15; i++) 
       {
          System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
       }
       //3 通知
       number = 1;
       c1.signal();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
  }  
}
 
 
/**
 * 
 * @Description: 
 * 多執行緒之間按順序呼叫,實現A->B->C
 * 三個執行緒啟動,要求如下:
 * 
 * AA列印5次,BB列印10次,CC列印15次
 * 接著
 * AA列印5次,BB列印10次,CC列印15次
 * ......來10輪  
 * @author liuyuan
 *
 */
public class ThreadOrderAccess
{
  public static void main(String[] args)
  {
     ShareResource sr = new ShareResource();
     
     new Thread(() -> {
       for (int i = 1; i <=10; i++) 
       {
          sr.print5(i);
       }
     }, "AA").start();
     new Thread(() -> {
       for (int i = 1; i <=10; i++) 
       {
          sr.print10(i);
       }
     }, "BB").start();
     new Thread(() -> {
       for (int i = 1; i <=10; i++) 
       {
          sr.print15(i);
       }
     }, "CC").start();   
     
     
  }
}
 
 

3.NotSafeDemo

需求:請舉例說明集合類是不安全的

java.util.ConcurrentModificationException


ArrayList在迭代的時候如果同時對其進行修改就會
丟擲java.util.ConcurrentModificationException異常
併發修改異常

List<String> list = new ArrayList<>();
for (int i = 0; i <30 ; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
 
看ArrayList的原始碼
public boolean add(E e) {
    ensureCapacityInternal(size + 1);  // Increments modCount!!
    elementData[size++] = e;
    return true;
}
沒有synchronized執行緒不安全

如何解決呢 ?

List<String> list = new Vector<>();

看Vector的原始碼
public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}
有synchronized執行緒安全
 

List<String> list = Collections.synchronizedList(new ArrayList<>());
Collections提供了方法synchronizedList保證list是同步執行緒安全的
 
那HashMap,HashSet是執行緒安全的嗎?也不是
所以有同樣的執行緒安全方法

 

終極大boss出來了:寫時複製

List<String> list = new CopyOnWriteArrayList<>();

接下來介紹一下寫時複製

寫時複製為了解決:不加鎖效能提升出錯誤,加鎖資料一致性能下降問題

接下來介紹CopyOnWriteArrayList

A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
CopyOnWriteArrayList是arraylist的一種執行緒安全變體,
其中所有可變操作(add、set等)都是通過生成底層陣列的新副本來實現的。
 

複製了一份,往裡追加寫

 
/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return {@code true} (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
 
 
 
CopyOnWrite容器即寫時複製的容器。往一個容器新增元素的時候,不直接往當前容器Object[]新增,
而是先將當前容器Object[]進行Copy,複製出一個新的容器Object[] newElements,然後向新的容器Object[] newElements裡新增元素。
新增元素後,再將原容器的引用指向新的容器setArray(newElements)。
這樣做的好處是可以對CopyOnWrite容器進行併發的讀,而不需要加鎖,因為當前容器不會新增任何元素。
所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。
 
 

對於set和map有寫時複製技術麼?

當然是有的

Set<String> set = new HashSet<>();//執行緒不安全
 

Set<String> set = new CopyOnWriteArraySet<>();//執行緒安全
 
 
 
HashSet底層資料結構是什麼?
HashMap  ?
 
但HashSet的add是放一個值,而HashMap是放K、V鍵值對
 
 
 
 
 
 
public HashSet() {
    map = new HashMap<>();
}
 
private static final Object PRESENT = new Object();
 
public boolean add(E e) {
    return map.put(e, PRESENT)==null;
}
 
 
Map<String,String> map = new HashMap<>();//執行緒不安全
 

Map<String,String> map = new ConcurrentHashMap<>();//執行緒安全
 
 
  
 
package com.liuyuanyuan.gmall.jucdemo;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * 請舉例說明集合類是不安全的
 */
public class NotSafeDemo {
    public static void main(String[] args) {

        Map<String,String> map = new ConcurrentHashMap<>();
        for (int i = 0; i <30 ; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,8));
                System.out.println(map);
            },String.valueOf(i)).start();
        }


    }

    private static void setNoSafe() {
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 0; i <30 ; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }

    private static void listNoSafe() {
        //        List<String> list = Arrays.asList("a","b","c");
        //        list.forEach(System.out::println);
        //寫時複製
        List<String> list = new CopyOnWriteArrayList<>();
        // new CopyOnWriteArrayList<>();
        //Collections.synchronizedList(new ArrayList<>());
        //new Vector<>();//new ArrayList<>();

        for (int i = 0; i <30 ; i++) {
                    new Thread(()->{
                        list.add(UUID.randomUUID().toString().substring(0,8));
                        System.out.println(list);
                    },String.valueOf(i)).start();
                }
    }


}





    /**
     * 寫時複製
     CopyOnWrite容器即寫時複製的容器。往一個容器新增元素的時候,不直接往當前容器Object[]新增,
     而是先將當前容器Object[]進行Copy,複製出一個新的容器Object[] newElements,然後向新的容器Object[] newElements裡新增元素。
     新增元素後,再將原容器的引用指向新的容器setArray(newElements)。
     這樣做的好處是可以對CopyOnWrite容器進行併發的讀,而不需要加鎖,因為當前容器不會新增任何元素。
     所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。

     *
     *
     *
     *

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }