1. 程式人生 > >RabbitMQ客戶端原始碼分析(五)之ConsumerWorkSerivce與WorkPool

RabbitMQ客戶端原始碼分析(五)之ConsumerWorkSerivce與WorkPool

RabbitMQ-java-client版本

  1. com.rabbitmq:amqp-client:4.3.0
  2. RabbitMQ版本宣告: 3.6.15

WorkPool

  1. WorkPool可以認為是一個任務池,儲存client(在這裡實際型別其實就是Channel)與具體處理任務的關係

  2. 成員變數,SetQueueVariableLinkedBlockingQueue在下面詳細說明

        //預設最大佇列長度
        private static final int MAX_QUEUE_LENGTH = 1000;
    
        /**就緒的clients集合,SetQueue本質是一個LinkedList+Set*/
    private final SetQueue<K> ready = new SetQueue<K>(); /** 正在處理的client集合*/ private final Set<K> inProgress = new HashSet<K>(); /** 儲存註冊的Channel與處理的任務佇列 */ private final Map<K, VariableLinkedBlockingQueue<W>> pool = new HashMap<K, VariableLinkedBlockingQueue<
    W>
    >(); /** 儲存限制被移除的key的集合,如果不為空,不限制佇列大小 */ private final Set<K> unlimited = new HashSet<K>();
  3. registerKey(K key): Channel所對應的任務佇列長度取決於是否限制佇列長度,如果限制佇列長度最大MAX_QUEUE_LENGTH(1000),不限制就是Integer.MAX_VALUE

        public void registerKey(K key) {
            synchronized (this) {
                if
    (!this.pool.containsKey(key)) { int initialCapacity = unlimited.isEmpty() ? MAX_QUEUE_LENGTH : Integer.MAX_VALUE; this.pool.put(key, new VariableLinkedBlockingQueue<W>(initialCapacity)); } } }
  4. nextWorkBlock(Collection<W> to, int size):返回下一個準備就緒的Channel,並從該Channel對應的任務佇列裡取出size個任務放在傳入的引數Collection中。

       public K nextWorkBlock(Collection<W> to, int size) {
            synchronized (this) {
                //從就緒佇列中取出一個Channel
                K nextKey = readyToInProgress();
                if (nextKey != null) {
                    //獲取Channel對應的任務佇列
                    VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
                    //連續從queue取中size個元素,並將元素儲存到to集合中
                    drainTo(queue, to, size);
                }
                return nextKey;
            }
        }
        private K readyToInProgress() {
            //從SetQueue的佇列頭獲取一個Channel,並從就緒集合轉移到處理集合
            K key = this.ready.poll();
            if (key != null) {
                this.inProgress.add(key);
            }
            return key;
        }
        
        private int drainTo(VariableLinkedBlockingQueue<W> deList, Collection<W> c, int maxElements) {
            int n = 0;
            while (n < maxElements) {
                W first = deList.poll();
                if (first == null)
                    break;
                c.add(first);
                ++n;
            }
            return n;
        }
    
  5. addWorkItem(K key, W item): 為特定的Channel新增新的任務,如果Channel處於休眠狀態(不是就緒狀態,不是處理狀態,不是註冊狀態),就將Channel標記為準備就緒狀態

        public boolean addWorkItem(K key, W item) {
            VariableLinkedBlockingQueue<W> queue;
            synchronized (this) {
                queue = this.pool.get(key);
            }
            // The put operation may block. We need to make sure we are not holding the lock while that happens.
            if (queue != null) {
                try {
                    queue.put(item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
    
                synchronized (this) {
                    //如果Channel處於休眠狀態,轉換為準備就緒狀態
                    if (isDormant(key)) {
                        dormantToReady(key);
                        return true;
                    }
                }
            }
            return false;
        }
    
    
        private boolean isInProgress(K key){ return this.inProgress.contains(key); }
        private boolean isReady(K key){ return this.ready.contains(key); }
        private boolean isRegistered(K key) { return this.pool.containsKey(key); }
        /**
         * 是否休眠狀態
         * @param key
         * @return
         */
        private boolean isDormant(K key){ return !isInProgress(key) && !isReady(key) && isRegistered(key); }
    
        //狀態流轉方法,全部假設key已經註冊過
        private void inProgressToReady(K key){ this.inProgress.remove(key); this.ready.addIfNotPresent(key); }
        private void inProgressToDormant(K key){ this.inProgress.remove(key); }
        private void dormantToReady(K key){ this.ready.addIfNotPresent(key); }
        
    
  6. finishWorkBlock(K key): 設定客戶端不是處理狀態(inProgress)。忽略未知客戶端(並返回false)。

        public boolean finishWorkBlock(K key) {
            synchronized (this) {
                if (!this.isRegistered(key))
                    return false;
                if (!this.inProgress.contains(key)) {
                    throw new IllegalStateException("Client " + key + " not in progress");
                }
    
                if (moreWorkItems(key)) {
                    inProgressToReady(key);
                    return true;
                } else {
                    inProgressToDormant(key);
                    return false;
                }
            }
        }
    
        private boolean moreWorkItems(K key) {
            VariableLinkedBlockingQueue<W> leList = this.pool.get(key);
            return leList != null && !leList.isEmpty();
        }
    
    

VariableLinkedBlockingQueue

  1. 這是一個LinkedBlockingQueue類的克隆,增加了一個setCapacity(int)方法,允許在使用的過程中更改容量

SetQueue

  1. SetQueue是一個Set佇列,即佇列中的元素只能出現一次,本質上佇列還是通過LinkedList實現,只不過同時通過HashSet判斷是否已經有元素。如果有則不再新增元素到佇列中。

  2. addIfNotPresent(T item): 如果不存在就新增到佇列尾部

        public boolean addIfNotPresent(T item) {
            if (this.members.contains(item)) {
                return false;
            }
            this.members.add(item);
            //新增到linkedList佇列的尾部
            this.queue.offer(item);
            return true;
        }
    
  3. poll():獲取並移除佇列頭部元素

       public T poll() {
           T item =  this.queue.poll();
           if (item != null) {
               this.members.remove(item);
           }
           return item;
       }
    

ConsumerWorkService

  1. ConsumerWorkService主要是用於處理Channel的任務,大部分方法都是委託WorkPool來進行處理

  2. 成員變數與建構函式

        private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
        //預設執行緒數 對於IO密集的程式來說2倍是不是有點少??
        private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
        private final ExecutorService executor;
        private final boolean privateExecutor;
        //Channel與具體任務的關係
        private final WorkPool<Channel, Runnable> workPool;
        private final int shutdownTimeout;
    
        public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
            //如果沒有自定義執行緒就使用預設的執行緒池
            this.privateExecutor = (executor == null);
            this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
                                               : executor;
            this.workPool = new WorkPool<Channel, Runnable>();
            this.shutdownTimeout = shutdownTimeout;
        }
    
    
    
    
  3. addWork:給特定的Channel新增任務

        public void addWork(Channel channel, Runnable runnable) {
            if (this.workPool.addWorkItem(channel, runnable)) {
                this.executor.execute(new WorkPoolRunnable());
            }
        }
        private final class WorkPoolRunnable implements Runnable {
    
            @Override
            public void run() {
                //一個執行緒每次執行16個任務
                int size = MAX_RUNNABLE_BLOCK_SIZE;
                List<Runnable> block = new ArrayList<Runnable>(size);
                try {
                    Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
                    if (key == null) return; // nothing ready to run
                    try {
                        for (Runnable runnable : block) {
                            runnable.run();
                        }
                    } finally {
                        //任務執行完成後清理,如果還有任務在佇列裡,則繼續使用執行緒池執行(遞迴)
                        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
                            ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
                        }
                    }
                } catch (RuntimeException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    

ConsumerDispatcher

  1. Consumer分發器,每個Channel都有一個分發器,分發通知事件傳送給內部管理的WorkPoolConsumerWorkService