java訊息佇列ActiveMQ的簡單使用
activeMQ
是學習java訊息佇列的實現專案,使用jfinal +jfinal-ext + activeMQ + quartz快速構建。
1.訊息佇列
訊息佇列,其實是一種基於資料結構實現的服務。而java語言中的實現,有apache的activeMQ,比較主流。
2.環境搭建
首先去apache的官網下載apache-activeMQ-...-.zip的包,解壓後,執行bin中的activeMQ服務。在瀏覽器中輸入http://localhost:8186/admin,出現登陸介面輸入admin/admin登陸即可。

然後建立一個FirstQueue佇列(給後面的例項提供服務)。
3.activeMQ原始操作
記住activeMQ服務一定要一直開啟,傳送者和接收者都會通過tcp協議去連結伺服器,以取得訊息佇列中的訊息體。如下圖是我的伺服器cmd截圖:

3.1.首先建立傳送者Sender.java
packagecom.mg.demo;importjavax.jms.Connection;importjavax.jms.ConnectionFactory;importjavax.jms.DeliveryMode;importjavax.jms.Destination;importjavax.jms.MessageProducer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnection;importorg.apache.activemq.ActiveMQConnectionFactory;publicclassSender {privatestaticfinalintSEND_NUMBER =5;publicstaticvoidmain(String[] args) {// ConnectionFactory :連線工廠,JMS 用它建立連線ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS// Provider 的連線Connection connection =null;// Session: 一個傳送或接收訊息的執行緒Session session;// Destination :訊息的目的地;訊息傳送給誰.Destination destination;// MessageProducer:訊息傳送者MessageProducer producer;// TextMessage message;// 構造ConnectionFactory例項物件,此處採用ActiveMq的實現jarconnectionFactory =newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");try{// 構造從工廠得到連線物件connection = connectionFactory.createConnection();// 啟動connection.start();// 獲取操作連線session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置destination = session.createQueue("FirstQueue");// 得到訊息生成者【傳送者】producer = session.createProducer(destination);// 設定不持久化,此處學習,實際根據專案決定producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 構造訊息,此處寫死,專案就是引數,或者方法獲取sendMessage(session, producer); session.commit(); }catch(Exception e) { e.printStackTrace(); }finally{try{if(null!= connection) connection.close(); }catch(Throwable ignore) { } } }publicstaticvoidsendMessage(Session session, MessageProducer producer)throwsException {for(inti =1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq 傳送的訊息"+ i);// 傳送訊息到目的地方System.out.println("傳送訊息:"+"ActiveMq 傳送的訊息"+ i); producer.send(message); } }
3.2.再建立接收者Receiver.java
packagecom.mg.demo;importjavax.jms.Connection;importjavax.jms.ConnectionFactory;importjavax.jms.Destination;importjavax.jms.MessageConsumer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnection;importorg.apache.activemq.ActiveMQConnectionFactory;publicclassReceiver{publicstaticvoidmain(String[] args) {// ConnectionFactory :連線工廠,JMS 用它建立連線ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連線Connection connection =null;// Session: 一個傳送或接收訊息的執行緒Session session;// Destination :訊息的目的地;訊息傳送給誰.Destination destination;// 消費者,訊息接收者MessageConsumer consumer; connectionFactory =newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");try{// 構造從工廠得到連線物件connection = connectionFactory.createConnection();// 啟動connection.start();// 獲取操作連線session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination);while(true) {// 設定接收者接收訊息的時間,為了便於測試,這裡誰定為100sTextMessage message = (TextMessage) consumer.receive(100000);if(null!= message) { System.out.println("收到訊息"+ message.getText()); }else{break; } } }catch(Exception e) { e.printStackTrace(); }finally{try{if(null!= connection) connection.close(); }catch(Throwable ignore) { } } }}
3.3.測試結果
先執行接收者Receiver.java,在執行Sender.java。得到結果如下圖:(2個控制檯都會輸出如下圖資料)

4.使用jfinal-ext中的jms外掛操作activeMQ
整合quartz任務排程框架,實現每10秒傳送一次訊息到佇列。
4.1.核心程式碼
public static void main(String[] args) throwsInstantiationException, IllegalAccessException, ClassNotFoundException {JmsPlugin jp = newJmsPlugin("jms.properties");jp.start();PropertyConfig pc = PropertyConfig.me();pc.loadPropertyFile("job.properties");QuartzPlugin qp = new QuartzPlugin();if (pc.getPropertyToBoolean("a.enable")) { qp.add(pc.getProperty("a.cron"), (Job) Class.forName(pc.getProperty("a.job")).newInstance());} qp.start();}
4.2.配置檔案jms.properties
################################# server info ################################## jms伺服器地址serverUrl=tcp://localhost:61616username=adminpassword=admin################################# queue info ################################## 傳送的佇列名字,用“,”號分隔sendQueues=firstMQ# 接受的佇列的名字,用“,”號分隔receiveQueues=firstMQ# 佇列firstMQ上訊息名字為a的訊息號queue.firstMQ.a=10000#接受到佇列q1上訊息名字為a的訊息的時候呼叫的處理器queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver
4.3.配置檔案job.properties
#JobAa.job=com.mg.jfinal.task.JobAa.cron=*/10 * * * * ?a.enable=true
4.4.執行結果
如圖:

Java高架構師、分散式架構、高可擴充套件、高效能、高併發、效能優化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分散式專案實戰學習架構師視訊免費學習加群:835638062 點選連結加入群聊【Java高階架構】:https://jq.qq.com/?_wv=1027&k=5S3kL3v