
SpringBoot 2.0 Source Code Analysis (2): ActiveMQ Integration


For the practical steps of integrating ActiveMQ with SpringBoot, see: SpringBoot 2.0 Applications (2): Integrating ActiveMQ with SpringBoot 2.0

ActiveMQ auto-configuration

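When both javax.jms.Message and org.springframework.jms.core.JmsTemplate are on the classpath, and a ConnectionFactory bean is available, SpringBoot registers the objects needed for working with ActiveMQ as beans so that the project can inject them. Let's look at the JmsAutoConfiguration class.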

@Configuration
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {

    @Configuration
    protected static class JmsTemplateConfiguration {
        ......
        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            JmsTemplate template = new JmsTemplate(connectionFactory);
            template.setPubSubDomain(this.properties.isPubSubDomain());
            map.from(this.destinationResolver::getIfUnique).whenNonNull()
                    .to(template::setDestinationResolver);
            map.from(this.messageConverter::getIfUnique).whenNonNull()
                    .to(template::setMessageConverter);
            mapTemplateProperties(this.properties.getTemplate(), template);
            return template;
        }
            ......
    }

    @Configuration
    @ConditionalOnClass(JmsMessagingTemplate.class)
    @Import(JmsTemplateConfiguration.class)
    protected static class MessagingTemplateConfiguration {

        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnSingleCandidate(JmsTemplate.class)
        public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
            return new JmsMessagingTemplate(jmsTemplate);
        }

    }

}

JmsAutoConfiguration mainly registers jmsMessagingTemplate and jmsTemplate.
JmsAutoConfiguration also imports JmsAnnotationDrivenConfiguration, which registers jmsListenerContainerFactory.
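
The @ConditionalOnMissingBean annotations on the jmsTemplate and jmsMessagingTemplate bean methods mean that the auto-configured beans back off when the application defines its own. The following is a minimal plain-JDK sketch of that idea only. ToyContext, Template and BackOffSketch are hypothetical names made up for illustration, and Spring evaluates its conditions quite differently internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy bean registry illustrating "register a default only when no bean of that type exists".
// ToyContext and Template are hypothetical stand-ins, not Spring classes.
class BackOffSketch {

    static final class ToyContext {
        private final Map<Class<?>, Object> beans = new LinkedHashMap<>();

        /** Registers a bean unconditionally, like a user-defined @Bean method. */
        <T> void register(Class<T> type, T bean) {
            beans.put(type, bean);
        }

        /** Rough counterpart of @ConditionalOnMissingBean: the default is created only if nothing is registered yet. */
        <T> void registerIfMissing(Class<T> type, Supplier<T> defaultBean) {
            if (!beans.containsKey(type)) {
                beans.put(type, defaultBean.get());
            }
        }

        <T> T get(Class<T> type) {
            return type.cast(beans.get(type));
        }
    }

    /** Stand-in for a template object; the label records where it came from. */
    static final class Template {
        final String origin;

        Template(String origin) {
            this.origin = origin;
        }
    }

    public static void main(String[] args) {
        // No user bean: the default is registered.
        ToyContext auto = new ToyContext();
        auto.registerIfMissing(Template.class, () -> new Template("auto-configured"));
        System.out.println(auto.get(Template.class).origin);

        // User bean registered first: the default backs off.
        ToyContext custom = new ToyContext();
        custom.register(Template.class, new Template("user-defined"));
        custom.registerIfMissing(Template.class, () -> new Template("auto-configured"));
        System.out.println(custom.get(Template.class).origin);
    }
}
```

In a real application the practical consequence is that declaring your own JmsTemplate bean replaces the auto-configured one rather than adding a second.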

Sending messages

Take the following send call as an example:

    jmsMessagingTemplate.convertAndSend(this.queue, msg);

This method first converts and pre-processes the message, and ultimately sends it by calling JmsTemplate's doSend.

    protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
            throws JMSException {
        Assert.notNull(messageCreator, "MessageCreator must not be null");
        MessageProducer producer = createProducer(session, destination);
        try {
            Message message = messageCreator.createMessage(session);
            doSend(producer, message);
            if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                JmsUtils.commitIfNecessary(session);
            }
        }
        finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }

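doSend first creates a MessageProducer, then has the MessageCreator convert the original org.springframework.messaging.Message into a javax.jms.Message, and hands the message to the producer for sending. If the session is transacted and locally managed, the send is committed; the producer is closed in the finally block in either case.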
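
The ordering inside doSend (create the producer, build the message, send, commit if needed, always close the producer) can be tried out without a JMS provider. The sketch below is a simplified plain-JDK model, not Spring's implementation: FakeSession and SendFlowSketch are hypothetical stand-ins, the message is just a String, and the transaction check is reduced to a single flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of the doSend flow.
// All types here are hypothetical stand-ins, not JMS or Spring classes.
class SendFlowSketch {

    /** Records every step so that the ordering is visible. */
    static final class FakeSession {
        final boolean transacted;
        final List<String> log = new ArrayList<>();

        FakeSession(boolean transacted) {
            this.transacted = transacted;
        }
    }

    /** Mirrors the shape of doSend: create producer, create message, send, commit, close. */
    static void doSend(FakeSession session, String destination,
                       Function<FakeSession, String> messageCreator) {
        if (messageCreator == null) {
            throw new IllegalArgumentException("MessageCreator must not be null");
        }
        session.log.add("createProducer:" + destination);
        try {
            String message = messageCreator.apply(session);
            session.log.add("send:" + message);
            if (session.transacted) {
                session.log.add("commit");
            }
        } finally {
            // The producer is closed even if creating or sending the message fails.
            session.log.add("closeProducer");
        }
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession(true);
        doSend(session, "sample.queue", s -> "hello");
        System.out.println(session.log);
    }
}
```

Running main prints the recorded steps in order. If the message creator throws, only the producer creation and the close step are logged, which is the point of the try/finally structure.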

Receiving messages

First, a consumer example:

@Component
public class Consumer {
    @JmsListener(destination = "sample.queue")
    public void receiveQueue(String text) {
        System.out.println(text);
    }
}

Spring parses @JmsListener; the implementation is in the postProcessAfterInitialization method of JmsListenerAnnotationBeanPostProcessor.

    public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
                        Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, JmsListener.class, JmsListeners.class);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
            }
            else {
                annotatedMethods.forEach((method, listeners) ->
                        listeners.forEach(listener ->
                                processJmsListener(listener, method, bean)));
            }
        }
        return bean;
    }

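Spring finds the methods annotated with @JmsListener; when a message arriving at ActiveMQ is picked up by the listener, the corresponding method is invoked. Let's look at how the listener and the method are bound together.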

    protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
        Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
        MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
        endpoint.setBean(bean);
        endpoint.setMethod(invocableMethod);
        endpoint.setMostSpecificMethod(mostSpecificMethod);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setId(getEndpointId(jmsListener));
        endpoint.setDestination(resolve(jmsListener.destination()));
        if (StringUtils.hasText(jmsListener.selector())) {
            endpoint.setSelector(resolve(jmsListener.selector()));
        }
        if (StringUtils.hasText(jmsListener.subscription())) {
            endpoint.setSubscription(resolve(jmsListener.subscription()));
        }
        if (StringUtils.hasText(jmsListener.concurrency())) {
            endpoint.setConcurrency(resolve(jmsListener.concurrency()));
        }

        JmsListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(jmsListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
                        mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
                        " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }

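The method first sets the endpoint's properties, then looks up the JmsListenerContainerFactory named in the annotation (if one is given), and finally registers the endpoint together with that factory through the registrar.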
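
The scan-then-register pattern can be reproduced with plain reflection. The following sketch is a rough plain-JDK analogue, not Spring's code: QueueListener, Endpoint, Registry and ListenerScanSketch are hypothetical names, and it ignores AOP proxies, repeatable annotations, placeholder resolution, and container factories, all of which the real post-processor handles.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK model of "scan annotated methods, then register one endpoint per listener".
// QueueListener, Endpoint and Registry are hypothetical names, not Spring types.
class ListenerScanSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface QueueListener {
        String destination();
    }

    /** Binds a bean and one of its methods to a destination. */
    static final class Endpoint {
        final Object bean;
        final Method method;
        final String destination;

        Endpoint(Object bean, Method method, String destination) {
            this.bean = bean;
            this.method = method;
            this.destination = destination;
        }

        void onMessage(String text) {
            try {
                method.invoke(bean, text);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Listener invocation failed", ex);
            }
        }
    }

    /** Keeps the registered endpoints, keyed by destination. */
    static final class Registry {
        final Map<String, List<Endpoint>> endpoints = new HashMap<>();

        void register(Endpoint endpoint) {
            endpoints.computeIfAbsent(endpoint.destination, d -> new ArrayList<>()).add(endpoint);
        }

        void dispatch(String destination, String text) {
            for (Endpoint endpoint : endpoints.getOrDefault(destination, List.of())) {
                endpoint.onMessage(text);
            }
        }
    }

    /** Rough counterpart of postProcessAfterInitialization plus processJmsListener. */
    static Object postProcess(Object bean, Registry registry) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            QueueListener listener = method.getAnnotation(QueueListener.class);
            if (listener != null) {
                method.setAccessible(true);
                registry.register(new Endpoint(bean, method, listener.destination()));
            }
        }
        return bean; // the bean itself is returned unchanged, as in the original
    }

    static final class Consumer {
        final List<String> received = new ArrayList<>();

        @QueueListener(destination = "sample.queue")
        public void receiveQueue(String text) {
            received.add(text);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Consumer consumer = (Consumer) postProcess(new Consumer(), registry);
        registry.dispatch("sample.queue", "hello");
        System.out.println(consumer.received);
    }
}
```

Here the Registry plays the combined role of the registrar and the listener container: it stores the endpoint at startup and later routes an incoming message to the bound method.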

