1. 程式人生 > >使用Spring整合JMS連線ActiveMQ

使用Spring整合JMS連線ActiveMQ

核心類介紹

ConnectionFactory(用於管理連線的工廠)

一個spring為我們提供的連線池
JmsTemplate每次傳送訊息都會重新建立連線、會話和producer
spring中提供了SingleConnectionFactory和CachingConnectionFactory

JmsTemplate(用於傳送和接收訊息的模板類)

是spring提供的,只需向spring容器類註冊這個類就可以使用JmsTemplate方便的操作jms
JmsTemplate是執行緒安全的,可以在整個應用範圍使用

MessageListener(訊息監聽器)

實現一個onMessage方法,該方法只接收一個Message引數

例項程式碼(生產者)

建立一個生產者介面:ProducerService.java

public interface ProducerService {
    void sendMessage(String message);
}

具體實現類:ProducerServiceImpl.java

public class ProducerServiceImpl implements ProducerService {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name = "topicDestination")//資源方式注入,name作區分。因為可能會存在多個destination
    private Destination destination;

    @Override
    public void sendMessage(final String message) {
        //使用jmsTemplate傳送訊息
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage=session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("傳送訊息:"+message);
    }
}

啟動類:AppProducer.java

public class AppProducer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context=new ClassPathXmlApplicationContext("producer.xml");
        ProducerService producerService=(ProducerService) context.getBean(ProducerService.class);
        for(int i=0;i<50;i++){
            producerService.sendMessage("fut"+i);
        }
        context.close();
    }
}

例項程式碼(消費者)

建立一個監聽器類:ConsumerMessageListener.java

public class ConsumerMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println("接收訊息:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

啟動類:AppConsumer.java

public class AppConsumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context=new ClassPathXmlApplicationContext("consumer.xml");
    }
}

資原始檔配置

spring資原始檔配置(生產者和消費者公共資源):common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <context:annotation-config/>

    <!--ActiveMQ為我們提供的ConnectionFactory-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.63.137:61616"/>
    </bean>
    <!--spring jms為我們提供的連線池-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!--佇列模式目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue"/>
    </bean>

    <!--主題模式目的地-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>
</beans>

spring資原始檔配置(生產者):producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <import resource="common.xml"/>

    <!--配置JmsTemplate,用於傳送訊息-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <bean class="com.ft.jms.activemq.producer.ProducerServiceImpl"/>
</beans>

spring資原始檔配置(消費者):consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <!--匯入公共配置-->
    <import resource="common.xml"/>

    <!--配置訊息監聽器-->
    <bean id="consumerMessageListener" class="com.ft.jms.activemq.consumer.ConsumerMessageListener"/>
    <!--配置訊息監聽容器-->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>

</beans>

總結

以上程式碼是主題模式的訊息釋出/訂閱,如果要切換到佇列模式,只需將topic那塊改為queue即可。