1. 程式人生 > >Java 多執行緒 生產者和消費者 佇列

Java 多執行緒 生產者和消費者 佇列

    wait()和notifyAll()方法以一種非常低階的方式解決了任務互操作問題,即每次互動時都握手。在許多情況下,你可以瞄向更高的抽象級別,使用同步佇列來解決任務協作問題,同步佇列在任何時刻都只允許一個任務插入或移除元素。在java.util.concurrent.BlockingQueue介面中提供了這個佇列,這個介面有大量的標準實現。你通常可以使用LinkedBlockingQueue,它是一個無界佇列,還可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限數量的元素。

    如果消費者任務試圖從佇列中獲取物件,而該佇列此時為空,那麼這些佇列還可以掛起消費者任務,並且當有更多的元素可用時回覆消費者任務。阻塞佇列可以解決非常大量的問題,而其方式與wait()和notifyAll()相比,則簡單並可靠得多。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * 阻塞佇列可以解決非常大量的問題,而其方式與wait()和notifyAll()相比,則簡單並可靠得多。
 * 下面是一個簡單的測試,它將多個LiftOff物件的執行序列化了。消費者是LiftOffRunner,它將每個LiftOff物件從BlockingQueue
 * 中推出並直接執行。(即,它通過顯式地呼叫run()而使用自己的執行緒來執行,而不是為每個任務啟動一個新執行緒。)
 * 
 * @create @author Henry @date 2016-12-23
 * 
 */
class LiftOffRunner implements Runnable {
	private BlockingQueue<LiftOff> rockets;

	public LiftOffRunner(BlockingQueue<LiftOff> rockets) {
		this.rockets = rockets;
	}

	public void add(LiftOff lo) {
		try {
			rockets.put(lo);
		} catch (InterruptedException e) {
			System.err.println("Interrupted during put()");
		}
	}

	@Override
	public void run() {
		try {
			System.out.println("Coming in");
			while (!Thread.interrupted()) {
				LiftOff rocket;
				rocket = rockets.take();
				rocket.run();
			}
		} catch (InterruptedException e) {
			System.err.println("Waking from take() ,InterruptedException");
		}
		System.out.println("Exiting LiftOffRunner");
	}
}

/**
 * 
 * 簡單的物件實現Runnable介面
 * 
 * @create @author Henry @date 2016-11-16
 */
class LiftOff implements Runnable {
	protected int countDown = 10;
	private static int taskCount = 0;
	private final int id = taskCount++;

	public LiftOff() {
	}

	public LiftOff(int countDown) {
		this.countDown = countDown;
	}

	public String status() {
		return "#" + id + "(" + (countDown > 0 ? countDown + "), ": "Liftoff),\n") ;
	}

	@Override
	public void run() {
		while (countDown-- > 0) {
			System.out.print(status());
			/**
			 * Thread.yield()的呼叫是對執行緒排程器(java執行緒機制的一部分,可以將CPU從一個執行緒轉移給另一個執行緒)的一種建議,
			 * 它在宣告:“我已經執行完生命週期中最重要的部分了,此刻正是切換給其他任務執行一段時間的大好時機。”
			 * 這完全是選擇性的,但是這裡使用它是因為它會在這些示例中產生更加有趣的輸出:你更有可能會看到任務換進換出的證據。
			 * 
			 */
			Thread.yield();
		}
	}
}
/**
 * 各個任務有main()放置到了BlockingQueue中,並且由LiftOffRunner從BlockingQueue中取出。注意,LiftOffRunner可以忽略
 * 同步問題,因為它們已經由BlockingQueue解決了。
 * 
 * @create @author Henry @date 2016-12-23
 *
 */
