1. 程式人生 > >Java 多執行緒 無界的BlockingQueue DelayQueue

Java 多執行緒 無界的BlockingQueue DelayQueue

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * DelayQueue
 * 
 * 這是一個無界的BlockingQueue,用於放置實現了Delayed介面的物件,
 * 其中的物件只能在其到時才能從佇列中取走。這種佇列是有序的,即隊頭物件的延遲到期的時間最長。
 * 如果沒有任何延遲到期,那麼就不會有任何頭元素,並且poll()將返回null(正因為這樣,你不能將
 * null放置到這種佇列中)。
 * 
 * 下面是一個示例,其中的Delayed物件自身就是任務,而DelayedTaskConsumer將最“緊急”的任務
 * (到期時間最長的任務)從佇列中取出,然後執行它。注意,這樣DelayQueue就成為了優先順序佇列的一種變體:
 */
/**
 * DelayedTask包含一個稱為sequence的List<DelayedTask>,它儲存了任務被建立的順序,因此我們可以看到
 * 排序是按照實際發生的順序執行的。
 * 
 * Delayed介面有一個方法名為getDelay(),它可以用來告知延遲到期有多長時間,或者延遲在多長時間之前已經到期。
 * 這個方法將強制我們去使用TimeUnit類,因此這就是引數型別。這會產生一個非常方便的類,因為你可以很容易地
 * 轉換單位而無需任何宣告。例如,delta的值時以毫秒為單位儲存的,但是java SE5的方法System.nanoTime()產生
 * 的時間則是以納米為單位的。你可以轉換delta的值,這個方法是宣告它的單位以及你希望以什麼單位來表示,
 * 就像下面這樣:
 * NANOSECONDS.convert(delta,MILLISECONDS);
 * 
 * 在getDelay()中,希望使用的單位是作為unit引數傳遞進來的,你使用它將當前時間與觸發時間之間的差轉換為呼叫者
 * 要求的單位,而無需只掉這些單位是什麼(這是策略設計模式的一個簡單示例,在這種模式中,演算法的一部分是作為引數
 * 傳遞進來的)。
 * 
 * 為了排序,Delayed介面還繼承了Comparable介面,因此必須實現compareTo(),使其可以產生合理的比較。toString()
 * 和summary()提供了輸出格式化,而巢狀的EndSentinel類提供了一種關閉所有事物的途徑,具體做法是將其放置為佇列的
 * 最後一個元素。
 * 
 * @create @author Henry @date 2017-1-3
 */
class DelayedTask implements Runnable, Delayed {
	private static int counter = 0;
	private final int id = counter++;
	private final int delta;
	private final long trigger;
	protected static List<DelayedTask> sequence = new ArrayList<DelayedTask>();

	public DelayedTask(int delayInMilliseconds) {
		delta = delayInMilliseconds;
		trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
		sequence.add(this);
	}

	@Override
	public void run() {
		System.out.println(this + " ---");
	}

	@Override
	public int compareTo(Delayed o) {
		DelayedTask that = (DelayedTask) o;
		if (trigger < that.trigger)
			return -1;
		if (trigger > that.trigger)
			return 1;
		return 0;
	}

	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
	}

	@Override
	public String toString() {
		return String.format("[%1$-4d]", delta) + " Task " + id;
	}

	public String summary() {
		return "(" + id + ":" + delta + ")";
	}

	public static class EndSentinel extends DelayedTask {
		private ExecutorService exec;

		public EndSentinel(int delay, ExecutorService e) {
			super(delay);
			exec = e;
		}

		@Override
		public void run() {
			for (DelayedTask pt : sequence) {
				System.out.println(pt.summary() + " ++");
			}
			System.out.println();
			System.out.println(this + " Calling shutdownNow()");
			exec.shutdownNow();
		}
	}
}
/**
 * 注意,因為DelayedTaskConsumer自身是一個任務,所以它由自己的Thread,它可以使用這個執行緒來執行從佇列中獲取
 * 的所有任務。由於這個任務是按照佇列優先順序的順序執行的,因此在本例中不需要啟動任何單獨的執行緒來執行DelayedTask.
 * 
 * @create @author Henry @date 2017-1-3
 */
class DelayedTaskConsumer implements Runnable {
	private DelayQueue<DelayedTask> q;

	public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
		this.q = q;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted())
				q.take().run();
				//new Thread(q.take()).start();
		} catch (InterruptedException e) {
			System.err.println("InterruptedException");
		}
		System.out.println("Finished DelayedTaskConsumer");
	}
}

/**
 * 從輸出中可以看到,任務建立的順序對執行順序沒有任何影響,任務是按照所期望的延遲順序執行的。
 * 
 * @create @author Henry @date 2017-1-3
 */
public class DelayQueueDemo {
	public static void main(String[] args) {
		Random rand = new Random(47);
		ExecutorService exec = Executors.newCachedThreadPool();
		DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
		// Fill with tasks that have random delays;
		for (int i = 0; i < 5; i++)
			queue.put(new DelayedTask(rand.nextInt(5000)));
		// Set the stopping point
		queue.add(new DelayedTask.EndSentinel(5000, exec));
		exec.execute(new DelayedTaskConsumer(queue));
	}
}

輸出結果:

[555 ] Task 1 ---
[961 ] Task 4 ---
[1693] Task 2 ---
[1861] Task 3 ---
[4258] Task 0 ---
(0:4258) ++
(1:555) ++
(2:1693) ++
(3:1861) ++
(4:961) ++
(5:5000) ++

