1. 程式人生 > >《 Java併發程式設計從入門到精通》第5章 多執行緒之間互動:執行緒閥

《 Java併發程式設計從入門到精通》第5章 多執行緒之間互動:執行緒閥

javaC

作者:張振華    購買連結:天貓商城

(投入多少,收穫多少。參與多深,領悟多深,京東,亞馬遜,噹噹均有銷售。)

5.1 執行緒安全的阻塞佇列BlockingQueue

(1)先理解一下Queue、Deque、BlockingQueue的概念:

Queue(佇列) :用於儲存一組元素,不過在存取元素的時候必須遵循先進先出原則。佇列是一種特殊的線性表,它只允許在表的前端(front)進行刪除操作,而在表的後端(rear)進行插入操作。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。佇列中沒有元素時,稱為空佇列。在佇列這種資料結構中,最先插入的元素將是最先被刪除的元素;反之最後插入的元素將是最後被刪除的元素,因此佇列又稱為“先進先出”(FIFO—first in first out)的線性表。

Deque(雙端佇列): 兩端都可以進出的佇列。當我們約束從佇列的一端進出隊時,就形成了另外一種存取模式,它遵循先進後出原則,這就是棧結構。雙端佇列主要是用於棧操作。使用站結構讓操作有可追溯性(如windows視窗位址列內的路徑前進棧、後退棧)。

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

阻塞佇列提供了四種處理方法:

方法\處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
  • 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

(2)Java裡的阻塞佇列最新JDK中提供了7個阻塞佇列。分別是:

BlockingQueue常用的方法有,更多方法請查詢API:

1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則招聘異常

2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.

3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null

5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止

其中:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會丟擲NullPointerException。null 被用作指示poll 操作失敗的警戒值。

5.2 ArrayBlockingQueue

ArrayBlockingQueue一個由陣列支援的有界的阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。

這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。

此類支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略。預設情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設定為 true 而構造的佇列允許按照 FIFO 順序訪問執行緒。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

先看一下ArrayBlockingQueue的部分原始碼:理解一下ArrayBlockingQueue的實現原理和機制

public class ArrayBlockingQueue <E> extends AbstractQueue<E>  implements BlockingQueue<E>, java.io.Serializable {  //陣列的儲存結構  final Object[] items;    //鎖採用的機制  final ReentrantLock lock; public ArrayBlockingQueue( int capacity, boolean fair) {  if (capacity <= 0)  throw new IllegalArgumentException();  this.items = new Object[capacity];         //通過將公平性 (fairness) 設定為 true 而構造的佇列允許按照 FIFO 順序訪問執行緒  lock = new ReentrantLock(fair);  notEmpty = lock .newCondition();  notFull  lock .newCondition();     } public boolean offer(E e) {         checkNotNull(e);         //使用ReentrantLock 鎖機制  final ReentrantLock lock = this.lock;         lock.lock();//加鎖  try {  if (count == items.length)  return false ;  else {                 enqueue(e);  return true ;             }         } finally {             lock.unlock();//釋放鎖         }     } private void enqueue(E x) {  final Object[] items = this.items;         items[ putIndex] = x;//通過陣列進行儲存  if (++putIndex == items.length)  putIndex = 0;  count++;  notEmpty.signal();     } ……. } 使用例項是: import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /*
* 現有的程式程式碼模擬產生了16個日誌物件,並且需要執行16秒才能列印完這些日誌,
* 請在程式中增加4個執行緒去呼叫parseLog()方法來分頭列印這16個日誌物件,
* 程式只需要執行4秒即可列印完這些日誌物件。
*/ public class BlockingQueueTest {  public static void main(String[] args) throws Exception {  // 新建一個等待佇列  final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(16);  // 四個執行緒  for (int i = 0; i < 4; i++) {  new Thread(new Runnable() {  @Override  public void run() {  while (true ) {  try {                                           String log = (String) bq.take();                                            parseLog(log);                                     } catch (Exception e) {                                     }                               }                         }                   }).start();             }  for (int i = 0; i < 16; i++) {                   String log = (i + 1) + ” –>  “;                   bq.put(log); // 將資料存到佇列裡!             }       }  // parseLog方法內部的程式碼不能改動  public static void parseLog(String log) {             System. out.println(log + System.currentTimeMillis());  try {                   Thread. sleep(1000);             } catch (InterruptedException e) {                   e.printStackTrace();             }       } } 5.3 LinkedBlockingQueue       LinkedBlockingQueue : 基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列 中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時 (LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反 之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別 採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。

作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大 小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於 消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

先看一下LinkedBlockingDeque的部分原始碼:理解一下ArrayBlockingQueue的實現原理和機制 public class LinkedBlockingDeque <E>  extends AbstractQueue<E>  implements BlockingDeque<E>, java.io.Serializable {  final ReentrantLock lock = new ReentrantLock();//執行緒安全 /**      * @throws NullPointerException {@inheritDoc}      */  public boolean offerLast(E e) {  if (e == null) throw new NullPointerException();         Node<E> node = new Node<E>(e);//每次插入後都將動態地建立連結節點  final ReentrantLock lock = this.lock;         lock.lock();  try {  return linkLast(node);         } finally {             lock.unlock();         }     } public boolean offer(E e) {  return offerLast(e);     } public boolean add(E e) {         addLast(e);  return true ;     } public void addLast(E e) {  if (!offerLast(e))  throw new IllegalStateException(“Deque full”);     } public E removeFirst() {         E x = pollFirst();  if (x == null) throw new NoSuchElementException();  return x;     } public E pollFirst() {  final ReentrantLock lock = this.lock;         lock.lock();  try {  return unlinkFirst();         } finally {             lock.unlock();         }     } …… } 使用例項是: 將ArrayBlockingQueue的例子換成LinkedBlockingQueue即可:  // 新建一個等待佇列  final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(16); 換成: final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(16);

5.4 PriorityBlockingQueue 

PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。

先看一下PriorityBlockingQueue的部分原始碼,理解一下PriorityBlockingQueue的實現原理和機制:

public class PriorityBlockingQueue <E> extends AbstractQueue<E>  implements BlockingQueue<E>, java.io.Serializable {  private final ReentrantLock lock ;//說明本類使用一個lock來同步讀寫等操作  private transient Comparator<? super E> comparator;      // 使用指定的初始容量建立一個 PriorityBlockingQueue,並根據指定的比較器對其元素進行排序。  public PriorityBlockingQueue( int initialCapacity,                                  Comparator<? super E> comparator) {  if (initialCapacity < 1)  throw new IllegalArgumentException();  this.lock = new ReentrantLock();  this.notEmpty = lock.newCondition();  this.comparator = comparator;  this.queue = new Object[initialCapacity];     }      public E poll() {  final ReentrantLock lock = this.lock;         lock.lock();  try {  return dequeue();         } finally {             lock.unlock();         }     } …… }

5.5 DelayQueue

     DelayQueue:是一個支援延時獲取元素的使用優先順序佇列的實現的無界阻塞佇列。佇列中的元素必須實現Delayed介面和Comparable介面,也就是說DelayQueue裡面的元素必須有public int compareTo( T o)和long getDelay(TimeUnit unit)方法存在,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將DelayQueue運用在以下應用場景:
  • 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
  • 定時任務排程。使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
我們來看一下DelayQueue的原始碼來理解一下: //可以看出來E元素必須繼承Delayed和而Delayed又繼承Comparable; public class DelayQueue<E extends Delayed> extends AbstractQueue<E>  implements BlockingQueue<E> {  private final transient ReentrantLock lock = new ReentrantLock();//安全鎖機制  private final PriorityQueue<E> q = new PriorityQueue<E>();//PriorityQueue來存取元素 public E take() throws InterruptedException {  final ReentrantLock lock = this.lock;         lock.lockInterruptibly();  try {  for (;;) {                 E first = q.peek();  if (first == null)  available.await();  else {                     //根據元素的Delay進行判斷  long delay = first.getDelay(NANOSECONDS);  if (delay <= 0)  return q .poll();                     first = null; // don’t retain ref while waiting  if (leader != null)                        //沒到時間阻塞等待  available.await();  else {                         Thread thisThread = Thread. currentThread();  leader = thisThread;  try {  available.awaitNanos(delay);                         } finally {  if (leader == thisThread)  leader = null ;                         }                     }                 }             }         } finally {  if (leader == null && q.peek() != null)  available.signal();             lock.unlock();         }     } …… } 我們來看一下DelayQueue的使用例項: (1)實現一個Student物件作為DelayQueue的元素必須實現Delayed 介面的兩個方法; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Student implements Delayed { //必須實現Delayed介面  private String name ;  private long submitTime ;// 交卷時間  private long workTime ;// 考試時間  public String getName() {  return this .name + ” 交卷,用時” + workTime;       }  public Student(String name, long sub