1. 程式人生 > >09 佇列:佇列queue線上程池等有限資源中的應用

09 佇列:佇列queue線上程池等有限資源中的應用

總結:佇列最大的特點就是先進先出,主要的兩個操作是入隊和出隊。跟棧一樣,它即可以用陣列來實現,也可以用連結串列來實現。用陣列實現的叫順序佇列,用連結串列是實現的叫鏈式佇列。特別是長的像個環的迴圈佇列。在陣列實現佇列的時候,會有陣列搬移操作,要解決資料搬移的問題,就需要迴圈佇列。
迴圈佇列是重點,要想寫出沒有bug的迴圈佇列實現程式碼,關鍵是確定好隊空和隊滿的判定程式碼,要求會手寫迴圈佇列程式碼
還講了集中高階的佇列結構,阻塞佇列、併發佇列,底層都還是佇列這種資料結構,只是附加了功能。阻塞佇列就是入隊、出隊操作可以阻塞,併發佇列就是佇列的操作多執行緒安全。

CPU資源是有限的,任務的處理速度與執行緒個數並不是線性正相關。相反,過多的執行緒反而會導致CPU頻繁切換,處理效能下降。所以,執行緒池的大熊啊一般都是綜合考慮處理任務的特點和硬體環境,來事先設定的。

當向固定大小的執行緒池中請求一個執行緒時,如果執行緒池中沒有空閒資源了,這個時候執行緒池如何處理這個請求?時拒絕請求還是排隊請求?各種處理策略又是怎麼實現的?

一、如何理解“佇列” FIFO
支援的基本操作:入隊 enqueue(),放一個數據到佇列尾部;出隊dequeue(),從佇列頭部取一個元素;
佇列與棧一樣,也是一種操作受限的線性表資料結構。
作為一種非常基礎的資料結構,佇列的應用也非常廣泛,特別是一些具有某些額外特性的佇列,比如迴圈佇列、阻塞佇列、併發佇列。它們在很多偏底層系統、框架、中介軟體的開發中,起著關鍵性的作用。比如高效能佇列Disruptor\Linux環形快取,都用到了迴圈併發佇列;Java concurrent 併發包利用ArrayBlockingQueue實現公平鎖等。

二、順序佇列和鏈式佇列

// 用陣列實現的佇列
public class ArrayQueue {
  // 陣列:items,陣列大小:n
  private String[] items;
  private int n = 0;
  // head 表示隊頭下標,tail 表示隊尾下標
  private int head = 0;
  private int tail = 0;

  // 申請一個大小為 capacity 的陣列
  public ArrayQueue(int capacity) {
    items = new String[capacity];
    n = capacity;
  }

  // 入隊
  public boolean enqueue(String item) {
    // 如果 tail == n 表示佇列已經滿了
    if (tail == n) return false;
    items[tail] = item;
    ++tail;
    return true;
  }

  // 出隊
  public String dequeue() {
    // 如果 head == tail 表示佇列為空
    if (head == tail) return null;
    // 為了讓其他語言的同學看的更加明確,把 -- 操作放到單獨一行來寫了
    String ret = items[head];
    ++head;
    return ret;
  }
}

佇列需要兩個指標:一個是head指標,指向隊頭;一個是tail指標,指向隊尾。

隨著不停地進行入隊、出隊操作,head和tail都會持續往後移動。當tail移動到最右邊,即使陣列中還有空閒空間,也無法繼續往佇列中新增資料了。

聯想陣列的刪除操作會導致陣列中的資料不連續,當時的解決方式是:資料搬移、但是,每次進行出隊操作都相當於刪除下標為0的資料,要搬移整個佇列中的資料,這樣出隊操作的時間複雜度就會從原來的O(1)變為O(n),如何優化?
實際上,在出隊時可以不用搬移資料。如果滅有空閒空間了,只需要在入隊時,再集中出發一次資料的搬移操作。
  

 // 入隊操作,將 item 放入隊尾
  public boolean enqueue(String item) {
    // tail == n 表示佇列末尾沒有空間了
    if (tail == n) {
      // tail ==n && head==0,表示整個佇列都佔滿了
      if (head == 0) return false;
      // 資料搬移
      for (int i = head; i < tail; ++i) {
        items[i-head] = items[i];
      }
      // 搬移完之後重新更新 head 和 tail
      tail -= head;
      head = 0;
    }
    
    items[tail] = item;
    ++tail;
    return true;
  }

基於連結串列的佇列實現方法:
同樣需要兩個指標:head指標和tail指標。它們分別指向連結串列的第一個節點和最後一個節點。入隊時,tail->next = new_node,tail = tail->next;出隊時,head = head->next.

/**
 * 基於連結串列實現的佇列
 *
 * Author: Zheng
 */
public class QueueBasedOnLinkedList {

  // 佇列的隊首和隊尾
  private Node head = null;
  private Node tail = null;

  // 入隊
  public void enqueue(String value) {
    if (tail == null) {
      Node newNode = new Node(value, null);
      head = newNode;
      tail = newNode;
    } else {
      tail.next = new Node(value, null);
      tail = tail.next;
    }
  }

  // 出隊
  public String dequeue() {
    if (head == null) return null;

    String value = head.data;
    head = head.next;
    if (head == null) {
      tail = null;
    }
    return value;
  }

  public void printAll() {
    Node p = head;
    while (p != null) {
      System.out.print(p.data + " ");
      p = p.next;
    }
    System.out.println();
  }

