定製併發類(十)實現一個基於優先順序的傳輸佇列
宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 譯者:鄭玉婷
實現一個基於優先順序的傳輸佇列
Java 7 API 提供幾種與併發應用相關的資料型別。從這裡面,我們想來重點介紹以下2種資料型別:
- LinkedTransferQueue:這個資料型別支援那些有生產者和消費者結構的程式。 在那些應用,你有一個或者多個數據生產者,一個或多個數據消費者和一個被生產者和消費者共享的資料型別。生產者把資料放入資料結構內,然後消費者從資料結構內提取資料。如果資料結構為空,消費者會被阻塞直到有資料可以消費。如果資料結構滿了,生產者就會被阻塞直到有空位來放資料。
- PriorityBlockingQueue:在這個資料結構,元素是按照順序儲存的。元素們必須實現 帶有 compareTo() 方法的 Comparable 介面。當你在結構中插入資料時,它會與資料元素對比直到找到它的位置。
LinkedTransferQueue 的元素是按照抵達順序儲存的,所以越早到的越先被消耗。你有可能需要開發 producer/ consumer 程式,它的消耗順序是由優先順序決定的而不是抵達時間。在這個指南,你將學習如何實現在 producer/ consumer 問題中使用的資料結構,這些元素將被按照他們的優先順序排序,級別高的會先被消耗。
準備
指南中的例子是使用Eclipse IDE 來實現的。如果你使用Eclipse 或者其他的IDE,例如NetBeans,開啟並建立一個新的java任務。
怎麼做呢…
按照這些步驟來實現下面的例子::
//1. 建立一個類,名為 MyPriorityTransferQueue,擴充套件 PriorityBlockingQueue 類並實現 TransferQueue 介面。 public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> { //2. 宣告一個私有 AtomicInteger 屬性,名為 counter,用來儲存正在等待元素的消費者的數量。 private AtomicInteger counter; //3. 宣告一個私有 LinkedBlockingQueue 屬性,名為 transferred。 private LinkedBlockingQueue<E> transfered; //4. 宣告一個私有 ReentrantLock 屬性,名為 lock。 private ReentrantLock lock; //5. 實現類的建構函式,初始化它的屬性值。 public MyPriorityTransferQueue() { counter=new AtomicInteger(0); lock=new ReentrantLock(); transfered=new LinkedBlockingQueue<E>(); } //6. 實現 tryTransfer() 方法。此方法嘗試立刻傳送元素給正在等待的消費者(如果可能)。如果沒有任何消費者在等待,此方法返回 false 值。 @Override public boolean tryTransfer(E e) { lock.lock(); boolean value; if (counter.get()==0) { value=false; } else { put(e); value=true; } lock.unlock(); return value; } //7. 實現 transfer() 方法。此方法嘗試立刻傳送元素給正在等待的消費者(如果可能)。如果沒有任何消費者在等待, 此方法把元素存入一個特殊queue,為了傳送給第一個嘗試獲取一個元素的消費者並阻塞執行緒直到元素被消耗。 @Override public void transfer(E e) throws InterruptedException { lock.lock(); if (counter.get()!=0) { put(e); lock.unlock(); } else { transfered.add(e); lock.unlock(); synchronized (e) { e.wait(); } } } //8. 實現 tryTransfer() 方法,它接收3個引數: 元素,和需要等待消費者的時間(如果沒有消費者的話),和用來註明時間的單位。如果有消費者在等待,立刻傳送元素。否則,轉化時間到毫秒並使用 wait() 方法讓執行緒進入休眠。當消費者取走元素時,如果執行緒在 wait() 方法裡休眠,你將使用 notify() 方法喚醒它。 @Override public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); if (counter.get()!=0) { put(e); lock.unlock(); return true; } else { transfered.add(e); long newTimeout= TimeUnit.MILLISECONDS.convert(timeout, unit); lock.unlock(); e.wait(newTimeout); lock.lock(); if (transfered.contains(e)) { transfered.remove(e); lock.unlock(); return false; } else { lock.unlock(); return true; } } } //9. 實現 hasWaitingConsumer() 方法。使用 counter 屬性值來計算此方法的返回值。如果counter 的值大於0,放回 true。不然,返回 false。 @Override public boolean hasWaitingConsumer() { return (counter.get()!=0); } //10. 實現 getWaitingConsumerCount() 方法。返回counter 屬性值。 @Override public int getWaitingConsumerCount() { return counter.get(); } //11.實現 take() 方法。此方法是當消費者需要元素時被消費者呼叫的。首先,獲取之前定義的鎖並增加在等待的消費者數量。 @Override public E take() throws InterruptedException { lock.lock(); counter.incrementAndGet(); //12.如果在 transferred queue 中無任何元素。釋放鎖並使用 take() 方法嘗試從queue中獲取元素,此方法將讓執行緒進入睡眠直到有元素可以消耗。 E value=transfered.poll(); if (value==null) { lock.unlock(); value=super.take(); lock.lock(); //13. 否則,從transferred queue 中取走元素並喚醒正在等待要消耗元素的執行緒(如果有的話)。 } else { synchronized (value) { value.notify(); } } //14. 最後,增加正在等待的消費者的數量並釋放鎖。 counter.decrementAndGet(); lock.unlock(); return value; } //15. 實現一個類,名為 Event,擴充套件 Comparable 介面,把 Event 類引數化。 public class Event implements Comparable<Event> { //16. 宣告一個私有 String 屬性,名為 thread,用來儲存建立事件的執行緒的名字。 private String thread; //17. 宣告一個私有 int 屬性,名為 priority,用來儲存事件的優先順序。 private int priority; //18. 實現類的建構函式,初始化它的屬性值。 public Event(String thread, int priority){ this.thread=thread; this.priority=priority; } //19. 實現一個方法,返回 thread 屬性值。 public String getThread() { return thread; } //20. 實現一個方法,返回 priority 屬性值。 public int getPriority() { return priority; } //21. 實現 compareTo() 方法。此方法把當前事件與接收到的引數事件進行對比。返回 -1,如果當前事件的優先順序的級別高於引數;返回 1,如果當前事件的優先順序低於引數;如果相等,則返回 0。你將獲得一個按優先順序遞減順序排列的list。有高等級的事件就會被排到queue的最前面。 public int compareTo(Event e) { if (this.priority>e.getPriority()) { return -1; } else if (this.priority<e.getPriority()) { return 1; } else { return 0; } } //22. 實現一個類,名為 Producer,它實現 Runnable 介面。 public class Producer implements Runnable { //23. 宣告一個私有 MyPriorityTransferQueue 屬性,接收引數化的 Event 類屬性,名為 buffer,用來儲存這個生產者生成的事件。 private MyPriorityTransferQueue<Event> buffer; //24. 實現類的建構函式,初始化它的屬性值。 public Producer(MyPriorityTransferQueue<Event> buffer) { this.buffer=buffer; } //25. 這個類的實現 run() 方法。建立 100 個 Event 物件,用他們被建立的順序決定優先順序(越先建立的優先順序越高)並使用 put() 方法把他們插入queue中。 public void run() { for (int i=0; i<100; i++) { Event event=new Event(Thread.currentThread().getName(),i); buffer.put(event); } } //26. 實現一個類,名為 Consumer,它要實現 Runnable 介面。 public class Consumer implements Runnable { //27. 宣告一個私有 MyPriorityTransferQueue 屬性,引數化 Event 類屬性,名為 buffer,用來獲取這個類的事件消費者。 private MyPriorityTransferQueue<Event> buffer; //28. 實現類的建構函式,初始化它的屬性值。 public Consumer(MyPriorityTransferQueue<Event> buffer) { this.buffer=buffer; } //29. 實現 run() 方法。它使用 take() 方法消耗1002 Events (這個例子實現的全部事件)並把生成事件的執行緒數量和它的優先級別寫入操控臺。 @Override public void run() { for (int i=0; i<1002; i++) { try { Event value=buffer.take(); System.out.printf("Consumer: %s: %d\n",value. getThread(),value.getPriority()); } catch (InterruptedException e) { e.printStackTrace(); } } } //30. 建立例子的主類通過建立一個類,名為 Main 並新增 main()方法。 public class Main { public static void main(String[] args) throws Exception { //31. 建立一個 MyPriorityTransferQueue 物件,名為 buffer。 MyPriorityTransferQueue<Event> buffer=new MyPriorityTransferQu eue<Event>(); //32. 建立一個 Producer 任務並執行 10 執行緒來執行任務。 Producer producer=new Producer(buffer); Thread producerThreads[]=new Thread[10]; for (int i=0; i<producerThreads.length; i++) { producerThreads[i]=new Thread(producer); producerThreads[i].start(); } //33.建立並執行一個 Consumer 任務。 Consumer consumer=new Consumer(buffer); Thread consumerThread=new Thread(consumer); consumerThread.start(); //34. 寫入當前的消費者數量。 System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount()); //35. 使用 transfer() 方法傳輸一個事件給消費者。 Event myEvent=new Event("Core Event",0); buffer.transfer(myEvent); System.out.printf("Main: My Event has ben transfered.\n"); //36. 使用 join() 方法等待生產者的完結。 for (int i=0; i<producerThreads.length; i++) { try { producerThreads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } //37. 讓執行緒休眠1秒。 TimeUnit.SECONDS.sleep(1); //38.寫入當前的消費者數量。 System.out.printf("Main: Buffer: Consumer count: %d\n",buffer. getWaitingConsumerCount()); //39. 使用 transfer() 方法傳輸另一個事件。 myEvent=new Event("Core Event 2",0); buffer.transfer(myEvent); //40. 使用 join() 方法等待消費者完結。 consumerThread.join(); //41. 寫資訊表明程式結束。 System.out.printf("Main: End of the program\n");
它是怎麼工作的…
在這個指南,你已經實現了 MyPriorityTransferQueue 資料結構。這個資料型別是在 producer/consumer 問題中使用的,它的元素是按照優先順序排列的。由於 Java 不支援多個繼承,所以你首先要決定的是 MyPriorityTransferQueue 類的基類。你擴充套件了 PriorityBlockingQueue 類,來實現在結構中插入資料按照優先順序排序。你也實現了 TransferQueue 介面,添加了與 producer/consumer 相關的3個方法。
MyPriortyTransferQueue 類有以下2個屬性:
- AtomicInteger 屬性,名為 counter: 此屬性儲存了正在等待從資料型別提取元素的消費者的數量。當一個消費者呼叫 take()操作來從資料型別中提取元素時,counter 數增加。當消費者結束 take() 操作的執行時,counter 數再次增加。在 hasWaitingConsumer() 和 getWaitingConsumerCount() 方法的實現中使用到了 counter。
- ReentrantLock 屬性,名為 lock: 此屬性是用來控制訪問已實現的操作。只有一個執行緒可以用資料型別。最後一個,LinkedBlockingQueue list 用來儲存傳輸的元素。
在 MyPriorityTransferQueue 中,你實現了一些方法。全部方法都在 TransferQueue 介面中聲明瞭和在PriorityBlockingQueue 介面實現的 take() 方法。在之前已經描述了2個方法了。來看看剩下的方法的描述:
- tryTransfer(E e): 此方法嘗試直接傳送元素給消費者。如果有消費者在等待,此方法儲存元素到 priority queue 中為了立刻提供給消費者,並返回 true 值。如果沒有消費者在等待,方法返回 false 值。
- transfer(E e): 此方法直接傳送元素給消費者。如果有消費者在等待,此方法儲存元素到 priority queue 中為了立刻提供給消費者。
否則,把元素儲存到已傳輸的元素list 並阻塞執行緒直到元素被消耗。當執行緒進入休眠時,你要釋放鎖,如果不的話,你就阻塞了queue。
- tryTransfer(E e, long timeout, TimeUnit unit): 此方法與 transfer() 方法相似,只是它的執行緒被阻塞的時間段是由引數決定的。當執行緒進入休眠時,你要釋放鎖,如果不的話,你就阻塞了queue。
- take(): 此方法返回下一個要被消耗的元素。如果在 transferred 元素list中有元素,就從list中取走元素。否則,就從 priority queue 中取元素。
一旦你實現了資料型別,你就實現了 Event 類。它就是在資料型別裡儲存的元素構成的類。Event 類有2個屬性用來儲存生產者的ID和事件的優先順序,並實現了 Comparable 介面,為了滿足你的資料型別的需要。
接著,你實現了 Producer 和 Consumer 類。在這個例子中,你有 10 個生產者和一個消費者,他們共享同一個 buffer。每個生產者生成100個事件,他們的優先順序是遞增的, 所以有高優先順序的事件在越後面才生成。
例子的主類建立了一個 MyPriorityTransferQueue 物件,10個生產者,和一個消費者,然後使用MyPriorityTransferQueue buffer 的 transfer() 方法來傳輸2個事件到 buffer。
以下截圖是程式執行的部分輸出:
你可以發現有著高級別的事件如何先被消費,和一個消費者如何消費傳輸的事件。