public class TestBlockingQueues {
	static void getkey() {
		try {
			// Compensate for Windows/Linux difference in the
			// length of the result produced by the Enter key;
			new BufferedReader(new InputStreamReader(System.in)).readLine();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	static void getkey(String message) {
		System.out.println(message);
		getkey();
	}

	static void test(String msg, BlockingQueue<LiftOff> queue) throws InterruptedException {
		System.out.println(msg);
		LiftOffRunner runner = new LiftOffRunner(queue);
		Thread t = new Thread(runner);
		t.start();
		TimeUnit.SECONDS.sleep(2);
		for (int i = 0; i < 5; i++)
			runner.add(new LiftOff(5));
		TimeUnit.SECONDS.sleep(3);
		getkey("Press 'Enter' (" + msg + ")");
		t.interrupt();
		System.out.println("Finished " + msg + " test");
	}

	public static void main(String[] args) throws Exception {
		test("LinedBlockingQueue ",new LinkedBlockingQueue<LiftOff>());
		test("ArrayBlockingQueue ",new ArrayBlockingQueue<LiftOff>(3));
		test("SynchronousQueue ",new SynchronousQueue<LiftOff>());
	}
}


import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 吐司BlockingQueue
 * 
 * 考慮下面這個使用BlockingQueue的示例,有一臺機器具有三個任務:一個製作吐司、一個給吐司抹黃油,
 * 另一個在抹過黃油的吐司上塗果醬。我們可以通過各個處理過程之間的BlockingQueue來執行這個吐司製作程式:
 * 
 * @create @author Henry @date 2016-12-23
 * 
 */
class Toast {
	public enum Status {
		DRY, BUTTERED, JAMMED
	}

	private Status status = Status.DRY;
	private final int id;

	public Toast(int idn) {
		id = idn;
	}

	public void butter() {
		status = Status.BUTTERED;
	}

	public void jam() {
		status = Status.JAMMED;
	}

	public Status getStatus() {
		return status;
	}

	public int getId() {
		return id;
	}

	@Override
	public String toString() {
		return "Toast " + id + ": " + status;
	}
}

class ToastQueue extends LinkedBlockingQueue<Toast> {
}

class Toaster implements Runnable {
	private ToastQueue toashtQueue;
	private int count = 0;
	private Random rand = new Random(47);

	public Toaster(ToastQueue tq) {
		toashtQueue = tq;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
				Toast t = new Toast(count++);
				System.out.println(t);
				toashtQueue.put(t);
			}
		} catch (InterruptedException e) {
			System.err.println("Toaster interrupted");
		}
		System.out.println("Toaster off");
	}
}

// Apply butter to toast;
class Butterer implements Runnable {
	private ToastQueue dryQueue, butteredQueue;

	public Butterer(ToastQueue dry, ToastQueue buttered) {
		dryQueue = dry;
		butteredQueue = buttered;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				// Blocks until next piece of toast is availble;
				Toast t = dryQueue.take();
				t.butter();
				System.out.println(t);
				butteredQueue.put(t);
			}
		} catch (InterruptedException e) {
			System.err.println("Butterer interrupted");
		}
		System.out.println("Butterer off");
	}

}

// Apply jam to buttered toast;
class Jammer implements Runnable {
	private ToastQueue butteredQueue, finishedQueue;

	public Jammer(ToastQueue buttered, ToastQueue finished) {
		butteredQueue = buttered;
		finishedQueue = finished;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				// Blocks until next piece of toast is available;
				Toast t = butteredQueue.take();
				t.jam();
				System.out.println(t);
				finishedQueue.put(t);
			}
		} catch (InterruptedException e) {
			System.err.println("Jammer Interrupted");
		}
		System.out.println("Jammer off");
	}
}

class Eater implements Runnable {
	private ToastQueue finishedQueue;
	private int counter = 0;

	public Eater(ToastQueue finished) {
		finishedQueue = finished;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				// Blocks untill next piece of toast is available;
				Toast t = finishedQueue.take();
				// Verify that the toast is coming in order.
				// and that all pieces are getting jammed;
				if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
					System.out.println(">>> Error: " + t);
					System.exit(1);
				} else
					System.out.println("Chomp! " + t);
			}
		} catch (InterruptedException e) {
			System.err.println("Eater interrupted");
		}
		System.out.println("Eater off");
	}

}
/**
 * Toast 是一個使用enum值的優秀示例。注意,這個示例中沒有任何顯示的同步(即使用Lock物件,或synchronized 關鍵字的同步),
 * 因為同步佇列(其內部是同步的)和系統的設計隱式地管理了---每片Toast在任何時刻都由一個任務在操作。因為佇列的阻塞,使得
 * 處理過程將被自動地掛起和恢復。你可以看到由BlockingQueue產生的簡化十分明顯。在使用顯式的wait()和notifyAll()時存在
 * 的類和類之間的耦合被消除了,因為每個類都只和它的BlockingQueue通訊。
 * 
 * @create @author Henry @date 2016-12-23
 *
 */
public class ToastOMatic {
	public static void main(String[] args) throws Exception {
		ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue();
		ExecutorService exec=Executors.newCachedThreadPool();
		exec.execute(new Toaster(dryQueue));
		exec.execute(new Butterer(dryQueue,butteredQueue));
		exec.execute(new Jammer(butteredQueue,finishedQueue));
		exec.execute(new Eater(finishedQueue));
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
		
	}
}