  private static class Node {
    private String data;
    private Node next;

    public Node(String data, Node next) {
      this.data = data;
      this.next = next;
    }

    public String getData() {
      return data;
    }
  }

}

三、迴圈佇列
使用資料來實現佇列的時候,在tail==n時,會有資料搬移操作,這樣入隊操作效能就會收到影響->迴圈佇列;
如佇列大小為8,當前head=4,tail = 7。當有一個新的元素a入隊時,我們放入下標為7的位置。當這個時候,我們並不把tail更新為8,而是將其在環中後移一位,到下標為0的位置。當再有一個元素b入隊時,我們將b放入下標為0的位置,然後tail+1 更新為1.所以,在a,b依次入隊之後

這樣避免了資料搬移操作。看起來不難理解,但是迴圈佇列的程式碼實現難度要比非迴圈佇列難多了。要想寫出沒有bug的迴圈佇列的實現程式碼,最關鍵的時,確定好隊空和隊滿的判定條件。

在用陣列實現的非迴圈佇列中,隊滿的判斷條件是tail==n,隊空的判斷條件是head==tail。針對迴圈佇列,如何判斷隊空和隊滿呢?
佇列為空的判斷條件仍然是head==tail.但佇列滿的判斷條件就複雜了

如圖,tail=3,head=4,n=8,(3+1)%8=4. 即(tail+1)%n = head;

注意,當佇列滿時,途中的tail指向的位置實際上是沒有儲存資料的。所以,迴圈佇列會浪費一個數組的儲存空間。

public class CircularQueue {
  // 陣列:items,陣列大小:n
  private String[] items;
  private int n = 0;
  // head 表示隊頭下標,tail 表示隊尾下標
  private int head = 0;
  private int tail = 0;

  // 申請一個大小為 capacity 的陣列
  public CircularQueue(int capacity) {
    items = new String[capacity];
    n = capacity;
  }

  // 入隊
  public boolean enqueue(String item) {
    // 佇列滿了
    if ((tail + 1) % n == head) return false;
    items[tail] = item;
    tail = (tail + 1) % n;
    return true;
  }

  // 出隊
  public String dequeue() {
    // 如果 head == tail 表示佇列為空
    if (head == tail) return null;
    String ret = items[head];
    head = (head + 1) % n;
    return ret;
  }
}

四、阻塞佇列和併發佇列
阻塞佇列就是在佇列基礎上增加了阻塞操作。就是在佇列為空的時候,從隊頭取資料會被阻塞。因為此時還沒有資料可取,直到佇列中有了資料才能返回;如果佇列已經滿了,那麼插入資料的操作就會被阻塞,直到佇列中有空閒位置後再插入資料,然後在返回。---“生產者-消費者模型”

這種基於阻塞佇列實現的“生產者-消費者模型”可以有效地協調生產和消費的速度。當“生產者”生產資料的速度過快,“消費者”來不及消費時,儲存資料的佇列很快就滿了。這時,生產者就阻塞等待,直到“消費者”消費了資料,“生產者”才會被喚醒繼續“生產”。
而且不僅如此,基於阻塞佇列,我們可以通過協調“生產者”和“消費者”個數,來提高資料的資料效率。


阻塞佇列在多執行緒情況下,會有多個執行緒同時操作佇列,這時會存線上程安全問題,如何實現一個執行緒安全的佇列呢?
執行緒安全的佇列也叫併發佇列。最簡單的是加鎖,但是鎖粒度大併發度會比較低,同一時刻僅允許一個存貨取操作。實際上,基於陣列的迴圈佇列,利用CAS原子操作,可以實現高效的併發佇列。這也是迴圈佇列比鏈式佇列應用更加廣泛的原因。在實戰篇講Disruptor時,詳細講併發佇列的應用。

五、解答開篇
當向固定大小的執行緒池中請求一個執行緒時,如果執行緒池中沒有空閒資源了,這個時候執行緒池如何處理這個請求?是拒絕請求還是排隊請求?各種處理策略又是怎麼實現的?

一般有兩種處理策略。一:非阻塞的處理方式,直接拒絕任務請求;另一種是阻塞的處理方式,將請求排隊,等到有空閒執行緒時,取出排隊的請求繼續處理。那如何儲存排隊的請求呢?
我們希望公平的處理每個排隊的請求FIFO。佇列有基於連結串列和基於陣列兩種實現方式。者兩種實現方式對排隊請求又有什麼區別呢?
基於連結串列的實現方式,可以實現一個支援無限排隊的無界佇列unbounded queue,但是可能會導致過多的請求排隊等待,請求處理的響應時間過長。所以,針對響應時間比較敏感的系統,基於連結串列實現的無限排隊的執行緒池時不合適的。

基於陣列實現的有界佇列bounded queue,佇列的大小有限,所以執行緒池中排隊的請求超過佇列大小時,接下來請求就會被拒絕,這種方式對響應時間敏感的系統來說,更加合理。不過,設定一個合理的佇列大小,也是非常有講究的。佇列太大導致等待的請求太多,佇列太小會導致無法充分利用系統資源、發揮最大效能。

除了前面講到佇列應用線上程池請求排隊的場景之外,佇列可以應用在任何有限資源池中,用於排隊請求,比如資料庫連線池等。實際上,對於大部分資源有限的場景,當沒有空閒資源時,基本上都可以通過“佇列”這種資料結構來請求排隊。