1. 程式人生 > >非阻塞同步演算法實戰(四)- 計數器定時持久化

非阻塞同步演算法實戰(四)- 計數器定時持久化

問題背景及要求

  • 需要對評論進行點贊次數和被評論次數進行統計,或者更多維度
  • 要求高併發、高效能計數,允許極端情況丟失一些統計次數,例如宕機
  • 評論很多,不能為每一個評論都一直保留其計數器,計數器需要有回收機制

問題抽象及分析

根據以上需求,為了方便編碼與測試,我們把需求轉化為以下介面

/**
 * 計數器
 */
public interface Counter {
    /**
     * 取出統計資料,用Saver去持久化(僅定時器會呼叫,無併發)
     * @param saver
     */
    void save(Saver saver);

    /**
     * 計數(有併發)
     * @param key 業務ID
     * @param like 點贊
     * @param comment 評論
     */
    void add(String key, int like, int comment);

    /**
     * 持久化器,將數量持久化到資料庫等
     */
    @FunctionalInterface
    interface Saver{
        void save(String key, int like, int comment);
    }
}

簡單分析可知,計數器比較簡單,用AtomicInteger便能保證原子性,但考慮到計數器會被回收,則可能會出現這樣的場景:某計數器已被回收了,此時繼續在該計數器上計數,便會造成資料丟失,因此要處理該併發問題

解決方案

方案一

使用原生鎖來解決競爭問題

/**
 * 直接對所有操作上鎖,來保證執行緒安全
 */
public class SynchronizedCounter implements Counter{
    private HashMap<String, Adder> map = new HashMap<>();

    @Override
    public synchronized void save(Saver saver) {
        map.forEach((key, value)->{//因為已加鎖,所以可以安全地取資料
            saver.save(key, value.like, value.comment);
        });
        map = new HashMap<>();
    }

    @Override
    public synchronized void add(String key, int like, int comment) {
        //因為已加鎖,所以可以安全地更新資料
        Adder adder = map.computeIfAbsent(key, x -> new Adder());
        adder.like += like;
        adder.comment += comment;
    }
    static class Adder{
        private int like;
        private int comment;
    }
}

方案點評:該方案讓業務執行緒和定時儲存執行緒競爭同一把例項鎖,讓他們互斥地訪問,解決了競爭問題,但鎖粒度太粗爆,效能低下


方案二

為了循序漸進,我們把“計數器需要有回收機制”這條要求去掉,這樣我們可以很容易地利用上AtomicInteger這個類

/**
 * 不回收計數器,問題變得簡單許多
 */
public class IncompleteCounter implements Counter {
    private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
    @Override
    public void save(Saver saver) {
        map.forEach((key, value)->{//利用了AtomicInteger的原子特性,可以執行緒安全地取出所有計數,並置0(因為還會繼續使用)
            saver.save(key, value.like.getAndSet(0), value.comment.getAndSet(0));
        });
        //因為不回收,所以不用考慮Adder被回收丟棄後,仍被其它執行緒使用的情況(因為沒有鎖,所以這種情況是可能發生的)
    }

    @Override
    public void add(String key, int like, int comment) {
        Adder adder = map.computeIfAbsent(key, k -> new Adder());
        adder.like.addAndGet(like);//利用AtomicInteger的原子特性,保證了執行緒安全
        adder.comment.addAndGet(comment);
    }
    static class Adder{
        AtomicInteger like = new AtomicInteger();
        AtomicInteger comment = new AtomicInteger();
    }
}

方案點評:除了沒解決回收問題,簡單高效


方案三

因為呼叫save的執行緒沒有併發情況,阻塞也沒關係,經分析可巧妙地使用讀寫鎖,同時又不讓add方法進入阻塞

/**
 * 巧妙地利用讀寫鎖,及save方法可阻塞的特點,實現add操作無阻塞
 */
public class ReadWriteLockCounter implements Counter {
    private volatile MapWithLock mapWithLock = new MapWithLock();

    @Override
    public void save(Saver saver) {
        MapWithLock preMapWithLock = mapWithLock;
        mapWithLock = new MapWithLock();
        //不會一直阻塞,因為mapWithLock已被替換,新的add呼叫會拿到新的mapWithLock
        preMapWithLock.lock.writeLock().lock();
        preMapWithLock.map.forEach((key,value)->{
            //value已經廢棄,故無需value.like.getAndSet(0)
            saver.save(key, value.like.get(), value.comment.get());
        });
        //不能釋放該鎖,否則add方法中,對被替換掉的MapWithLock.lock執行tryLock會成功
        //也許,這是你第一次見到的不需要且不允許釋放的鎖:)
    }

