ActiveMQ簡介
ActiveMQ是一種開源的,實現了JMS1.1規範的,面向消息(MOM)的中間件,為應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實現代碼進行修改。
ActiveMQ的設計目標是提供標準的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。ActiveMQ實現了JMS標準並提供了很多附加的特性。這些附加的特性包括,JMX管理(Java Management Extensions,即java管理擴展),主從管理(master/salve,這是集群模式的一種,主要體現在可靠性方面,當主中介(代理)出現故障,那麽從代理會替代主代理的位置,不至於使消息系統癱瘓)、消息組通信(同一組的消息,僅會提交給一個客戶進行處理)、有序消息管理(確保消息能夠按照發送的次序被接受者接收)。
ActiveMQ 支持JMS規範,ActiveMQ完全實現了JMS1.1規範。
JMS規範提供了同步消息和異步消息投遞方式、有且僅有一次投遞語義(指消息的接收者對一條消息必須接收到一次,並且僅有一次)、訂閱消息持久接收等。如果僅使用JMS規範,表明無論您使用的是哪家廠商的消息代理,都不會影響到您的程序。
ActiveMQ整體架構
ActiveMQ主要涉及到5個方面:
- 傳輸協議
- 消息域
- 消息存儲
- Cluster (集群)
- Monitor (監控)
ActiveMQ的安裝配置
- 通過http://activemq.apache.org/download.html 下載:apache-activemq-5.13.3-bin.tar.gz
- 把下載的該文件通過tar –zxvf apache-activemq-5.13.3-bin.tar.gz解壓在當前目錄
- 通過修改$ACTIVEMQ_HOME/conf/activemq.xml文件可以修改其配置
<destinationPolicy> <policyMap> <policyEntries>
我們在此段増加配置如下:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" > <pendingmessageLimitStrategy> <constantPendingMessageLimitStrategy limit="50000"/> </pendingMessageLimitStrategy> </policyEntry> <policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit=“2mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
此處,我們使用的是”>”通配符,上述配置為每個隊列、每個Topic配置了一個最大2mb的隊列,並且使用了”optimizedDispatch=true”這個策略,該策略會啟用優化了的消息分發器,直接減少消息來回時的上下文以加快消息分發速度。
找到下面這一段
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
為確保擴展配置既可以處理大量連接也可以處理海量消息隊列,我們可以使用JDBC或更新更快的KahaDB消息存儲。默認情況下ActiveMQ使用KahaDB消息存儲。
ActiveMQ支持三種持久化策略:AMQ、KAHADB、JDBC
- AMQ 它是一種文件存儲形式,具有寫入快、容易恢復的特點,采用這種方式持久化消息會被存儲在一個個文件中,每個文件默認大小為32MB,如果一條消息超過32MB,那麽這個值就需要設大。當一個文件中所有的消息被“消費”掉了,那麽這文件會被置成“刪除”標誌,並且在下一個清除開始時被刪除掉。
- KAHADB,相比AMQ來説,KAHADB速度沒有AMQ快,可是KAHADB具有極強的垂直和橫向擴展能力,恢復時間比AMQ還要短,因此從5.4版後ActiveMQ默認使用KAHADB作為其持久化存儲。而且在作MQ的集群時使用KAHADB可以做到Cluster+Master Slave的這樣的完美高可用集群方案。
- JDBC,即ActiveMQ默認可以支持把數據持久化到DB中,如:mysql、Oracle等。
找到下面這一段
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="90" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage>
此處為ActiveMQ的內存配置,從5.10版後ActiveMQ在<memoryUsage>中引入了一個percentOfJvmHeap的配置,該百分比為:
$ACTIVEMQ_HOME/bin/env中配置的JVM堆大小的百分比,如$ACTIVEMQ_HOME/bin/env 中:
# Set jvm memory configuration (minimal/maximum amount of memory) ACTIVEMQ_OPTS_MEMORY="-Xms2048M -Xmx2048M"
那麽此處的percentOfJvmHeap=90即表示:MQ消息隊列一共會用到2048M*0.9的內存。
全部配完後我們可以通過以下命令啟動ActiveMQ
$cd $ACTIVEMQ_HOME $ ./activemq console
這種方式為前臺啟動activemq,用於開發模式便於調試時的信息輸出。
你也可以使用:
以後臺進程的方式啟動activemq。
啟動後在瀏覽器內輸入http://192.168.0.101:8161/admin/ 輸入管理員帳號(默認為admin/admin)即可登錄activemq的console界面

