1. 程式人生 > >訊息佇列入門(四)ActiveMQ的應用例項

訊息佇列入門(四)ActiveMQ的應用例項

>>部署和啟動ActiveMQ

我下載的是apache-activemq-5.12.0-bin.tar.gz,

解壓到本地目錄,進入到bin路徑下,
執行activemq啟動ActiveMQ。

執行方式:
啟動 ./activemq start

ActiveMQ預設使用的TCP連線埠是61616,
5.0以上版本預設啟動時,開啟了內建的Jetty伺服器,可以進入控制檯檢視管理。

預設使用者名稱admin/admin

這裡我在虛擬機器裡啟動,訪問地址:
http://192.168.106.128:8161/admin/

ActiveMQ的控制檯功能十分強大,管理起來也很直觀。

>>使用Java連線

建立POM檔案

在Eclipse中新建Java工程,這裡使用Maven管理依賴,
下面是pom.xml:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 <projectxmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>activemq-sample</groupId> <artifactId>activemq-sample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>activemq-sample</name> <description>an activemq practice</description> <build> <
sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <!-- activemq-core 5.7.0 使用bunble打包,需要新增相關外掛 --> <plugin> <groupId>org.apache.felix</groupId> <artifactId>maven-bundle-plugin</artifactId> <extensions>true</extensions> </plugin> </plugins> </build> <dependencies> <!-- activemq的maven依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> <type>bundle</type> </dependency> </dependencies> </project>

在第一次新增activemq的maven依賴時報錯,後來發現activemq-core 5.7.0採用了bundle的打包方式,

必須在pom中配置maven-bundle-plugin。

建立訊息建立者 MsgProducer:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @Description: Message Producer * @author: Bing Yue */ public class MsgProducer { //如果你在本地啟動,可以直接使用空的ActiveMQConnectionFactory建構函式 privatestatic final String BROKER_URL="failover://tcp://192.168.106.128:61616"; public static void main(String[] args)throws JMSException, InterruptedException{ //建立連線工廠 ConnectionFactory connectionFactory=newActiveMQConnectionFactory(BROKER_URL); //獲得連線 Connection conn = connectionFactory.createConnection(); //start conn.start(); //建立Session,此方法第一個引數表示會話是否在事務中執行,第二個引數設定會話的應答模式 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立佇列 Destination dest = session.createQueue("test-queue"); //建立訊息生產者 MessageProducer producer = session.createProducer(dest); for (int i=0;i<100;i++) { //初始化一個mq訊息 TextMessage message = session.createTextMessage("這是第 "+ i+" 條訊息!"); //傳送訊息 producer.send(message); System.out.println("send message:訊息"+i); //暫停3秒 Thread.sleep(3000); } //關閉mq連線 conn.close(); } }

建立訊息接收者 MsgProducer:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * * @Description: Message Consumer * @author: Bing Yue */ public class MsgConsumer implementsMessageListener { privatestatic final String BROKER_URL="failover://tcp://192.168.106.128:61616"; public static void main(String[] args)throws JMSException{ //建立連線工廠 ConnectionFactory connectionFactory=newActiveMQConnectionFactory(BROKER_URL); //獲得連線 Connection conn = connectionFactory.createConnection(); //start conn.start(); //建立Session,此方法第一個引數表示會話是否在事務中執行,第二個引數設定會話的應答模式 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立佇列 Destination dest = session.createQueue("test-queue"); //建立訊息生產者 MessageConsumer consumer = session.createConsumer(dest); //初始化MessageListener MsgConsumer msgConsumer =new MsgConsumer(); //給消費者設定監聽物件 consumer.setMessageListener(msgConsumer); } /** * 消費者需要實現MessageListener介面 * 介面有一個onMessage(Message message)需要在此方法中做訊息的處理 */ @Override public void onMessage(Message msg) { TextMessage txtMessage = (TextMessage)msg; try { System.out.println("get message:"+ txtMessage.getText()); } catch(JMSException e) { e.printStackTrace(); } } }

執行MsgProducer,

登入後臺檢視test-queue佇列,可以看到發出的訊息正在等待被處理:

 

執行MsgConsumer,接收訊息並在控制檯列印:

通過這個例項可以對ActiveMQ的應用有一個簡單的瞭解。

在實際開發中,通常還需要設定優先順序處理,大部分情況下,訊息的傳送和接收方都會啟用多執行緒,
通過執行緒池來提高處理效率,解耦的同時保持業務處理能力。