    @Override
    public void add(String key, int like, int comment) {
        MapWithLock mapWithLock;
        //如果通過tryLock獲取鎖失敗,則表示該mapWithLock已經被廢棄了(因為只有廢棄了的MapWithLock才會加寫鎖),故重新獲取最新的mapWithLock
        while(!(mapWithLock = this.mapWithLock).lock.readLock().tryLock());
        try{
            Adder adder = mapWithLock.map.computeIfAbsent(key, k -> new Adder());
            adder.like.getAndAdd(like);
            adder.comment.getAndAdd(comment);
        }finally {
            mapWithLock.lock.readLock().unlock();
        }
    }

    static class Adder{
        private AtomicInteger like = new AtomicInteger();
        private AtomicInteger comment = new AtomicInteger();

    }
    static class MapWithLock{
        private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
        private ReadWriteLock lock = new ReentrantReadWriteLock();
    }
}

方案點評:減少了鎖的粒度,同時add執行緒可以相互相容,大幅提升了併發能力,save執行緒雖會阻塞,但結合其定時執行的特點,並不受影響,且即使極端情況也不會一直阻塞


方案四

使用一個原子的state來替換LockCounter中的ReadWriteLock(因為只使用到了它的部分特性),實現wait-free,獲得更高效能

/**
 * ReadWriteLockCounter的改進版,去掉ReadWriteLock,結合當前場景,實現一個wait-free的簡易讀寫鎖<br/>
 */
public class CustomLockCounter implements Counter {
    private volatile MapWithState mapWithState = new MapWithState();

    @Override
    public void save(Saver saver) {
        MapWithState preMapWithState = mapWithState;
        mapWithState = new MapWithState();
        //compareAndSet失敗則表示該MapWithState正在被使用,等其使用完,它不會一直失敗,因為mapWithState已經被替換
        while(!preMapWithState.state.compareAndSet(0,Integer.MIN_VALUE)){
            Thread.yield();
        }
        preMapWithState.map.forEach((key, value)->{
            //value已經廢棄,故無需value.like.getAndSet(0)
            saver.save(key, value.like.get(), value.comment.get());
        });
    }

    @Override
    public void add(String key, int like, int comment) {
        MapWithState mapWithState;//add的併發,不可能將Integer.MIN_VALUE自增成正數(設定為Integer.MIN_VALUE時,該MapWithState已經被廢棄了)
        while((mapWithState = this.mapWithState).state.getAndIncrement()<0);
        try{
            Adder adder = mapWithState.map.computeIfAbsent(key, k -> new Adder());
            adder.like.getAndAdd(like);
            adder.comment.getAndAdd(comment);
        }finally {
            mapWithState.state.getAndDecrement();
        }
    }

    static class Adder{
        private AtomicInteger like = new AtomicInteger();
        private AtomicInteger comment = new AtomicInteger();

    }
    static class MapWithState {
        private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
        private AtomicInteger state = new AtomicInteger();
    }
}

方案點評:保留了前一方案ReadWriteLockCounter的優點,同時結合場景的特點做了些優化,本質就是將CAS失敗重試迴圈替換成了一條fetch-and-add指令,如果不是因為save是低頻執行,本方案可能是最高效的了(暫且忽略ConcurrentHashMap等其它可能的優化空間)


方案五

先假定不會發生競爭,然後檢測競爭情況,如果發生競爭,則補償

/**
 * 樂觀地假定不會發生競爭,如果發生了,則嘗試進行補償
 */
public class CompensationCounter implements Counter {
    private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
    @Override
    public void save(Saver saver) {
        for(Iterator<Map.Entry<String, Adder>> it = map.entrySet().iterator(); it.hasNext();){
            Map.Entry<String, Adder> entry = it.next();
            it.remove();
            entry.getValue().discarded = true;
            saver.save(entry.getKey(), entry.getValue().like.getAndSet(0), entry.getValue().comment.getAndSet(0));//需將計數器置0,此處存在競爭
        }
    }