- 啟動後的ActiveMQ的數據位於:$ACTIVEMQ_HOME/data/目錄內
- 啟動後的ActiveMQ運行日誌位於:$ACTIVEMQ_HOME/data/目錄內的activemq.log文件
- 如果需要改ActiveMQ的日誌配置可以通過修改$ACTIVEMQ_HOME/conf/log4j.properties

ActiveMQ與Spring集成
在Spring中建立一個activemq.xml文件,使其內容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value=http://blog.csdn.net/lifetragedy/article/details/"tcp://192.168.0.101:61616" />
其中:
<property name="alwaysSessionAsync" value=http://blog.csdn.net/lifetragedy/article/details/“true" />
對於一個connection如果只有一個session,該值有效,否則該值無效,默認這個參數的值為true。
<property name="useAsyncSend" value=http://blog.csdn.net/lifetragedy/article/details/"true" />
將該值開啟官方說法是可以取得更高的發送速度(5倍)。
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置消息隊列的名字 -->
<constructor-arg value=http://blog.csdn.net/lifetragedy/article/details/"ymk.queue?consumer.prefetchSize=100" />
</bean>
在此我們申明了一個隊列,並用它用於後面的實驗代碼。
consumer.prefetchSize則代表我們在此使用“消費者”預分配協議,在消費者內在足夠時可以使這個值更大以獲得更好的吞吐性能。
工程中的pom.xml文件主要內容如下:
。。。。。。 <properties> <activemq_version>5.13.3</activemq_version> </properties> 。。。。。。 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activemq_version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>${activemq_version}</version> </dependency>
ActiveMQ與Spring集成-發送端代碼
package webpoc; public class AMQSender { public static void sendWithAuto(ApplicationContext context) { ActiveMQConnectionFactory factory = null; Connection conn = null; Destination destination = null; Session session = null; MessageProducer producer = null; try { destination = (Destination) context.getBean("destination"); factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory"); conn = factory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); Message message = session.createTextMessage("...Hello JMS!"); producer.send(message); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.close(); producer = null; } catch (Exception e) { } try { session.close(); session = null; } } catch (Exception e) { } try { conn.stop(); } catch (Exception e) { } try { conn.close(); } catch (Exception e) { } } } public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml"); sendWithAuto(context); } }
ActiveMQ與Spring集成-接收端代碼
package webpoc; public class TranQConsumer extends Thread implements MessageListener { private Connection conn = null; private Destination destination = null; private Session session = null; public void run() { receive(); } public void receive() { ConnectionFactory factory = null; Connection conn = null; try { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml"); factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = (Destination) context.getBean("destination"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); } catch (Exception e) { e.printStackTrace(); } } public void onMessage(Message message) { try { TextMessage tm = (TextMessage) message; System.out.println("TranQConsumer receive message: " + tm.getText()); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { TranQConsumer tranConsumer = new TranQConsumer(); tranConsumer.start(); } }
ActiveMQ與Spring集成-示例講解
上述例子非常的簡單。
它其實是啟動了一個Message Listener用來監聽ymk.queue中的消息,如果有消息到達,接收端代碼就會把消息“消費”掉。
而發送端代碼也很簡單,它每次向ymk.queue隊列發送一個文本消息。
這邊所謂的MQ消費大家可以這樣理解:
用戶sender向MQ的KAHADB中插入一條數據。
用戶receiver把這條數據select後,再delete,這個select一下後再delete就是一個“消費”動作。
簡單消息與事務型消息
我們可以註意到上述的例子中我們的代碼中有這樣的一段:
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
它代表的是我們的MQ消費端消費模式為“自動”,即一旦消費端從MQ中取到一條消息,這條消息會自動從隊列中刪除。
ActiveMQ是一個分布式消息隊列,它自然支持“事務”型消息,我們可以舉一個例子
系統A和系統B是有一個事務的系統間“服務集成”,我們可以把它想成如下場景:
系統A先會do sth…然後發送消息給系統B,系統B拿到消息後do sth,如果在其中任意一個環節發生了Exception,那麽代表系統A與系統B之間的消息調用這一過程為“失敗”。
失敗要重發,重發的話那原來那條消息必須還能重新拿得到。
此時我們就需要使用事務性的消息了。而事務性的消息是在:
生產端和消費端在創建session時,需要:
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
下面來看一個實際例子。
事務型消息發送端(生產端)
此處其它代碼與普通式消息發送代碼相似,只在以下幾處有不同,首先在取得session時會聲明事務開啟“true”。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
然後在發送時會有一個動作:
producer.send(message); System.out.println("send......" + Thread.currentThread().getId()); session.commit();
相應的在catch(Exception)時需要
catch (Exception e) { e.printStackTrace(); try { session.rollback(); } catch (Exception ex) { } }
事務型消息接收端(消費端)
在我們的接收端的createSession時也需要把它設為“事務開啟”,此時請註意,生產和消費是在一個事務邊界中的。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
然後在接收時會有一個動作:
try { TextMessage tm = (TextMessage) message; System.out.println("TranQConsumer receive message: " + tm.getText()); session.commit(); } catch (Exception e) { e.printStackTrace(); try { session.rollback(); } catch (Exception ex) { } }
註意:
- 如果在消費端的onMessage中沒有session.commit(),那麽這條消息可以正常被接收,但不會被消費,換句話説客戶端只要不commit這條消息,這條消息可以被客戶端無限消費下去,直到commit(從MQ所persistent的DB中被刪除)。
- 如果在消費斷遇到任何Exception時session.rollback()了,ActiveMQ會按照默認策略每隔1s會重發一次,重發6次如果還是失敗,則進入ActiveMQ的ActiveMQ.DLQ隊列,重發策略這個值可以設(稍後會給出)。
- 如果在生產端的try{}塊裏發生錯誤,導致回滾(沒有commit),會怎麽樣?消費隊列永遠拿不到這條被rollback的消息,因為這條數據還沒被插入KAHADB中呢。
- 再如果,消費端拿到了消息不commit也不rollback呢?那消費端重啟後會再次拿到這條消息(因為始終取where status=‘未消費’取不到的原因,對吧?)
事務型消息的重發機制
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value=http://blog.csdn.net/lifetragedy/article/details/"tcp://192.168.0.101:61616" />
以上例子申明了對於destination這個隊列的重發機制為間隔100毫秒重發一次。
事務型消息的演示



點對點,應答式消息
所謂點對點應答式消息和事務無關,它主要實現的是如:
生產端:我發給你一個消息了,在你收到並處理後請回復!因為我要根據你的回復內容再做處理
消費端:我收到你的消息了,我處理完了請查收我給你的回復
生產端:收到你的消息,88

點對點,應答式消息核心代碼-配置部分
<!-- 發送消息的目的地(一個隊列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg value=http://blog.csdn.net/lifetragedy/article/details/"ymk.queue?consumer.prefetchSize=100" />

其實也沒啥花頭,就是多了一個隊列(不要打我)
。。。。。。
關鍵在於代碼,代碼,不要只重視表面嗎。。。要看內含的LA。。。
這兩個隊列其實:
一個Request
一個應答(也可以使用temp隊列來做應答隊列)
點對點,應答式消息核心代碼-設計部分
我們設立兩個程序:
- 發送端(生產端)內含一個MessageListener,用來收消費端的返回消息
- 服務端(消費端)內含一個MessageListener,用來收生產端發過來的消息然後再異步返回
而溝通生產端和消費端的這根“消息鏈”是兩個東西:
- JMSCorrelationID
- JMSReplyTo
JMSCorrelationID:
它就是一個隨機不可重復的數字,以String型傳入API,也可以是GUID,它主要是被用來標示MQ 中每一條不同的消息用的一個唯一ID
JMSReplyTo
它就是一個生產端用來接收消費端返回消息的地址
點對點,應答式消息核心代碼-生產端部分代碼
String correlationId = RandomStringUtils.randomNumeric(5); consumer = session.createConsumer(replyDest); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); consumer.setMessageListener(this);
- RandomStringUtils
- replyDest
來看位於客戶端(生產端)的messageListener吧
public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Client接收Server端消息:" + tm.getText()); } catch (Exception e) { e.printStackTrace(); } }
其余部分代碼(沒啥花頭,就是sender裏帶了一個messageListener):
producer.send(message);
點對點,應答式消息核心代碼-生產端所有代碼
package webpoc.mq.dual; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang.RandomStringUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Client implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Client接收Server端消息:" + tm.getText()); } catch (Exception e) { e.printStackTrace(); } } public void start(ApplicationContext context) { ConnectionFactory factory = null; Connection conn = null; Destination destination = null; Destination replyDest = null; Session session = null; MessageProducer producer = null; MessageConsumer consumer = null; try { destination = (Destination) context.getBean("destination"); replyDest = (Destination) context.getBean("replyDestination"); factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); TextMessage message = session.createTextMessage("...Hello JMS!"); String correlationId = RandomStringUtils.randomNumeric(5); consumer = session.createConsumer(replyDest); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); consumer.setMessageListener(this); } catch (Exception e) { String errorMessage = "JMSException while queueing HTTP JMS Message"; e.printStackTrace(); } } public void send(ApplicationContext context) { ConnectionFactory factory = null; Connection conn = null; Destination destination = null; Destination replyDest = null; Session session = null; MessageProducer producer = null; try { destination = (Destination) context.getBean("destination"); replyDest = (Destination) context.getBean("replyDestination"); factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); TextMessage message = session.createTextMessage("...Hello JMS!"); String correlationId = RandomStringUtils.randomNumeric(5); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); producer.send(message); System.out.println("send 1 message"); } catch (Exception e) { String errorMessage = "JMSException while queueing HTTP JMS Message"; e.printStackTrace(); } } public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml"); //sendWithAuto(context); Client c = new Client(); c.start(context); c.send(context); }
點對點,應答式消息核心代碼-消費端部分代碼
public void onMessage(Message message) { System.out.println("on message"); try { TextMessage response = this.session.createTextMessage(); if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String messageText = txtMsg.getText(); response.setText("服務器收到消息:" + messageText); System.out.println(response.getText()); } response.setJMSCorrelationID(message.getJMSCorrelationID()); producer.send(message.getJMSReplyTo(), response); } catch (Exception e) { e.printStackTrace(); } } }
- 此處的send()方法內有兩個參數,註意其用法
- 然後為這個消費端也加一個messageListener如:
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(replyDest); consumer = session.createConsumer(destination); consumer.setMessageListener(this);
點對點,應答式消息核心代碼-全部代碼
package webpoc.mq.dual; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang.RandomStringUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Server implements MessageListener { private ConnectionFactory factory = null; private Connection conn = null; private Destination destination = null; Destination replyDest = null; private Session session = null; private MessageProducer producer = null; private MessageConsumer consumer = null; @Override public void onMessage(Message message) { System.out.println("on message"); try { // 若有消息傳送到服務時,先創建一個文本消息 TextMessage response = this.session.createTextMessage(); // 若從客戶端傳送到服務端的消息為文本消息 if (message instanceof TextMessage) { // 先將傳送到服務端的消息轉化為文本消息 TextMessage txtMsg = (TextMessage) message; // 取得文本消息的內容 String messageText = txtMsg.getText(); // 將客戶端傳送過來的文本消息進行處理後,設置到回應消息裏面 response.setText("服務器收到消息:" + messageText); System.out.println(response.getText()); } // 設置回應消息的關聯ID,關聯ID來自於客戶端傳送過來的關聯ID response.setJMSCorrelationID(message.getJMSCorrelationID()); System.out.println("replyto===" + message.getJMSReplyTo()); // 生產者發送回應消息,目的由客戶端的JMSReplyTo定義,內容即剛剛定義的回應消息 producer.send(message.getJMSReplyTo(), response); } catch (Exception e) { e.printStackTrace(); } } public void receive(ApplicationContext context) { try { destination = (Destination) context.getBean("destination"); replyDest = (Destination) context.getBean("replyDestination"); factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(replyDest); consumer = session.createConsumer(destination); consumer.setMessageListener(this); } catch (Exception e) { String errorMessage = "JMSException while queueing HTTP JMS Message"; e.printStackTrace(); } } public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml"); Server s = new Server(); s.receive(context); } }
點對點,應答式消息核心代碼-演示

Tags: 應用程序 master 點對點 可靠性 中間件
文章來源: