1. 程式人生 > >RabbitMQ客戶端原始碼分析(七)之Channel與ChannelManager

RabbitMQ客戶端原始碼分析(七)之Channel與ChannelManager

RabbitMQ-java-client版本

  1. com.rabbitmq:amqp-client:4.3.0
  2. RabbitMQ版本宣告: 3.6.15

Channel

  1. uml圖

  2. transmit(AMQCommand c):傳輸方法,委託AMQCommand進行傳輸

        public void transmit(AMQCommand c) throws IOException {
            synchronized (_channelMutex) {
                ensureIsOpen();
                quiescingTransmit(c);
            }
    } public void quiescingTransmit(AMQCommand c) throws IOException { synchronized (_channelMutex) { if (c.getMethod().hasContent()) { while (_blockContent) { try { _channelMutex.wait(); } catch
    (InterruptedException ignored) {} // This is to catch a situation when the thread wakes up during // shutdown. Currently, no command that has content is allowed // to send anything in a closing state. ensureIsOpen(); }
    } //呼叫AMQCommand的傳輸方法 c.transmit(this); } }

ChannelManager

  1. ChannelManager:負責Channel的管理,建立Channel、新增新的Channel、獲取Channel、Channel關閉等

  2. 構造方法

        public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory, MetricsCollector metricsCollector) {
            if (channelMax == 0) {
                // The framing encoding only allows for unsigned 16-bit integers
                // for the channel number
                channelMax = (1 << 16) - 1;
            }
            _channelMax = channelMax;
            channelNumberAllocator = new IntAllocator(1, channelMax);
    
            this.workService = workService;
            this.threadFactory = threadFactory;
            this.metricsCollector = metricsCollector;
        }
            
    
  3. 建立Channel,通過IntAllocator分配一個Channel編號(channelNumber),使用Map維護ChannelNumber與Channel的對映關係。

        //維護ChannelNumber與Channel的對映
        private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
        
        public ChannelN createChannel(AMQConnection connection) throws IOException {
            ChannelN ch;
            synchronized (this.monitor) {
                int channelNumber = channelNumberAllocator.allocate();
                if (channelNumber == -1) {
                    return null;
                } else {
                    ch = addNewChannel(connection, channelNumber);
                }
            }
            ch.open(); // now that it's been safely added
            return ch;
        }    
    
        private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
                if (_channelMap.containsKey(channelNumber)) {
                    // That number's already allocated! Can't do it
                    // This should never happen unless something has gone
                    // badly wrong with our implementation.
                    throw new IllegalStateException("We have attempted to "
                            + "create a channel with a number that is already in "
                            + "use. This should never happen. "
                            + "Please report this as a bug.");
                }
                ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
                _channelMap.put(ch.getChannelNumber(), ch);
                return ch;
            }
        protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
            return new ChannelN(connection, channelNumber, workService, this.metricsCollector);
        }
         
    
    
  4. handleSignal:關閉所有被管理的Channel,在關閉Connection時需要關閉所有Channel,就是呼叫此方法。從原始碼可以看出,使用非同步關閉,主要是為了避免JDK socket wirte死鎖,BIO下socket write沒有寫超時。詳細參考http://rabbitmq.1065348.n5.nabble.com/Long-timeout-if-server-host-becomes-unreachable-td30275.html

        public void handleSignal(final ShutdownSignalException signal) {
            Set<ChannelN> channels;
            synchronized(this.monitor) {
                channels = new HashSet<ChannelN>(_channelMap.values());
            }
    
            for (final ChannelN channel : channels) {
                releaseChannelNumber(channel);
                // async shutdown if possible
                // see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
                Runnable channelShutdownRunnable = new Runnable() {
                    @Override
                    public void run() {
                        channel.processShutdownSignal(signal, true, true);
                    }
                };
                if(this.shutdownExecutor == null) {
                    channelShutdownRunnable.run();
                } else {
                    Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
                    try {
                        channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms", channel.getChannelNumber(), channelShutdownTimeout);
                        channelShutdownTask.cancel(true);
                    }
                }
                shutdownSet.add(channel.getShutdownLatch());
                channel.notifyListeners();
            }
            scheduleShutdownProcessing();
        }