1. 程式人生 > >阿里面試官讓我實現一個執行緒安全並且可以設定過期時間的LRU快取,我蒙了!

阿里面試官讓我實現一個執行緒安全並且可以設定過期時間的LRU快取,我蒙了!

目錄
  • 1. LRU 快取介紹
  • 2. ConcurrentLinkedQueue簡單介紹
  • 3. ReadWriteLock簡單介紹
  • 4.ScheduledExecutorService 簡單介紹
  • 5. 徒手擼一個執行緒安全的 LRU 快取
    • 5.1. 實現方法
    • 5.2. 原理
    • 5.3. put方法具體流程分析
    • 5.4. 原始碼
  • 6. 實現一個執行緒安全並且帶有過期時間的 LRU 快取

最近被讀者問到“不用LinkedHashMap的話,如何實現一個執行緒安全的 LRU 快取?網上的程式碼太雜太亂,Guide哥哥能不能幫忙寫一個?”。 相關閱讀:V2.0 版本的 《JavaGuide面試突擊版》來啦!帶著它的線上閱讀版本來啦!

劃重點,手寫一個 LRU 快取在面試中還是挺常見的!

很多人就會問了:“網上已經有這麼多現成的快取了!為什麼面試官還要我們自己實現一個呢?” 。咳咳咳,當然是為了面試需要。哈哈!開個玩笑,我個人覺得更多地是為了學習吧!今天Guide哥教大家:

  1. 實現一個執行緒安全的 LRU 快取
  2. 實現一個執行緒安全並且帶有過期時間的 LRU 快取

考慮到了執行緒安全性我們使用了 ConcurrentHashMapConcurrentLinkedQueue 這兩個執行緒安全的集合。另外,還用到 ReadWriteLock(讀寫鎖)。為了實現帶有過期時間的快取,我們用到了 ScheduledExecutorService

來做定時任務執行。

如果有任何不對或者需要完善的地方,請幫忙指出!

1. LRU 快取介紹

LRU (Least Recently Used,最近最少使用)是一種快取淘汰策略。

LRU快取指的是當快取大小已達到最大分配容量的時候,如果再要去快取新的物件資料的話,就需要將快取中最近訪問最少的物件刪除掉以便給新來的資料騰出空間。

2. ConcurrentLinkedQueue簡單介紹

ConcurrentLinkedQueue是一個基於單向連結串列的無界無鎖執行緒安全的佇列,適合在高併發環境下使用,效率比較高。 我們在使用的時候,可以就把它理解為我們經常接觸的資料結構——佇列,不過是增加了多執行緒下的安全性保證罷了。和普通佇列一樣,它也是按照先進先出(FIFO)的規則對接點進行排序。 另外,佇列元素中不可以放置null元素。

ConcurrentLinkedQueue 整個繼承關係如下圖所示:

ConcurrentLinkedQueue中最主要的兩個方法是:offer(value)poll(),分別實現佇列的兩個重要的操作:入隊和出隊(offer(value)等價於 add(value))。

我們新增一個元素到佇列的時候,它會新增到佇列的尾部,當我們獲取一個元素時,它會返回佇列頭部的元素。

利用ConcurrentLinkedQueue佇列先進先出的特性,每當我們 put/get(快取被使用)元素的時候,我們就將這個元素存放在佇列尾部,這樣就能保證佇列頭部的元素是最近最少使用的。

3. ReadWriteLock簡單介紹

ReadWriteLock 是一個介面,位於java.util.concurrent.locks包下,裡面只有兩個方法分別返回讀鎖和寫鎖:

public interface ReadWriteLock {
    /**
     * 返回讀鎖
     */
    Lock readLock();

    /**
     * 返回寫鎖
     */
    Lock writeLock();
}

ReentrantReadWriteLockReadWriteLock介面的具體實現類。