[5000] Task 5 Calling shutdownNow()
Finished DelayedTaskConsumer


相關推薦

Java 執行 BlockingQueue DelayQueue

import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.conc

Java執行-

1 無鎖類的原理詳解 1.1 CAS CAS演算法的過程是這樣:它包含3個引數CAS(V,E,N)。V表示要更新的變數,E表示預期值,N表示新值。僅當V 值等於E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他執行緒做了更新,則當前執行緒什麼 都不做。最後,CAS返

Java執行-工具篇-BlockingQueue

前言:      在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成

Java 執行高併發 2 — CAS

在 Java 併發程式設計裡面,最可愛的就是無鎖了,非常巧妙,精彩絕倫 額。O__O "… 那麼什麼是無鎖? 顧名思義,在併發情況下采用無鎖的方式實現物件操作的原子性,保證資料一致性、安全性、正確性

java執行之併發集合(BlockingQueue

簡介 實現 package com.np.ota.test.queue; import java.util.concurrent.BlockingQueue; import java.ut

[引用]java執行學習-java.util.concurrent詳解(四) BlockingQueue

自:http://janeky.iteye.com/blog/7706717.BlockingQueue     “支援兩個附加操作的 Queue,這兩個操作是:獲取元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。“     這裡我們主要討論BlockingQueue的最典型實現:LinkedBlo

Java執行程式設計-(14)-鎖CAS操作以及Java中Atomic併發包的“18羅漢”

原文出自 : https://blog.csdn.net/xlgen157387/article/details/78364246 上一篇: Java多執行緒程式設計-(13)- 關於鎖優化的幾點建議 一、背景 通過上面的學習,我們應該很清楚的

Java執行-生產者消費者例子-使用阻塞佇列(BlockingQueue)實現

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Created by wisgood . */ public

Java執行-BlockingQueue(阻塞佇列)

前言:   BlockingQueue是多執行緒安全的佇列,它有兩種常見的阻塞場景。    佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。 當佇列中填滿

(八) Java執行詳解之阻塞佇列BlockingQueue及佇列優先順序詳解

阻塞佇列 阻塞佇列與普通佇列的區別在於當佇列是空時從佇列中獲取元素的操作將會被阻塞,或者當佇列是滿時往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他的執行緒往空的佇列插入新的元素,同樣試圖往已滿的阻塞佇列中新增新元素的執

Java執行BlockingQueue深入分析

一、概述: BlockingQueue作為執行緒容器,可以為執行緒同步提供有力的保障。 二、BlockingQueue定義的常用方法 1.BlockingQueue定義的常用方法如下:   丟擲異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, un

Java執行/併發26、阻塞佇列BlockingQueue

BlockingQueue介面定義了一種佇列,這種佇列通常容量是提前固定(確定了容量大小)的。容量滿時往BlockingQueue中新增資料時會造成阻塞,容量為空時取元素操作會阻塞。 我們可以認為BlockingQueue佇列是一個水庫。水庫滿了的時侯,上游的

Java執行15:Queue、BlockingQueue以及利用BlockingQueue實現生產者/消費者模型

轉自:http://www.cnblogs.com/xrq730/p/4855857.htmlQueue是什麼佇列,是一種資料結構。除了優先順序佇列和LIFO佇列外,佇列都是以FIFO(先進先出)的方式對各個元素進行排序的。無論使用哪種排序方式,佇列的頭都是呼叫remove(

Java執行 -- JUC包原始碼分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/鎖鏈表

-為什麼是SkipList,不是TreeMap的紅黑樹? -無鎖鏈表的精髓 -ConcurrentSkipListMap -ConcurrentSkipListSet 為什麼是SkipList? 大家都知道,在Java集合類中,有一個TreeMap

Java執行BlockingQueue深入分析

二、BlockingQueue定義的常用方法 1.BlockingQueue定義的常用方法如下: 1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則招聘異常 2)off

java執行使用BlockingQueue阻塞佇列實現互斥同步通訊

package com.study; import java.util.concurrent.ArrayBlockingQu

Java執行實現電影院售票案例

某電影院目前正在上映賀歲大片,共有100張票,而它有3個售票視窗,請設計一個程式模擬該電影院售票。 定義Sell類實現Runnable介面,很好的解決了單繼承共享資源問題 public class Sell implements Runnable { // 定義100張票,三個售票

java執行物件鎖、類鎖、同步機制詳解

1.在java多執行緒程式設計中物件鎖、類鎖、同步機制synchronized詳解:     物件鎖:在java中每個物件都有一個唯一的鎖,物件鎖用於物件例項方法或者一個物件例項上面的。     類鎖:是用於一個類靜態方法或者class物件的,一個

Java 執行實現死鎖場景

簡述: 《Java 程式設計思想》  P718 ~ P722 模擬死鎖的場景, 三個人 三根筷子,每個人需要拿到身邊的兩根筷子才能開始吃飯 出現死鎖的場景是,三個人都拿到了右邊的筷子,但是由於筷子都被搶佔,均無法獲得左邊的筷子 Chopstick.java

Java 執行 join和interrupt 方法

簡述: 使用Java多執行緒中join和interrupt函式 《Java程式設計思想》 P669 ~ P670 一個執行緒可以再其他執行緒上呼叫join()方法,其效果是等待一段時間直到第二個執行緒結束才繼續執行。 如果某個執行緒在另一個執行緒t上呼叫t.join(), 此