1. 程式人生 > >Guava Cache特性:refreshAfterWrite只阻塞回源執行緒,其他執行緒返回舊值

Guava Cache特性:refreshAfterWrite只阻塞回源執行緒,其他執行緒返回舊值

上一篇文章"Guava Cache特性:對於同一個key,只讓一個請求回源load資料,其他執行緒阻塞等待結果"提到:如果快取過期,恰好有多個執行緒讀取同一個key的值,那麼guava只允許一個執行緒去載入資料,其餘執行緒阻塞。這雖然可以防止大量請求穿透快取,但是效率低下。使用refreshAfterWrite可以做到:只阻塞載入資料的執行緒,其餘執行緒返回舊資料。

package net.aty.guava;

import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


public class Main {

    // 模擬一個需要耗時2s的資料庫查詢任務
    private static Callable<String> callable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("begin to mock query db...");
            Thread.sleep(2000);
            System.out.println("success to mock query db...");
            return UUID.randomUUID().toString();
        }
    };


    // 1s後重新整理快取
    private static LoadingCache<String, String> cache = CacheBuilder.newBuilder().refreshAfterWrite(1, TimeUnit.SECONDS)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    return callable.call();
                }
            });

    private static CountDownLatch latch = new CountDownLatch(1);


    public static void main(String[] args) throws Exception {

        // 手動新增一條快取資料,睡眠1.5s讓其過期
        cache.put("name", "aty");
        Thread.sleep(1500);

        for (int i = 0; i < 8; i++) {
            startThread(i);
        }

        // 讓執行緒執行
        latch.countDown();

    }

    private static void startThread(int id) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + "...begin");
                    latch.await();
                    Stopwatch watch = Stopwatch.createStarted();
                    System.out.println(Thread.currentThread().getName() + "...value..." + cache.get("name"));
                    watch.stop();
                    System.out.println(Thread.currentThread().getName() + "...finish,cost time=" + watch.elapsed(TimeUnit.SECONDS));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        t.setName("Thread-" + id);
        t.start();
    }


}

通過輸出結果可以看出:當快取資料過期的時候,真正去載入資料的執行緒會阻塞一段時間,其餘執行緒立馬返回過期的值,顯然這種處理方式更符合實際的使用場景。

有一點需要注意:我們手動向快取中添加了一條資料,並讓其過期。如果沒有這行程式碼,程式執行結果如下。


由於快取沒有資料,導致一個執行緒去載入資料的時候,別的執行緒都阻塞了(因為沒有舊值可以返回)。所以一般系統啟動的時候,我們需要將資料預先載入到快取,不然就會出現這種情況。

還有一個問題不爽:真正載入資料的那個執行緒一定會阻塞,我們希望這個載入過程是非同步的。這樣就可以讓所有執行緒立馬返回舊值,在後臺重新整理快取資料。refreshAfterWrite預設的重新整理是同步的,會在呼叫者的執行緒中執行。我們可以改造成非同步的,實現CacheLoader.reload()。

package net.aty.guava;

import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class Main {

    // 模擬一個需要耗時2s的資料庫查詢任務
    private static Callable<String> callable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("begin to mock query db...");
            Thread.sleep(2000);
            System.out.println("success to mock query db...");
            return UUID.randomUUID().toString();
        }
    };

    // guava執行緒池,用來產生ListenableFuture
    private static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

    private static LoadingCache<String, String> cache = CacheBuilder.newBuilder().refreshAfterWrite(1, TimeUnit.SECONDS)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws Exception {
                    return callable.call();
                }

                @Override
                public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
                    System.out.println("......後臺執行緒池非同步重新整理:" + key);
                    return service.submit(callable);
                }
            });

    private static CountDownLatch latch = new CountDownLatch(1);


    public static void main(String[] args) throws Exception {
        cache.put("name", "aty");
        Thread.sleep(1500);

        for (int i = 0; i < 8; i++) {
            startThread(i);
        }

        // 讓執行緒執行
        latch.countDown();

    }

    private static void startThread(int id) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + "...begin");
                    latch.await();
                    Stopwatch watch = Stopwatch.createStarted();
                    System.out.println(Thread.currentThread().getName() + "...value..." + cache.get("name"));
                    watch.stop();
                    System.out.println(Thread.currentThread().getName() + "...finish,cost time=" + watch.elapsed(TimeUnit.SECONDS));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        t.setName("Thread-" + id);
        t.start();
    }


}