1. 程式人生 > >訊息佇列ActiveMQ的使用

訊息佇列ActiveMQ的使用

-----------------ActiveMQ-----------------
一、ActiveMQ核心概念
1、ActiveMQ是訊息佇列技術,為解決高併發問題而生!
2、ActiveMQ生產者消費者模型(生產者和消費者可以跨平臺、跨系統)
有中間平臺
3、ActiveMQ支援兩種訊息傳輸方式
1)Queue,佇列模式,生產者生產了一個訊息,只能由一個消費者進行消費
2)Topic,釋出/訂閱模式,生產者生產了一個訊息,可以由多個消費者進行消費
參考"ActiveMQ補充.docx"

二、ActiveMQ訊息佇列安裝使用
1、官網:http://activemq.apache.org/
2、安裝成功訪問地址:http://localhost:8161使用者名稱和密碼都是admin

三、ActiveMQ整合Spring實現生產者
1、activeMQ_spring工程的pom.xml匯入座標
spring-context
spring-test
junit
activemq-all
spring-jms
2、src/main/resources/applicationContext-mq.xml配置生產者
<!-- 掃描包 -->
<context:component-scan base-package="cn.itcast.activemq" />

<!-- ActiveMQ 連線工廠 -->
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!-- 如果連線網路:tcp://ip:61616;未連線網路:tcp://localhost:61616 以及使用者名稱,密碼-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />


<!-- Spring Caching連線工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session快取數量 -->
<property name="sessionCacheSize" value="100" />
</bean>
    
<!-- Spring JmsTemplate 的訊息生產者 start-->

<!-- 定義JmsTemplate的Queue型別 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(釋出/訂閱),即佇列模式   queue -->
<property name="pubSubDomain" value="false" />
</bean>


<!-- 定義JmsTemplate的Topic型別 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(釋出/訂閱)  topic -->
<property name="pubSubDomain" value="true" />
</bean>


<!--Spring JmsTemplate 的訊息生產者 end-->
3、寫生產者類
1)基於Topic模式的生產者類,釋出/訂閱模式
@Service
public class TopicSender {
// 注入jmsTemplate
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;


public void send(String topicName, final String message) {
jmsTemplate.send(topicName, new MessageCreator() {


public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
2)基於Queue模式的生產者類,佇列模式
@Service
public class QueueSender {
// 注入jmsTemplate
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;


public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
4、src/test/java/cn/itcast/activemq/producer/test下寫測試類ProducerTest
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq.xml")
public class ProducerTest {
@Autowired
private QueueSender queueSender;


@Autowired
private TopicSender topicSender;


@Test
public void testSendMessage() {
for (int i = 0; i < 200; i++) {
queueSender.send("spring_queue", "queue你好");
topicSender.send("spring_topic", "topic你好");
}
}
}

四、ActiveMQ整合Spring實現消費者
1、配置/src/main/resources/applicationContext-mq-consumer.xml
<!-- 掃描包 -->
<context:component-scan base-package="cn.itcast.activemq.consumer" />

<!-- ActiveMQ 連線工廠 -->
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!-- 如果連線網路:tcp://ip:61616;未連線網路:tcp://localhost:61616 以及使用者名稱,密碼-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />


<!-- Spring Caching連線工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session快取數量 -->
<property name="sessionCacheSize" value="100" />
</bean>
    
<!-- 訊息消費者 start-->
<!-- 定義Queue監聽器 -->
<jms:listener-container destination-type="queue" container-type="default" 
connection-factory="connectionFactory" acknowledge="auto">
<!-- 預設註冊bean名稱,應該是類名首字母小寫  -->
<jms:listener destination="spring_queue" ref="abc1"/>
<jms:listener destination="spring_queue" ref="queueConsumer2"/>
<jms:listener destination="spring_queue" ref="queueConsumer3"/>
</jms:listener-container>

<!-- 定義Topic監聽器 -->
<jms:listener-container destination-type="topic" container-type="default" 
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="spring_topic" ref="topicConsumer1"/>
<jms:listener destination="spring_topic" ref="topicConsumer2"/>
</jms:listener-container>
<!-- 訊息消費者 end -->


2、寫消費者類
1)對應Queue模式的消費者類QueueConsumer1、QueueConsumer2
@Service("abc1") 
//這裡與<jms:listener destination="spring_queue" ref="abc1"/>的ref對應,預設名為類名首字母小寫,即ref="queueConsumer1"
public class QueueConsumer1 implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out
.println("消費者QueueConsumer1獲取訊息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2)對應Topic模式的消費者類TopicConsumer1、TopicConsumer2
@Service
public class TopicConsumer1 implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out
.println("消費者TopicConsumer1獲取訊息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3、src/test/java/cn/itcast/activemq/producer/test下寫測試類ConsumerTest
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
public class ConsumerTest {


@Test
public void testConsumerMessage() {
while (true) {
// junit退出,防止程序死掉
}
}
}

五、生產者消費者開發注意點(重點)
1)中間平臺brokerURL,生產者和消費者配置要一樣,連線同一個平臺
2)destination生產者和消費者配置要一樣
3)Queue的生產者與Queue的消費者對應、Topic的生產者與Topic的消費者對應
4)生產者和消費者配置對比
生產者
a.包掃描,掃描到生產者那個類
b.mq連線工廠
c.spring封裝mq連線工廠
d.提供模板,關聯spring封裝mq連線工廠
<property name="pubSubDomain" value="false" /> false佇列模式queue  true訂閱模式topic

消費者
a.包掃描,掃描到消費者那個類
b.mq連線工廠
c.spring封裝mq連線工廠
d.配置監聽 關聯spring封裝mq連線工廠 
<jms:listener destination="與生產者發起的訊息名字對應" ref="與消費者類上的@Service的名字對應"/>

六、基於ActiveMQ完成發簡訊
在common_parent的pom.xml匯入座標activemq-all
1)bos_fore作為生產者,發訊息
a.配置生產者bos_fore/src/main/resources/applicationContext-mq.xml
b.注入生產者模板,Queue模式
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
c.傳送訊息
jmsTemplate.send("bos_sms111", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("telephone2", model.getTelephone());
mapMessage.setString("username2", model.getUsername());
mapMessage.setString("randomCode2",randomCode);
return mapMessage;
}
});
2)新建war工程bos_sms作為消費者,收訊息
a.建war工程,繼承於common_parent
b.配置web.xml,Spring配置檔案位置及Spring的核心監聽器
c.配置bos_sms/src/main/resources/applicationContext.xml
注意包掃描
<context:component-scan base-package="cn.itcast.bos.mq" />
注意brokerURL和生產者一致
注意destination="bos_sms111"與生產者一致
d.編寫消費者類
//與消費者配置的<jms:listener destination="bos_sms111" ref="smsConsumer123"/>的ref對應
@Service("smsConsumer123") //@Service不寫後面的名字,則預設ref="smsConsumer"
public class SmsConsumer implements MessageListener {


@Override
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
SendSmsResponse response = null;
try {
System.out.println("消費者:" + mapMessage.getString("telephone2")
+ " " + mapMessage.getString("username2") + " "
+ mapMessage.getString("randomCode2"));
response = AliSmsUtils.sendSms(
mapMessage.getString("telephone2"),
mapMessage.getString("username2"),
mapMessage.getString("randomCode2"));
if (response.getCode() != null && response.getCode().equals("OK")) {
System.out.println("傳送簡訊成功!");
}
} catch (Exception e) {
// 傳送失敗
throw new RuntimeException("簡訊傳送失敗="+response);
}
}
}