    @Override
    public void add(String key, int like, int comment) {
        Adder adder = map.computeIfAbsent(key, k -> new Adder());
        adder.like.addAndGet(like);
        adder.comment.addAndGet(comment);
        if(adder.discarded){//如果數量加在了廢棄的Adder上面,則執行補償邏輯
            int likeTemp = adder.like.getAndSet(0);
            int commentTemp = adder.comment.getAndSet(0);
            //即使此後又有執行緒在計數器上計數了也無妨
            if(likeTemp != 0 || commentTemp != 0){
                add(key, likeTemp, commentTemp);//補償
            }//也可能已經被其它執行緒取走了,但並不影響業務正確性
        }
    }
    static class Adder{
        AtomicInteger like = new AtomicInteger();
        AtomicInteger comment = new AtomicInteger();
        volatile boolean discarded = false;//只有儲存執行緒會將它改為true,故使用volatile便能保證執行緒安全
    }
}

方案點評:跟樂觀鎖的思路類似,在競爭激烈的情況下,一般不會有最優效能,但此處因為save方法是低頻執行的且自身無併發,add方法才有高併發,故失敗補償其實很少真正被執行,這也是為什麼測試結果中本方案效能最優的原因


效能測試

最終我們來測試一下各方案的效能,因為我們抽象出了一個統一的介面,故測試也較為容易

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterTester {
    private static final int THREAD_SIZE = 6;//add方法的併發執行緒數
    private static final int ADD_SIZE = 5000000;//測試規模
    private static final int KEYS_SIZE = 128*1024;
    public static void main(String[] args) throws InterruptedException {
        Counter[] counters = new Counter[]{new SynchronizedCounter(), new IncompleteCounter(), new ReadWriteLockCounter(), new CustomLockCounter(), new CompensationCounter()};
        String[] keys = new String[KEYS_SIZE];
        Random random = new Random();
        for (int i = 0; i < keys.length; i++) {
            keys[i]=String.valueOf(random.nextInt(KEYS_SIZE*1024));
        }
        for (Counter counter : counters) {
            AtomicInteger totalLike = new AtomicInteger();
            AtomicInteger totalComment = new AtomicInteger();
            AtomicInteger savedTotalLike = new AtomicInteger();
            AtomicInteger savedTotalComment = new AtomicInteger();
            Counter.Saver saver = (key, like, comment) -> {
                savedTotalLike.addAndGet(like);//模擬被持久化到資料庫,記錄數量以便後續校驗正確性
                savedTotalComment.addAndGet(comment);//同上
            };
            CountDownLatch latch = new CountDownLatch(THREAD_SIZE);
            long start = System.currentTimeMillis();
            for (int i = 0; i < THREAD_SIZE; i++) {
                new Thread(()->{
                    Random r = new Random();
                    int like, comment;
                    for (int j = 0; j < ADD_SIZE; j++) {
                        like = 2;
                        comment = 4;
                        counter.add(keys[r.nextInt(KEYS_SIZE)], like, comment);
                        totalLike.addAndGet(like);
                        totalComment.addAndGet(comment);
                    }
                    latch.countDown();
                }).start();
            }
            Thread saveThread = new Thread(()->{
                while(latch.getCount() != 0){
                    try {
                        Thread.sleep(100);//模擬100毫秒執行一次持久化
                    } catch (InterruptedException e) {}
                    counter.save(saver);
                }
                counter.save(saver);

            });
            saveThread.start();
            latch.await();
            System.out.println(counter.getClass().getSimpleName() +" cost:\t"+(System.currentTimeMillis() - start));
            saveThread.join();
            boolean error = savedTotalLike.get() != totalLike.get() || savedTotalComment.get() != totalComment.get();
            (error?System.err:System.out).println("saved:\tlike="+savedTotalLike.get()+"\tcomment="+savedTotalComment.get());
            (error?System.err:System.out).println("added:\tlike="+totalLike.get()+"\tcomment="+totalComment.get()+"\n");
        }
    }
}

 

在jdk11(jdk8也基本一致)下的測試結果如下:

注:方案二的IncompleteCounter並未完成回收,僅作對比

SynchronizedCounter cost:    12377
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

IncompleteCounter cost:    2560
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

ReadWriteLockCounter cost:    7902
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

CustomLockCounter cost:    3541
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

CompensationCounter cost:    2093
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

 

小結

非阻塞同步演算法一般不需要我們去設計,直接使用現有的工具便可,但如果真想通過它進一步去壓榨效能,應細心分析各執行緒穿插執行的情況,同時結合業務場景來考慮(也許在A場景不允許的情況,在B場景是允許的)