讀寫鎖還是比較適合快取這種讀多寫少的場景。讀寫鎖可以保證多個執行緒和同時讀取,但是隻有一個執行緒可以寫入。但是,有一個問題是當讀鎖被執行緒持有的時候,讀鎖是無法被其它執行緒申請的,會處於阻塞狀態,直至讀鎖被釋放。

另外,同一個執行緒持有寫鎖時是可以申請讀鎖,但是持有讀鎖的情況下不可以申請寫鎖。

4.ScheduledExecutorService 簡單介紹

ScheduledExecutorService 是一個介面,ScheduledThreadPoolExecutor 是其主要實現類。

ScheduledThreadPoolExecutor 主要用來在給定的延遲後執行任務,或者定期執行任務。 這個在實際專案用到的比較少,因為有其他方案選擇比如quartz。但是,在一些需求比較簡單的場景下還是非常有用的!

ScheduledThreadPoolExecutor 使用的任務佇列 DelayQueue 封裝了一個 PriorityQueuePriorityQueue 會對佇列中的任務進行排序,執行所需時間短的放在前面先被執行,如果執行所需時間相同則先提交的任務將被先執行。

5. 徒手擼一個執行緒安全的 LRU 快取

5.1. 實現方法

ConcurrentHashMap + ConcurrentLinkedQueue +ReadWriteLock

5.2. 原理

ConcurrentHashMap 是執行緒安全的Map,我們可以利用它快取 key,value形式的資料。ConcurrentLinkedQueue是一個執行緒安全的基於連結串列的佇列(先進先出),我們可以用它來維護 key 。每當我們put/get(快取被使用)元素的時候,我們就將這個元素對應的 key 存放在佇列尾部,這樣就能保證佇列頭部的元素是最近最少使用的。當我們的快取容量不夠的時候,我們直接移除佇列頭部對應的key以及這個key對應的快取即可!

另外,我們用到了ReadWriteLock(讀寫鎖)來保證執行緒安全。

5.3. put方法具體流程分析

為了方便大家理解,我將程式碼中比較重要的 put(key,value)方法的原理圖畫了出來,如下圖所示:

5.4. 原始碼

/**
 * @author shuang.kou
 * <p>
 * 使用 ConcurrentHashMap+ConcurrentLinkedQueue+ReadWriteLock實現執行緒安全的 LRU 快取
 * 這裡只是為了學習使用,本地快取推薦使用 Guava 自帶的。
 */
public class MyLruCache<K, V> {

    /**
     * 快取的最大容量
     */
    private final int maxCapacity;

    private ConcurrentHashMap<K, V> cacheMap;
    private ConcurrentLinkedQueue<K> keys;
    /**
     * 讀寫鎖
     */
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock writeLock = readWriteLock.writeLock();
    private Lock readLock = readWriteLock.readLock();

    public MyLruCache(int maxCapacity) {
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("Illegal max capacity: " + maxCapacity);
        }
        this.maxCapacity = maxCapacity;
        cacheMap = new ConcurrentHashMap<>(maxCapacity);
        keys = new ConcurrentLinkedQueue<>();
    }

    public V put(K key, V value) {
        // 加寫鎖
        writeLock.lock();
        try {
            //1.key是否存在於當前快取
            if (cacheMap.containsKey(key)) {
                moveToTailOfQueue(key);
                cacheMap.put(key, value);
                return value;
            }
            //2.是否超出快取容量,超出的話就移除佇列頭部的元素以及其對應的快取
            if (cacheMap.size() == maxCapacity) {
                System.out.println("maxCapacity of cache reached");
                removeOldestKey();
            }
            //3.key不存在於當前快取。將key新增到佇列的尾部並且快取key及其對應的元素
            keys.add(key);
            cacheMap.put(key, value);
            return value;
        } finally {
            writeLock.unlock();
        }
    }

    public V get(K key) {
        //加讀鎖
        readLock.lock();
        try {
            //key是否存在於當前快取
            if (cacheMap.containsKey(key)) {
                // 存在的話就將key移動到佇列的尾部
                moveToTailOfQueue(key);
                return cacheMap.get(key);
            }
            //不存在於當前快取中就返回Null
            return null;
        } finally {
            readLock.unlock();
        }
    }

    public V remove(K key) {
        writeLock.lock();
        try {
            //key是否存在於當前快取
            if (cacheMap.containsKey(key)) {
                // 存在移除佇列和Map中對應的Key
                keys.remove(key);
                return cacheMap.remove(key);
            }
            //不存在於當前快取中就返回Null
            return null;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 將元素新增到佇列的尾部(put/get的時候執行)
     */
    private void moveToTailOfQueue(K key) {
        keys.remove(key);
        keys.add(key);
    }

    /**
     * 移除佇列頭部的元素以及其對應的快取 (快取容量已滿的時候執行)
     */
    private void removeOldestKey() {
        K oldestKey = keys.poll();
        if (oldestKey != null) {
            cacheMap.remove(oldestKey);
        }
    }

    public int size() {
        return cacheMap.size();
    }

}

非併發環境測試:

MyLruCache<Integer, String> myLruCache = new MyLruCache<>(3);
myLruCache.put(1, "Java");
System.out.println(myLruCache.get(1));// Java
myLruCache.remove(1);
System.out.println(myLruCache.get(1));// null
myLruCache.put(2, "C++");
myLruCache.put(3, "Python");
System.out.println(myLruCache.get(2));//C++
myLruCache.put(4, "C");
myLruCache.put(5, "PHP");
System.out.println(myLruCache.get(2));// C++

併發環境測試:

我們初始化了一個固定容量為 10 的執行緒池和count為10的CountDownLatch。我們將100次操作分10次新增到執行緒池
,然後我們等待執行緒池執行完成這10次操作(正常情況下會有10個執行緒同時執行任務,所以速度很快)。

int threadNum = 10;
int batchSize = 10;
//init cache
MyLruCache<String, Integer> myLruCache = new MyLruCache<>(batchSize * 10);
//init thread pool with 10 threads
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadNum);
//init CountDownLatch with 10 count
CountDownLatch latch = new CountDownLatch(threadNum);
AtomicInteger atomicInteger = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
for (int t = 0; t < threadNum; t++) {
    fixedThreadPool.submit(() -> {
        for (int i = 0; i < batchSize; i++) {
            int value = atomicInteger.incrementAndGet();
            myLruCache.put("id" + value, value);
        }
        latch.countDown();
    });
}
//wait for 10 threads to complete the task
latch.await();
fixedThreadPool.shutdown();
System.out.println("Cache size:" + myLruCache.size());//Cache size:100
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
System.out.println(String.format("Time cost:%dms", duration));//Time cost:511ms

6. 實現一個執行緒安全並且帶有過期時間的 LRU 快取

實際上就是在我們上面時間的LRU快取的基礎上加上一個定時任務去刪除快取,單純利用 JDK 提供的類,我們實現定時任務的方式有很多種:

  1. Timer :不被推薦,多執行緒會存在問題。
  2. ScheduledExecutorService :定時器執行緒池,可以用來替代 Timer
  3. DelayQueue :延時佇列
  4. quartz :一個很火的開源任務排程框架,很多其他框架都是基於 quartz 開發的,比如噹噹網的elastic-job 就是基於quartz二次開發之後的分散式排程解決方案
  5. ......

最終我們選擇了 ScheduledExecutorService,主要原因是它易用(基於DelayQueue做了很多封裝)並且基本能滿足我們的大部分需求。

我們在我們上面實現的執行緒安全的 LRU 快取基礎上,簡單稍作修改即可!我們增加了一個方法:

private void removeAfterExpireTime(K key, long expireTime) {
    scheduledExecutorService.schedule(() -> {
        //過期後清除該鍵值對
        cacheMap.remove(key);
        keys.remove(key);
    }, expireTime, TimeUnit.MILLISECONDS);
}

我們put元素的時候,如果通過這個方法就能直接設定過期時間。

