activemq使用系列: linux環境下ActiveMQ 的安裝與使用(單節點)
阿新 • • 發佈:2019-01-25
1、 安裝 JDK 並配置環境變數(略)
2 下載 Linux 版的 ActiveMQ
<span style="white-space:pre"> </span>wget http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz
3、 解壓
4、 防火牆中開啟activemq的使用埠
一個是連線broker的埠
我們可以在amq的安裝目錄包的conf目錄下的activemq.xml中看到
61616就是我們的broker連線埠
一個是amq控制檯的埠
我們可以在conf/jetty.xml中看到
開啟linux防火牆檔案,增加如下配置,允許這兩個埠被訪問
<span style="white-space:pre"> </span>vi /etc/sysconfig/iptables
重新啟動防火牆服務
<span style="white-space:pre"> </span>service iptables restart
進入amq的安裝目錄下/bin/ 啟動amq執行指令碼
<span style="white-space:pre"> </span>./activemq start
開啟瀏覽器輸入地址
<span style="white-space:pre"> </span>http://192.168.1.10:8161/
這裡的IP地址可以通過linux命令 ipconfig檢視,我這裡是192.168.1.10
接下我們寫個小程式使用一下
建立一個maven工程用
建立一個訊息傳送類
執行一下,我們重新整理一下amq的控制檯可以看見/** * */ package com.pcx.amqproducer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; /** * * @author scarletbullet * @version $Id: MyProducer.java */ public class MyProducer { public static void main(String[] args) { //連線broker,就是我們剛剛防火牆裡面開啟的那個埠 ConnectionFactory cf= new ActiveMQConnectionFactory("tcp://192.168.1.10:61616"); //建立了一個佇列名稱為 "user.queue" Destination destination=new ActiveMQQueue("user.queuer"); Connection conn=null; try { //從連線工程裡面獲取一個新連線 conn=cf.createConnection(); //自動確認訊息的傳送 Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個訊息生產者,並且配置傳送訊息的地址 MessageProducer producer=session.createProducer(destination); //建立一個map型別的訊息 MapMessage message=session.createMapMessage(); message.setString("userId", "123456"); message.setString("userName", "李四"); message.setInt("age", 18); //傳送 producer.send(message); //關閉 session.close(); } catch (Exception e) { throw new RuntimeException(e); }finally{ if(conn!=null){ try { conn.close(); } catch (Exception e2) { // TODO: handle exception } } } } }
我們可以看到訊息傳送成功
接下來寫一下訊息消費者
一樣,我們建立一個maven工程
建立一個訊息消費者的類
package com.pcx.amqconsumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
/**
*
* @author scarletbullet
* @version $Id: MyConsumer.java,
*/
public class MyConsumer {
public static void main(String[] args) {
//連線broker,就是我們剛剛防火牆裡面開啟的那個埠
ConnectionFactory cf= new ActiveMQConnectionFactory("tcp://192.168.1.10:61616");
//建立了一個佇列名稱為 "user.queue"
Destination destination=new ActiveMQQueue("user.queuer");
Connection conn=null;
try {
//從連線工程裡面獲取一個新連線
conn=cf.createConnection();
//自動確認訊息的接受
Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個消費者
MessageConsumer consumer=session.createConsumer(destination);
conn.start();
//同步阻塞等待接受訊息
MapMessage message=(MapMessage)consumer.receive();
System.out.println(message.getInt("age") +" "+ message.getString("userId")+" "+message.getString("userName"));
session.close();
} catch (Exception e) {
throw new RuntimeException(e);
}finally{
if(conn!=null){
try {
conn.close();
} catch (Exception e2) {
// TODO: handle exception
}
}
}
}
}
run一下這個java類
我們可以在控制檯看到