Java 多執行緒 生產者和消費者 佇列
阿新 • • 發佈:2019-01-10
wait()和notifyAll()方法以一種非常低階的方式解決了任務互操作問題,即每次互動時都握手。在許多情況下,你可以瞄向更高的抽象級別,使用同步佇列來解決任務協作問題,同步佇列在任何時刻都只允許一個任務插入或移除元素。在java.util.concurrent.BlockingQueue介面中提供了這個佇列,這個介面有大量的標準實現。你通常可以使用LinkedBlockingQueue,它是一個無界佇列,還可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限數量的元素。
如果消費者任務試圖從佇列中獲取物件,而該佇列此時為空,那麼這些佇列還可以掛起消費者任務,並且當有更多的元素可用時回覆消費者任務。阻塞佇列可以解決非常大量的問題,而其方式與wait()和notifyAll()相比,則簡單並可靠得多。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * 阻塞佇列可以解決非常大量的問題,而其方式與wait()和notifyAll()相比,則簡單並可靠得多。 * 下面是一個簡單的測試,它將多個LiftOff物件的執行序列化了。消費者是LiftOffRunner,它將每個LiftOff物件從BlockingQueue * 中推出並直接執行。(即,它通過顯式地呼叫run()而使用自己的執行緒來執行,而不是為每個任務啟動一個新執行緒。) * * @create @author Henry @date 2016-12-23 * */ class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> rockets) { this.rockets = rockets; } public void add(LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { System.err.println("Interrupted during put()"); } } @Override public void run() { try { System.out.println("Coming in"); while (!Thread.interrupted()) { LiftOff rocket; rocket = rockets.take(); rocket.run(); } } catch (InterruptedException e) { System.err.println("Waking from take() ,InterruptedException"); } System.out.println("Exiting LiftOffRunner"); } } /** * * 簡單的物件實現Runnable介面 * * @create @author Henry @date 2016-11-16 */ class LiftOff implements Runnable { protected int countDown = 10; private static int taskCount = 0; private final int id = taskCount++; public LiftOff() { } public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown + "), ": "Liftoff),\n") ; } @Override public void run() { while (countDown-- > 0) { System.out.print(status()); /** * Thread.yield()的呼叫是對執行緒排程器(java執行緒機制的一部分,可以將CPU從一個執行緒轉移給另一個執行緒)的一種建議, * 它在宣告:“我已經執行完生命週期中最重要的部分了,此刻正是切換給其他任務執行一段時間的大好時機。” * 這完全是選擇性的,但是這裡使用它是因為它會在這些示例中產生更加有趣的輸出:你更有可能會看到任務換進換出的證據。 * */ Thread.yield(); } } } /** * 各個任務有main()放置到了BlockingQueue中,並且由LiftOffRunner從BlockingQueue中取出。注意,LiftOffRunner可以忽略 * 同步問題,因為它們已經由BlockingQueue解決了。 * * @create @author Henry @date 2016-12-23 * */ public class TestBlockingQueues { static void getkey() { try { // Compensate for Windows/Linux difference in the // length of the result produced by the Enter key; new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) { throw new RuntimeException(e); } } static void getkey(String message) { System.out.println(message); getkey(); } static void test(String msg, BlockingQueue<LiftOff> queue) throws InterruptedException { System.out.println(msg); LiftOffRunner runner = new LiftOffRunner(queue); Thread t = new Thread(runner); t.start(); TimeUnit.SECONDS.sleep(2); for (int i = 0; i < 5; i++) runner.add(new LiftOff(5)); TimeUnit.SECONDS.sleep(3); getkey("Press 'Enter' (" + msg + ")"); t.interrupt(); System.out.println("Finished " + msg + " test"); } public static void main(String[] args) throws Exception { test("LinedBlockingQueue ",new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue ",new ArrayBlockingQueue<LiftOff>(3)); test("SynchronousQueue ",new SynchronousQueue<LiftOff>()); } }
import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * 吐司BlockingQueue * * 考慮下面這個使用BlockingQueue的示例,有一臺機器具有三個任務:一個製作吐司、一個給吐司抹黃油, * 另一個在抹過黃油的吐司上塗果醬。我們可以通過各個處理過程之間的BlockingQueue來執行這個吐司製作程式: * * @create @author Henry @date 2016-12-23 * */ class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; public Toast(int idn) { id = idn; } public void butter() { status = Status.BUTTERED; } public void jam() { status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } @Override public String toString() { return "Toast " + id + ": " + status; } } class ToastQueue extends LinkedBlockingQueue<Toast> { } class Toaster implements Runnable { private ToastQueue toashtQueue; private int count = 0; private Random rand = new Random(47); public Toaster(ToastQueue tq) { toashtQueue = tq; } @Override public void run() { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500)); Toast t = new Toast(count++); System.out.println(t); toashtQueue.put(t); } } catch (InterruptedException e) { System.err.println("Toaster interrupted"); } System.out.println("Toaster off"); } } // Apply butter to toast; class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer(ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } @Override public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is availble; Toast t = dryQueue.take(); t.butter(); System.out.println(t); butteredQueue.put(t); } } catch (InterruptedException e) { System.err.println("Butterer interrupted"); } System.out.println("Butterer off"); } } // Apply jam to buttered toast; class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer(ToastQueue buttered, ToastQueue finished) { butteredQueue = buttered; finishedQueue = finished; } @Override public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available; Toast t = butteredQueue.take(); t.jam(); System.out.println(t); finishedQueue.put(t); } } catch (InterruptedException e) { System.err.println("Jammer Interrupted"); } System.out.println("Jammer off"); } } class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; public Eater(ToastQueue finished) { finishedQueue = finished; } @Override public void run() { try { while (!Thread.interrupted()) { // Blocks untill next piece of toast is available; Toast t = finishedQueue.take(); // Verify that the toast is coming in order. // and that all pieces are getting jammed; if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { System.out.println(">>> Error: " + t); System.exit(1); } else System.out.println("Chomp! " + t); } } catch (InterruptedException e) { System.err.println("Eater interrupted"); } System.out.println("Eater off"); } } /** * Toast 是一個使用enum值的優秀示例。注意,這個示例中沒有任何顯示的同步(即使用Lock物件,或synchronized 關鍵字的同步), * 因為同步佇列(其內部是同步的)和系統的設計隱式地管理了---每片Toast在任何時刻都由一個任務在操作。因為佇列的阻塞,使得 * 處理過程將被自動地掛起和恢復。你可以看到由BlockingQueue產生的簡化十分明顯。在使用顯式的wait()和notifyAll()時存在 * 的類和類之間的耦合被消除了,因為每個類都只和它的BlockingQueue通訊。 * * @create @author Henry @date 2016-12-23 * */ public class ToastOMatic { public static void main(String[] args) throws Exception { ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue(); ExecutorService exec=Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butterer(dryQueue,butteredQueue)); exec.execute(new Jammer(butteredQueue,finishedQueue)); exec.execute(new Eater(finishedQueue)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }