1. 程式人生 > >使用LinkedBlockingQueue來實現生產者消費者的例子

使用LinkedBlockingQueue來實現生產者消費者的例子

工作中,經常有將檔案中的資料匯入資料庫的表中,或者將資料庫表中的記錄儲存到檔案中。為了提高程式的處理速度,可以設定讀執行緒和寫執行緒,這些執行緒通過訊息佇列進行資料互動。本例就是使用了LinkedBlockingQueue來模仿生產者執行緒和消費者執行緒進行資料生產和消費。
為了方便,這些不同的類被寫在了一個類中,實際使用的時候,可以單獨拆開,舉一反三地使用。

以下是例子:
LinkedBlockingQueueDemo.java

import java.util.Date;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LinkedBlockingQueueDemo {
	// 生產者執行緒數量
	private final static int providerThreadAmount = 5;

	// 記錄每一個生產者執行緒是否處理完畢的標記
	private static boolean[] providerDoneFlag = new boolean[providerThreadAmount];

	// 整個所有的生產者執行緒全部結束的標記
	private static boolean done = false;

	// 一個執行緒安全的佇列,用於生產者和消費者非同步地資訊互動
	private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>();

	static class ProviderThread extends Thread {
		private Thread thread;
		private String threadName;
		private int threadNo;

		public ProviderThread(String threadName2, int threadNo) {
			this.threadName = threadName2;
			this.threadNo = threadNo;
		}

		public void start() {
			if (thread == null) {
				thread = new Thread(this, threadName);
			}

			thread.start();
			System.out.println(
					(new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());
		}

		@Override
		public void run() {
			int rows = 0;
			for (int i = 0; i < 100; i++) {
				String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName());
                // offer不會去阻塞執行緒,put會
				//linkedBlockingQeque.offer(string);
                linkedBlockingQeque.put(string);
				rows++;
				/*
				 * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch
				 * (InterruptedException e) { e.printStackTrace(); }
				 */
			}

			// 本執行緒處理完畢的標記
			LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true;
			System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t"
					+ Thread.currentThread().getName());
		}
	}

	static class ConsumerThread implements Runnable {
		private Thread thread;
		private String threadName;

		public ConsumerThread(String threadName2) {
			this.threadName = threadName2;
		}

		public void start() {
			if (thread == null) {
				thread = new Thread(this, threadName);
			}

			thread.start();
			System.out.println(
					(new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());
		}

		@Override
		public void run() {
			int rows = 0;
			// 生產者執行緒沒有結束,或者訊息佇列中有元素的時候,去佇列中取資料
			while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) {
				try {
                    //在甘肅電信的實際應用中發現,當資料的處理量達到千萬級的時候,帶引數的poll會將主機的幾百個G的記憶體耗盡,jvm會提示申請記憶體失敗,並將程序退出。網上說,這是這個方法的一個bug。
					//String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS);
                    String string = linkedBlockingQeque.poll();
					if (string == null) {
						continue;
					}

					rows++;

					System.out
							.println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is "
									+ string + "\t" + Thread.currentThread().getName());
					/*
					 * try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch
					 * (InterruptedException e) { e.printStackTrace(); }
					 */

				} catch (InterruptedException e) {
					e.printStackTrace();
				}

			}
			System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t"
					+ Thread.currentThread().getName());
		}
	}

	public static synchronized void setDone(boolean flag) {
		LinkedBlockingQueueDemo.done = flag;
	}
	
	public static synchronized boolean getDone() {
		return LinkedBlockingQueueDemo.done;
	}

	public static void main(String[] args) {
		System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName());
		System.out.println(
				(new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode());

		// 啟動若干生產者執行緒
		for (int i = 0; i < providerThreadAmount; i++) {
			String threadName = String.format("%s-%d", "ProviderThread", i);
			ProviderThread providerThread = new ProviderThread(threadName, i);
			providerThread.start();
		}

		// 啟動若干個消費者執行緒
		for (int i = 0; i < 10; i++) {
			String threadName = String.format("%s-%d", "ConsumerThread", i);
			ConsumerThread consumerThread = new ConsumerThread(threadName);
			consumerThread.start();
		}

		// 迴圈檢測生產者執行緒是否處理完畢
		do {
			for (boolean b : providerDoneFlag) {
				if (b == false) {
					/*
					 * try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) +
					 * " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque.
					 * size() + "\t" + Thread.currentThread().getName()); } catch
					 * (InterruptedException e) { e.printStackTrace(); }
					 */

					// 只要有一個生產者執行緒沒有結束,則整個生產者執行緒檢測認為沒有結束
					break;
				}

				LinkedBlockingQueueDemo.setDone(true);
			}

			// 生產者執行緒全部結束的時候,跳出檢測
			if (LinkedBlockingQueueDemo.getDone() == true) {
				break;
			}
		} while (true);

		System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName());
	}
}

結果略。