QMQ原始碼分析之Actor
QMQ有關actor的一篇文章 闡述了actor的應用場景。即client消費訊息的請求會先進入一個RequestQueue,在client消費訊息時,往往存在多個主題、多個消費組共享一個RequestQueue消費訊息。在這個Queue中,存在不同主題的有不同消費組數量,以及不同消費組有不同consumer數量,那麼就會存在搶佔資源的情況。舉個文章 中的例子,一個主題下有兩個消費組A和B,A有100個consumer,B有200個consumer,那麼在RequestQueue中來自B的請求可能會多於A,這個時候就存在消費unfair的情況,所以需要隔離不同主題不同消費組以保證fair。除此之外,當consumer消費能力不足,造成broker訊息堆積,這個時候就會導致consumer所在消費組總在消費"老訊息",影響全域性整體的一個消費能力。因為"老訊息"不會存在page cache中,這個時候很可能就會從磁碟load,那麼表現是RequestQueue中來自消費"老訊息"消費組的請求處理時間過長,影響到其他主題消費組的消費,因此這個時候也需要做策略來避免不同消費組的相互影響。所以QMQ就有了actor機制,以消除各個消費組之間因消費能力不同、consumer數量不同而造成的相互影響各自的消費能力。
PullMessageWorker
要了解QMQ的actor模式是如何起作用的,就要先來看看Broker是如何處理訊息拉取請求的。
class PullMessageWorker implements ActorSystem.Processor<PullMessageProcessor.PullEntry> { // 訊息儲存層 private final MessageStoreWrapper store; // actor private final ActorSystem actorSystem; private final ConcurrentMap<String, ConcurrentMap<String, Object>> subscribers; PullMessageWorker(MessageStoreWrapper store, ActorSystem actorSystem) { this.store = store; this.actorSystem = actorSystem; this.subscribers = new ConcurrentHashMap<>(); } void pull(PullMessageProcessor.PullEntry pullEntry) { // subject+group作actor排程粒度 final String actorPath = ConsumerGroupUtils.buildConsumerGroupKey(pullEntry.subject, pullEntry.group); // actor排程 actorSystem.dispatch(actorPath, pullEntry, this); } @Override public boolean process(PullMessageProcessor.PullEntry entry , ActorSystem.Actor<PullMessageProcessor.PullEntry> self) { QMon.pullQueueTime(entry.subject, entry.group, entry.pullBegin); //開始處理請求的時候就過期了,那麼就直接不處理了,也不返回任何東西給客戶端,客戶端等待超時 //因為出現這種情況一般是server端排隊嚴重,暫時掛起客戶端可以避免情況惡化 // deadline機制,如果QMQ認為這個消費請求來不及處理,那麼就直接返回,避免雪崩 if (entry.expired()) { QMon.pullExpiredCountInc(entry.subject, entry.group); return true; } if (entry.isInValid()) { QMon.pullInValidCountInc(entry.subject, entry.group); return true; } // 儲存層find訊息 final PullMessageResult pullMessageResult = store.findMessages(entry.pullRequest); if (pullMessageResult == PullMessageResult.FILTER_EMPTY || pullMessageResult.getMessageNum() > 0 || entry.isPullOnce() || entry.isTimeout()) { entry.processMessageResult(pullMessageResult); return true; } // 沒有拉取到訊息,那麼掛起該actor self.suspend(); // timer task,在超時前喚醒actor if (entry.setTimerOnDemand()) { QMon.suspendRequestCountInc(entry.subject, entry.group); // 訂閱訊息,一有訊息來就喚醒該actor subscribe(entry.subject, entry.group); return false; } // 已經超時,那麼即刻喚醒排程 self.resume(); entry.processNoMessageResult(); return true; } // 訂閱 private void subscribe(String subject, String group) { ConcurrentMap<String, Object> map = subscribers.get(subject); if (map == null) { map = new ConcurrentHashMap<>(); map = ObjectUtils.defaultIfNull(subscribers.putIfAbsent(subject, map), map); } map.putIfAbsent(group, HOLDER); } // 有訊息來就喚醒訂閱的subscriber void remindNewMessages(final String subject) { final ConcurrentMap<String, Object> map = this.subscribers.get(subject); if (map == null) return; for (String group : map.keySet()) { map.remove(group); this.actorSystem.resume(ConsumerGroupUtils.buildConsumerGroupKey(subject, group)); QMon.resumeActorCountInc(subject, group); } } } // ActorSystem內定義的處理介面 public interface ActorSystem.Processor<T> { boolean process(T message, Actor<T> self); } 複製程式碼
能看出在這裡起作用的是這個actorSystem。PullMessageWorker繼承了ActorSystem.Processor,所以真正處理拉取請求的是這個接口裡的process方法。請求到達pullMessageWorker,worker將該次請求交給actorSystem排程,排程到這次請求時,worker還有個根據拉取結果做反應的策略,即如果暫時沒有訊息,那麼suspend,以一個timer task定時resume;如果在timer task執行之前有訊息進來,那麼也會即時resume。
ActorSystem
接下來就看看ActorSystem裡邊是如何做的公平排程
。
public class ActorSystem { // 內部維護的是一個ConcurrentMap,key即PullMessageWorker裡的subject+group private final ConcurrentMap<String, Actor> actors; // 執行actor的executor private final ThreadPoolExecutor executor; private final AtomicInteger actorsCount; private final String name; public ActorSystem(String name) { this(name, Runtime.getRuntime().availableProcessors() * 4, true); } public ActorSystem(String name, int threads, boolean fair) { this.name = name; this.actorsCount = new AtomicInteger(); // 這裡根據fair引數初始化一個優先順序佇列作為executor的引數,處理關於前言裡說的"老訊息"的情況 BlockingQueue<Runnable> queue = fair ? new PriorityBlockingQueue<>() : new LinkedBlockingQueue<>(); this.executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.MINUTES, queue, new NamedThreadFactory("actor-sys-" + name)); this.actors = Maps.newConcurrentMap(); QMon.dispatchersGauge(name, actorsCount::doubleValue); QMon.actorSystemQueueGauge(name, () -> (double) executor.getQueue().size()); } } 複製程式碼
可以看到,用一個執行緒池處理actor的排程執行,這個執行緒池裡的佇列是一個優先順序佇列。優先順序佇列儲存的元素是Actor。關於Actor我們稍後來看,先來看一下ActorSystem的處理排程流程。
// PullMessageWorker呼叫的就是這個方法 public <E> void dispatch(String actorPath, E msg, Processor<E> processor) { // 取得actor Actor<E> actor = createOrGet(actorPath, processor); // 在後文Actor定義裡能看到,actor內部維護一個queue,這裡actor僅僅是offer(msg) actor.dispatch(msg); // 執行排程 schedule(actor, true); } // 無訊息時,則會掛起 public void suspend(String actorPath) { Actor actor = actors.get(actorPath); if (actor == null) return; actor.suspend(); } // 有訊息則恢復,可以理解成執行緒的"就緒狀態" public void resume(String actorPath) { Actor actor = actors.get(actorPath); if (actor == null) return; actor.resume(); // 立即排程,可以留意一下那個false // 當actor是"可排程狀態"時,這個actor是否能排程是取決於actor的queue是否有訊息 schedule(actor, false); } private <E> Actor<E> createOrGet(String actorPath, Processor<E> processor) { Actor<E> actor = actors.get(actorPath); if (actor != null) return actor; Actor<E> add = new Actor<>(this.name, actorPath, this, processor, DEFAULT_QUEUE_SIZE); Actor<E> old = actors.putIfAbsent(actorPath, add); if (old == null) { LOG.info("create actorSystem: {}", actorPath); actorsCount.incrementAndGet(); return add; } return old; } // 將actor入隊的地方 private <E> boolean schedule(Actor<E> actor, boolean hasMessageHint) { // 如果actor不能排程,則ret false if (!actor.canBeSchedule(hasMessageHint)) return false; // 設定actor為"可排程狀態" if (actor.setAsScheduled()) { // 提交時間,和actor執行總耗時共同決定在佇列裡的優先順序 actor.submitTs = System.currentTimeMillis(); // 入隊,入的是執行緒池裡的優先順序佇列 this.executor.execute(actor); return true; } // actor.setAsScheduled()裡,這裡是actor已經是可排程狀態,那麼沒必要再次入隊 return false; } 複製程式碼
actorSystem維護一個執行緒池,執行緒池佇列具有優先順序,佇列儲存元素是actor。actor的粒度是subject+group。Actor是一個Runnable,且因為是優先順序佇列的儲存元素所以需繼承Comparable介面(佇列並沒有傳Comparator引數),並且actor有四種狀態,初始狀態、可排程狀態、掛起狀態、排程狀態(這個狀態其實不存在,但是暫且這麼叫以幫助理解)。
接下來看看Actor這個類:
public static class Actor<E> implements Runnable, Comparable<Actor> { // 初始狀態 private static final int Open = 0; // 可排程狀態 private static final int Scheduled = 2; // 掩碼,二進位制表示:11 與Open和Scheduled作&運算 // shouldScheduleMask¤tStatus != Open 則為不可置為排程狀態(當currentStatus為掛起狀態或排程狀態) private static final int shouldScheduleMask = 3; private static final int shouldNotProcessMask = ~2; // 掛起狀態 private static final int suspendUnit = 4; //每個actor至少執行的時間片 private static final int QUOTA = 5; // status屬性記憶體偏移量,用Unsafe操作 private static long statusOffset; static { try { statusOffset = Unsafe.instance.objectFieldOffset(Actor.class.getDeclaredField("status")); } catch (Throwable t) { throw new ExceptionInInitializerError(t); } } final String systemName; final ActorSystem actorSystem; // actor內部維護的queue,後文簡單分析下 final BoundedNodeQueue<E> queue; // ActorSystem內部定義介面,PullMessageWorker實現的就是這個介面,用於真正業務邏輯處理的地方 final Processor<E> processor; private final String name; // 一個actor執行總耗時 private long total; // actor執行提交時間,即actor入隊時間 private volatile long submitTs; //通過Unsafe操作 private volatile int status; Actor(String systemName, String name, ActorSystem actorSystem, Processor<E> processor, final int queueSize) { this.systemName = systemName; this.name = name; this.actorSystem = actorSystem; this.processor = processor; this.queue = new BoundedNodeQueue<>(queueSize); QMon.actorQueueGauge(systemName, name, () -> (double) queue.count()); } // 入隊,是actor內部的佇列 boolean dispatch(E message) { return queue.add(message); } // actor執行的地方 @Override public void run() { long start = System.currentTimeMillis(); String old = Thread.currentThread().getName(); try { Thread.currentThread().setName(systemName + "-" + name); if (shouldProcessMessage()) { processMessages(); } } finally { long duration = System.currentTimeMillis() - start; // 每次actor執行的耗時累加到total total += duration; QMon.actorProcessTime(name, duration); Thread.currentThread().setName(old); // 設定為"空閒狀態",即初始狀態 (currentStatus & ~Scheduled) setAsIdle(); // 進行下一次排程 this.actorSystem.schedule(this, false); } } void processMessages() { long deadline = System.currentTimeMillis() + QUOTA; while (true) { E message = queue.peek(); if (message == null) return; // 處理業務邏輯 boolean process = processor.process(message, this); // 失敗,該message不會出隊,等待下一次排程 // 如pullMessageWorker中沒有訊息時將actor掛起 if (!process) return; // 出隊 queue.pollNode(); // 每個actor只有QUOTA個時間片的執行時間 if (System.currentTimeMillis() >= deadline) return; } } final boolean shouldProcessMessage() { // 能夠真正執行業務邏輯的判斷 // 一種場景是,針對掛起狀態,由於沒有拉取到訊息該actor置為掛起狀態 // 自然就沒有搶佔時間片的必要了 return (currentStatus() & shouldNotProcessMask) == 0; } // 能否排程 private boolean canBeSchedule(boolean hasMessageHint) { int s = currentStatus(); if (s == Open || s == Scheduled) return hasMessageHint || !queue.isEmpty(); return false; } public final boolean resume() { while (true) { int s = currentStatus(); int next = s < suspendUnit ? s : s - suspendUnit; if (updateStatus(s, next)) return next < suspendUnit; } } public final void suspend() { while (true) { int s = currentStatus(); if (updateStatus(s, s + suspendUnit)) return; } } final boolean setAsScheduled() { while (true) { int s = currentStatus(); // currentStatus為非Open狀態,則ret false if ((s & shouldScheduleMask) != Open) return false; // 更新actor狀態為排程狀態 if (updateStatus(s, s | Scheduled)) return true; } } final void setAsIdle() { while (true) { int s = currentStatus(); // 更新actor狀態位不可排程狀態,(這裡可以理解為更新為初始狀態Open) if (updateStatus(s, s & ~Scheduled)) return; } } final int currentStatus() { // 根據status在記憶體中的偏移量取得status return Unsafe.instance.getIntVolatile(this, statusOffset); } private boolean updateStatus(int oldStatus, int newStatus) { // Unsafe 原子操作,處理status的輪轉變更 return Unsafe.instance.compareAndSwapInt(this, statusOffset, oldStatus, newStatus); } // 決定actor在優先順序佇列裡的優先順序的地方 // 先看總耗時,以達到動態限速,保證執行"慢"的請求(已經堆積的訊息拉取請求)在後執行 // 其次看提交時間,先提交的actor先執行 @Override public int compareTo(Actor o) { int result = Long.compare(total, o.total); return result == 0 ? Long.compare(submitTs, o.submitTs) : result; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Actor<?> actor = (Actor<?>) o; return Objects.equals(systemName, actor.systemName) && Objects.equals(name, actor.name); } @Override public int hashCode() { return Objects.hash(systemName, name); } } 複製程式碼
Actor實現了Comparable,在優先順序佇列裡優先順序是Actor裡的total和submitTs共同決定的。total是actor執行總耗時,submitTs是排程時間。那麼對於處理較慢的actor自然就會在佇列裡相對"尾部"位置,這時就做到了根據actor的執行耗時的一個動態限速。Actor利用Unsafe機制來控制各個狀態的輪轉原子性更新的,且每個actor執行時間可以簡單理解為5個時間片。
其實工作進行到這裡就可以結束了,但是抱著研究的態度,不妨接著往下看看。
Actor內部維護一個Queue,這個Queue是自定義的,是一個Lock-free bounded non-blocking multiple-producer single-consumer queue。JDK裡的QUEUE多數都是用鎖控制,不用鎖,猜測也應該是用Unsafe 原子操作實現。那麼來看看吧:
private static class BoundedNodeQueue<T> { // 頭結點、尾節點在記憶體中的偏移量 private final static long enqOffset, deqOffset; static { try { enqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly")); deqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly")); } catch (Throwable t) { throw new ExceptionInInitializerError(t); } } private final int capacity; // 尾節點,通過enqOffset操作 private volatile Node<T> _enqDoNotCallMeDirectly; // 頭結點,通過deqOffset操作 private volatile Node<T> _deqDoNotCallMeDirectly; protected BoundedNodeQueue(final int capacity) { if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0"); this.capacity = capacity; final Node<T> n = new Node<T>(); setDeq(n); setEnq(n); } // 獲取尾節點 private Node<T> getEnq() { // getObjectVolatile這種方式保證拿到的都是最新資料 return (Node<T>) Unsafe.instance.getObjectVolatile(this, enqOffset); } // 設定尾節點,僅在初始化時用 private void setEnq(Node<T> n) { Unsafe.instance.putObjectVolatile(this, enqOffset, n); } private boolean casEnq(Node<T> old, Node<T> nju) { // cas,迴圈設定,直到成功 return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju); } // 獲取頭結點 private Node<T> getDeq() { return (Node<T>) Unsafe.instance.getObjectVolatile(this, deqOffset); } // 僅在初始化時用 private void setDeq(Node<T> n) { Unsafe.instance.putObjectVolatile(this, deqOffset, n); } // cas設定頭結點 private boolean casDeq(Node<T> old, Node<T> nju) { return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju); } // 與其叫count,不如喚作index,但是是否應該考慮溢位的情況? public final int count() { final Node<T> lastNode = getEnq(); final int lastNodeCount = lastNode.count; return lastNodeCount - getDeq().count; } /** * @return the maximum capacity of this queue */ public final int capacity() { return capacity; } public final boolean add(final T value) { for (Node<T> n = null; ; ) { final Node<T> lastNode = getEnq(); final int lastNodeCount = lastNode.count; if (lastNodeCount - getDeq().count < capacity) { // Trade a branch for avoiding to create a new node if full, // and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC if (n == null) { n = new Node<T>(); n.value = value; } n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq() // Try to putPullLogs the node to the end, if we fail we continue loopin' // 相當於 // enq -> next = new Node(value); enq = neq -> next; if (casEnq(lastNode, n)) { // 注意一下這個Node.setNext方法 lastNode.setNext(n); return true; } } else return false; // Over capacity—couldn't add the node } } public final boolean isEmpty() { // enq == deq 即為empty return getEnq() == getDeq(); } /** * Removes the first element of this queue if any * * @return the value of the first element of the queue, null if empty */ public final T poll() { final Node<T> n = pollNode(); return (n != null) ? n.value : null; } public final T peek() { Node<T> n = peekNode(); return (n != null) ? n.value : null; } protected final Node<T> peekNode() { for (; ; ) { final Node<T> deq = getDeq(); final Node<T> next = deq.next(); if (next != null || getEnq() == deq) return next; } } /** * Removes the first element of this queue if any * * @return the `Node` of the first element of the queue, null if empty */ public final Node<T> pollNode() { for (; ; ) { final Node<T> deq = getDeq(); final Node<T> next = deq.next(); if (next != null) { if (casDeq(deq, next)) { deq.value = next.value; deq.setNext(null); next.value = null; return deq; } // else we retry (concurrent consumers) // 比較套路的cas操作,就不多說了 } else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty } } public static class Node<T> { private final static long nextOffset; static { try { nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly")); } catch (Throwable t) { throw new ExceptionInInitializerError(t); } } protected T value; protected int count; // 也是利用偏移量操作 private volatile Node<T> _nextDoNotCallMeDirectly; public final Node<T> next() { return (Node<T>) Unsafe.instance.getObjectVolatile(this, nextOffset); } protected final void setNext(final Node<T> newNext) { // 這裡有點講究,下面分析下 Unsafe.instance.putOrderedObject(this, nextOffset, newNext); } } } 複製程式碼
如上程式碼,是通過屬性在記憶體的偏移量,結合cas原子操作來進行更新賦值等操作,以此來實現lock-free,這是比較常規的套路。值得一說的是Node裡的setNext方法,這個方法的呼叫是在cas節點後,對"上一位置"的next節點進行賦值。而這個方法使用的是Unsafe.instance.putOrderedObject,要說這個putOrderedObject,就不得不說MESI,快取一致性協議。如volatile,當進行寫操作時,它是依靠storeload barrier來實現其他執行緒對此的可見性。而putOrderedObject也是依靠記憶體屏障,只不過是storestore barrier。storestore是比storeload快速的一種記憶體屏障。在硬體層面,記憶體屏障分兩種:Load-Barrier和Store-Barrier。Load-Barrier是讓快取記憶體中的資料失效,強制重新從主記憶體載入資料;Store-Barrier是讓寫入快取記憶體的資料更新寫入主記憶體,對其他執行緒可見。而java層面的四種記憶體屏障無非是硬體層面的兩種記憶體屏障的組合而已。那麼可見,storestore barrier自然比storeload barrier快速。那麼有一個問題,我們可不可以在這裡也用cas操作呢?答案是可以,但沒必要。你可以想想這裡為什麼沒必要。