1. 程式人生 > >使用DelayQueue 和 FutureTask 實現java中的快取

使用DelayQueue 和 FutureTask 實現java中的快取

使用DelayQueue、ConcurrentHashMap、FutureTask實現的快取工具類。

DelayQueue 簡介

DelayQueue是一個支援延時獲取元素的無界阻塞佇列。DelayQueue內部佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。

DelayQueue非常有用,可以將DelayQueue運用在以下應用場景。

  1. 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢
    DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
  2. 定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從
    DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。

ConcurrentHashMap和FutureTask,詳見以下:

快取工具類實現

  1. 支援快取多長時間,單位毫秒。
  2. 支援多執行緒併發。
    比如:有一個比較耗時的操作,此時緩衝中沒有此快取值,一個執行緒開始計算這個耗時操作,而再次進來執行緒就不需要再次進行計算,只需要等上一個執行緒計算完成後(使用FutureTask)返回該值即可。
import java.util.concurrent.Callable;
import
java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import
java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class CacheBean<V> { // 快取計算的結果 private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>(); // 延遲佇列來判斷那些快取過期 private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>(); // 快取時間 private final int ms; static { // 定時清理過期快取 Thread t = new Thread() { @Override public void run() { dameonCheckOverdueKey(); } }; t.setDaemon(true); t.start(); } private final Computable<V> c; /** * @param c Computable */ public CacheBean(Computable<V> c) { this(c, 60 * 1000); } /** * @param c Computable * @param ms 快取多少毫秒 */ public CacheBean(Computable<V> c, int ms) { this.c = c; this.ms = ms; } public V compute(final String key) throws InterruptedException { while (true) { //根據key從快取中獲取值 Future<V> f = (Future<V>) cache.get(key); if (f == null) { Callable<V> eval = new Callable<V>() { public V call() { return (V) c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(eval); //如果快取中存在此可以,則返回已存在的value f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft); if (f == null) { //向delayQueue中新增key,並設定該key的存活時間 delayQueue.put(new DelayedItem<>(key, ms)); f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(key, f); } catch (ExecutionException e) { e.printStackTrace(); } } } /** * 檢查過期的key,從cache中刪除 */ private static void dameonCheckOverdueKey() { DelayedItem<String> delayedItem; while (true) { try { delayedItem = delayQueue.take(); if (delayedItem != null) { cache.remove(delayedItem.getT()); System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } class DelayedItem<T> implements Delayed { private T t; private long liveTime; private long removeTime; public DelayedItem(T t, long liveTime) { this.setT(t); this.liveTime = liveTime; this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem) { DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o; if (liveTime > tmpDelayedItem.liveTime) { return 1; } else if (liveTime == tmpDelayedItem.liveTime) { return 0; } else { return -1; } } long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return diff > 0 ? 1 : diff == 0 ? 0 : -1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.currentTimeMillis(), unit); } public T getT() { return t; } public void setT(T t) { this.t = t; } @Override public int hashCode() { return t.hashCode(); } @Override public boolean equals(Object object) { if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ? true : false; } return false; } }

Computable 介面

public interface Computable<V> {

    V compute(String k);

}

測試類

public class FutureTaskDemo {

    public static void main(String[] args) throws InterruptedException {
        // 子執行緒
        Thread t = new Thread(() -> {
            CacheBean<String> cb = new CacheBean<>(k -> {
                try {
                    System.out.println("模擬計算資料,計算時長2秒。key=" + k);
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "你好:" + k;
            }, 5000);

            try {
                while (true) {
                    System.out.println("thead2:" + cb.compute("b"));
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t.start();

        // 主執行緒
        while (true) {
            CacheBean<String> cb = new CacheBean<>(k -> {
                try {
                    System.out.println("模擬計算資料,計算時長2秒。key=" + k);
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "你好:" + k;
            }, 5000);

            System.out.println("thead1:" + cb.compute("b"));
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

執行結果:
執行結果

兩個執行緒同時訪問同一個key的快取。從執行結果發現,每次快取失效後,同一個key只執行一次計算,而不是多個執行緒併發執行同一個計算然後快取。