完整原始碼如下:

/**
 * @author shuang.kou
 * <p>
 * 使用 ConcurrentHashMap+ConcurrentLinkedQueue+ReadWriteLock+ScheduledExecutorService實現執行緒安全的 LRU 快取
 * 這裡只是為了學習使用,本地快取推薦使用 Guava 自帶的,使用 Spring 的話,推薦使用Spring Cache
 */
public class MyLruCacheWithExpireTime<K, V> {

    /**
     * 快取的最大容量
     */
    private final int maxCapacity;

    private ConcurrentHashMap<K, V> cacheMap;
    private ConcurrentLinkedQueue<K> keys;
    /**
     * 讀寫鎖
     */
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock writeLock = readWriteLock.writeLock();
    private Lock readLock = readWriteLock.readLock();

    private ScheduledExecutorService scheduledExecutorService;

    public MyLruCacheWithExpireTime(int maxCapacity) {
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("Illegal max capacity: " + maxCapacity);
        }
        this.maxCapacity = maxCapacity;
        cacheMap = new ConcurrentHashMap<>(maxCapacity);
        keys = new ConcurrentLinkedQueue<>();
        scheduledExecutorService = Executors.newScheduledThreadPool(3);
    }

    public V put(K key, V value, long expireTime) {
        // 加寫鎖
        writeLock.lock();
        try {
            //1.key是否存在於當前快取
            if (cacheMap.containsKey(key)) {
                moveToTailOfQueue(key);
                cacheMap.put(key, value);
                return value;
            }
            //2.是否超出快取容量,超出的話就移除佇列頭部的元素以及其對應的快取
            if (cacheMap.size() == maxCapacity) {
                System.out.println("maxCapacity of cache reached");
                removeOldestKey();
            }
            //3.key不存在於當前快取。將key新增到佇列的尾部並且快取key及其對應的元素
            keys.add(key);
            cacheMap.put(key, value);
            if (expireTime > 0) {
                removeAfterExpireTime(key, expireTime);
            }
            return value;
        } finally {
            writeLock.unlock();
        }
    }

    public V get(K key) {
        //加讀鎖
        readLock.lock();
        try {
            //key是否存在於當前快取
            if (cacheMap.containsKey(key)) {
                // 存在的話就將key移動到佇列的尾部
                moveToTailOfQueue(key);
                return cacheMap.get(key);
            }
            //不存在於當前快取中就返回Null
            return null;
        } finally {
            readLock.unlock();
        }
    }

    public V remove(K key) {
        writeLock.lock();
        try {
            //key是否存在於當前快取
            if (cacheMap.containsKey(key)) {
                // 存在移除佇列和Map中對應的Key
                keys.remove(key);
                return cacheMap.remove(key);
            }
            //不存在於當前快取中就返回Null
            return null;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 將元素新增到佇列的尾部(put/get的時候執行)
     */
    private void moveToTailOfQueue(K key) {
        keys.remove(key);
        keys.add(key);
    }

    /**
     * 移除佇列頭部的元素以及其對應的快取 (快取容量已滿的時候執行)
     */
    private void removeOldestKey() {
        K oldestKey = keys.poll();
        if (oldestKey != null) {
            cacheMap.remove(oldestKey);
        }
    }

    private void removeAfterExpireTime(K key, long expireTime) {
        scheduledExecutorService.schedule(() -> {
            //過期後清除該鍵值對
            cacheMap.remove(key);
            keys.remove(key);
        }, expireTime, TimeUnit.MILLISECONDS);
    }

    public int size() {
        return cacheMap.size();
    }

}

測試效果:

MyLruCacheWithExpireTime<Integer,String> myLruCache = new MyLruCacheWithExpireTime<>(3);
myLruCache.put(1,"Java",3;
myLruCache.put(2,"C++",3;
myLruCache.put(3,"Python",1500);
System.out.println(myLruCache.size());//3
Thread.sleep(2;
System.out.println(myLruCache.size());//2