1. 程式人生 > >ActiveMQ 點對點訊息傳送模型

ActiveMQ 點對點訊息傳送模型

這裡寫圖片描述

點對點訊息傳送模型允許JMS客戶端通過佇列這個虛擬通道來同步和非同步傳送、接收訊息。
在點對點訊息傳送模型中,訊息生產者稱為傳送者,訊息消費者稱為接收者

消費者是基於拉取(pull)或基於輪詢(polling)來從佇列中請求訊息,佇列並不會自動地將訊息推送到客戶端

一個訊息有且只能被一個消費者接收,即使有多個消費者同時監聽了佇列

點對點模型支援負載均衡,允許多個消費者監聽同一個佇列,並以此來分配負載

Spring Framework 為JMS提供了內建支援,Spring提供了JMS模板和訊息監聽容器
JMS實現採用ActiveMQ

定義訊息生產者

public interface
ProducerService {
/** * P2P 點對點模式生產者傳送訊息. * * @param destination * @param message */ public void sendMessage(Destination destination, String message); }
@Component("producerService")
public class ProducerServiceImpl implements ProducerService {

    //Spring JMS Template
@Resource(name="jmsTemplate") private JmsTemplate jmsTemplate; /** * P2P 點對點模式生產者傳送訊息. * * @param destination * @param message */ @Override public void sendMessage(Destination destination, final String message) { jmsTemplate.send(destination, new
MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }

設定一個定時任務充當訊息生產者,每隔兩秒傳送一條訊息,訊息內容為當前時間

@Component(value = "poll")
public class Poll {

    // ActiveMQ生產者
    @Resource(name = "producerService")
    private ProducerService producerService;
    // ActiveMQ 預先預約運單號佇列目的地
    @Resource(name = "queueDestination")
    private Destination destination;
    @Scheduled(cron="0/2 * * * * ?")
    public void getTradeIncrement() {
        producerService.sendMessage(destination, "當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}

定義一個消費者監聽佇列:

public class ConsumerMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String num = textMessage.getText();
                System.out.println(num);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

JMS配置檔案:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="10" />
    </bean>
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>
    <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--這個是佇列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>

    <!-- 訊息監聽器 -->
    <bean id="consumerMessageListener" class="com.dragon.jms.listener.ConsumerMessageListener" />
    <!-- 訊息監聽容器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>

</beans>

如果多個消費者同時監聽佇列,那麼訊息將在均衡分佈於每個消費者進行消費,是訊息可靠性的一種方式,稱為佇列消費者叢集(Queue consumer clusters)

public class ConsumerAMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String num = textMessage.getText();
                System.out.println("A " + num);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ConsumerBMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String num = textMessage.getText();
                System.out.println("B " + num);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ConsumerCMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String num = textMessage.getText();
                System.out.println("C " + num);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

此時需為每一個訊息監聽器新增一個訊息監聽容器:

<!-- 訊息監聽器 -->
    <bean id="consumerAMessageListener" class="com.dragon.jms.listener.ConsumerAMessageListener" />
    <bean id="consumerBMessageListener" class="com.dragon.jms.listener.ConsumerBMessageListener" />
    <bean id="consumerCMessageListener" class="com.dragon.jms.listener.ConsumerCMessageListener" />
    <!-- 訊息監聽容器 -->
    <bean id="jmsAContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerAMessageListener" />
    </bean>
    <bean id="jmsBContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerBMessageListener" />
    </bean>
    <bean id="jmsCContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerCMessageListener" />
    </bean>

結果:

C 當前時間:2016-03-04 13:29:08
A 當前時間:2016-03-04 13:29:10
B 當前時間:2016-03-04 13:29:12
C 當前時間:2016-03-04 13:29:14
A 當前時間:2016-03-04 13:29:16
B 當前時間:2016-03-04 13:29:18
C 當前時間:2016-03-04 13:29:20
A 當前時間:2016-03-04 13:29:22
B 當前時間:2016-03-04 13:29:24
C 當前時間:2016-03-04 13:29:26
A 當前時間:2016-03-04 13:29:28
B 當前時間:2016-03-04 13:29:30
C 當前時間:2016-03-04 13:29:32
A 當前時間:2016-03-04 13:29:34
B 當前時間:2016-03-04 13:29:36
C 當前時間:2016-03-04 13:29:38
A 當前時間:2016-03-04 13:29:40
B 當前時間:2016-03-04 13:29:42

訊息均衡的由三個消費者消費,提高系統的負載和可靠性,當某個消費者無法正常工作時,也不影響佇列中訊息的消費