生產者消費者模式-Java實現
感知階段
隨著軟體業的發展,網際網路使用者的日漸增多,併發這門藝術的興起似乎是那麼合情合理。每日PV十多億的淘寶,處理併發的手段可謂是業界一流。使用者訪問淘寶首頁的平均等待時間只有區區幾秒,但是伺服器所處理的流程十分複雜。首先負責首頁的伺服器就有好幾千臺,通過計算把與使用者路由最近的伺服器處理首頁的返回。其次是網頁上的資源,就JS和CSS檔案就有上百個,還有圖片資源等。它能在幾秒內加載出來可見阿里幾千名頂尖工程師的智慧是如何登峰造極。
而在大型電商網站中,他們的服務或者應用解耦之後,是通過訊息佇列在彼此間通訊的。訊息佇列和應用之間的架構關係就是生產者消費者模型。
在介紹之前,先找找現實間的模型。筆者最近發覺,很多技術模型是和生活中的模型息息相關的。相信多數人都進過肯德基和麥當勞消費,筆者進店消費的時候發現他們的點單流程和併發模型十分接近。雖然每家店的流程有所差異,但是大概就只有兩種模型。在肯德基裡,你點單之後點單員會把所點的食物完成封裝之後拿來你面前,然後讓你結賬,有時候有些耗時操作沒完成就會留下一個餐檯號稍後送來。而在麥當勞的點餐模型大致是,你點完快餐之後要求你立即付款,付完款之後下一位點餐,而取餐的是在旁邊等待,另一個服務員專責負責配餐。
肯德基流程
麥當勞點餐圖
在併發模型中,肯德基比較傾向於一個執行緒把所有的服務都做完,而麥當勞傾向於服務解耦,讓他們更專注於自己的業務。而肯德基的模型與BIO伺服器的模型設計類似,麥當勞的模型則與生產者消費者模型十分相似。
- 生產消費者模型
生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過記憶體緩衝區進行通訊,生產者生產消費者需要的資料,消費者把資料做成產品。生產消費者模式如下圖。
在日益發展的服務型別中,譬如註冊使用者這種服務,它可能解耦成好幾種獨立的服務(賬號驗證,郵箱驗證碼,手機簡訊碼等)。它們作為消費者,等待使用者輸入資料,在前臺資料提交之後會經過分解併發送到各個服務所在的url,分發的那個角色就相當於生產者。消費者在獲取資料時候有可能一次不能處理完,那麼它們各自有一個請求佇列,那就是記憶體緩衝區了。做這項工作的框架叫做訊息佇列。
- 生產者消費者模型的實現
生產者是一堆執行緒,消費者是另一堆執行緒,記憶體緩衝區可以使用List陣列佇列,資料型別只需要定義一個簡單的類就好。關鍵是如何處理多執行緒之間的協作。這其實也是多執行緒通訊的一個範例。
在這個模型中,最關鍵就是記憶體緩衝區為空的時候消費者必須等待,而記憶體緩衝區滿的時候,生產者必須等待。其他時候可以是個動態平衡。值得注意的是多執行緒對臨界區資源的操作時候必須保證在讀寫中只能存在一個執行緒,所以需要設計鎖的策略。
下面這個例子是書上介紹的,生產者負責生產一個數字並存入緩衝區,消費者從緩衝區中取出資料並且求出它的平方並輸出。
/** * 生產者 * @author ctk * 生產者消費者模型 */ public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<PCData> queue;// 記憶體緩衝區 private static AtomicInteger count = new AtomicInteger();// 總數 原子操作 private static final int SLEEPTIME = 1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data = null; Random r = new Random(); System.out.println("start producting id:" + Thread.currentThread().getId()); try { while (isRunning) { Thread.sleep(r.nextInt(SLEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data + " 加入佇列"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.err.println(" 加入佇列失敗"); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop() { isRunning = false; } }
package ProducterAndConsumer.Version1; /** * 消費者 * @author ctk */ import java.text.MessageFormat; import java.util.Random; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable{ private BlockingQueue<PCData> queue; private static final int SLEEPTIME = 1000; public Consumer(BlockingQueue<PCData> queue){ this.queue = queue; } @Override public void run() { System.out.println("start Consumer id :"+Thread.currentThread().getId()); Random r = new Random(); try{ while(true){ PCData data = queue.take(); if(data != null) { int re = data.getData() * data.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re)); Thread.sleep(r.nextInt(SLEEPTIME)); } } }catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
package ProducterAndConsumer.Version1; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; /** * 主函式 * @author ctk * */ public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10); Producer p1 = new Producer(queue); Producer p2 = new Producer(queue); Producer p3 = new Producer(queue); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); Thread.sleep(10*1000); p1.stop(); p2.stop(); p3.stop(); Thread.sleep(3000); service.shutdown(); } }
package ProducterAndConsumer.Version1; /** * 容器資料型別 * @author ctk * */ public class PCData { private final int intData; public PCData(int d){ intData = d; } public PCData(String d){ intData = Integer.valueOf(d); } public int getData(){ return intData; } @Override public String toString(){ return "data:"+intData; } }
因為BlockingQueue是一個阻塞佇列,它的存取可以保證只有一個執行緒在進行,所以根據邏輯,生產者在記憶體滿的時候進行等待,並且喚醒消費者佇列,反過來消費者在飢餓狀態下等待並喚醒生產者進行生產。
下面的兩個版本是使用notify/wait()和await()/signal()方法進行設計的。在結構上是一致遵從模型圖的。
package ProducterAndConsumer.Version2; import java.util.List; /** * 消費者 * * @author ctk * */ public class Consumer implements Runnable { private List<PCData> queue; public Consumer(List<PCData> queue) { this.queue = queue; } @Override public void run() { try { while (true) { if (Thread.currentThread().isInterrupted()) break; PCData data = null; synchronized (queue) { if (queue.size() == 0) { queue.notifyAll(); queue.wait(); } data = queue.remove(0); } System.out.println( Thread.currentThread().getId() + " 消費了:" + data.get() + " result:" + (data.get() * data.get())); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
package ProducterAndConsumer.Version2; import java.util.List; import java.util.Random; /** * 生產者 * * @author MacBook * */ public class Producer implements Runnable { private List<PCData> queue; private int length; public Producer(List<PCData> queue, int length) { this.queue = queue; this.length = length; } @Override public void run() { try { while (true) { if (Thread.currentThread().isInterrupted()) break; Random r = new Random(); long temp = r.nextInt(100); System.out.println(Thread.currentThread().getId() + " 生產了:" + temp); PCData data = new PCData(); data.set(temp); synchronized (queue) { if (queue.size() >= length) { queue.notifyAll(); queue.wait(); } else queue.add(data); } Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
package ProducterAndConsumer.Version2; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { List<PCData> queue = new ArrayList<PCData>(); int length = 10; Producer p1 = new Producer(queue,length); Producer p2 = new Producer(queue,length); Producer p3 = new Producer(queue,length); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); } }
package ProducterAndConsumer.Version2; /** * 基本資料型別 * @author ctk * */ public class PCData { private long value; public void set(long value){ this.value = value; } public long get(){ return value; } }
package ProducterAndConsumer.Version3; import java.util.List; /** * 消費者 * @author ctk * */ public class Consumer implements Runnable{ private List<PCData> queue; public Consumer(List<PCData> queue){ this.queue = queue; } @Override public void run() { try { while (true) { if (Thread.currentThread().isInterrupted()) break; PCData data = null; Main.lock.lock(); if (queue.size() == 0){ Main.full.signalAll(); Main.empty.await(); } Thread.sleep(1000); data = queue.remove(0); Main.lock.unlock(); System.out.println("消費者ID:"+Thread.currentThread().getId()+" 消費了:"+data.getData()+" result:"+(data.getData()*data.getData())); } } catch (InterruptedException e) { e.printStackTrace(); } } }
package ProducterAndConsumer.Version3; import java.util.List; import java.util.Random; /** * 生產者 * @author ctk * */ public class Producter implements Runnable{ private List<PCData> queue; private int len; public Producter(List<PCData> queue,int len){ this.queue = queue; this.len = len; } @Override public void run() { try{ while(true){ if(Thread.currentThread().isInterrupted()) break; Random r = new Random(); PCData data = new PCData(); data.setData(r.nextInt(500)); Main.lock.lock(); if(queue.size() >= len) { Main.empty.signalAll(); Main.full.await(); } Thread.sleep(1000); queue.add(data); Main.lock.unlock(); System.out.println("生產者ID:"+Thread.currentThread().getId()+" 生產了:"+data.getData()); } }catch (InterruptedException e) { e.printStackTrace(); } } }
package ProducterAndConsumer.Version3; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Main { public static ReentrantLock lock = new ReentrantLock(); public static Condition empty = lock.newCondition(); public static Condition full = lock.newCondition(); public static void main(String[] args) { List<PCData> queue = new ArrayList<PCData>(); int length = 10; Producter p1 = new Producter(queue,length); Producter p2 = new Producter(queue,length); Producter p3 = new Producter(queue,length); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); } }
package ProducterAndConsumer.Version3; public class PCData { private int data; public int getData() { return data; } public void setData(int data) { this.data = data; } }
await的版本我個人寫出來之後感覺,每次控制檯只輸出了一句話,說明在同一時間內生產者或者消費者只有一個是啟用的,而wait的版本,一次可能有多個生成者啟用。我個人覺得wait的版本更接近我的構想。
- 生產消費者模型思維
下午翻書,偶然發現平行計算的流水線思維。平行計算的要點就是分治法思維,如果能證明分割的兩部分在因果上沒有關聯,則可以進行平行計算。譬如書上的例子(A+B)*C,這個算式是不能使用平行計算分割的,因為它的結果是A+B之後的結果乘以C。但是並行流水線的思維是,我們可以請兩個工人,每個工人負責一步的處理。
分解後的架構是:P1:D = A + B;P2:R = D*3;
在這兩個執行緒處理中並不需要存在因果,所以他們可以平行計算了。
設計這個模式是基於生產消費者模型的,流水線需要使用流水線傳遞半成品,流水線就是記憶體緩衝區,對於P2來說,P1就是生產者,而對於系統需要的結果來說,P2就是生產者。
- 後記
偶然讀到一本書,上面提到的建立高速公路的學習方法是十分高效的學習方法,在學習新的技術的時候它們或多或少都會在現實中有所對映,所以讀萬卷書行萬里路,經歷和學術需要並行增長。技術模型不僅應用在技術領域,管理領域也可以參照思考,learn more,study less。