1. 程式人生 > >activemq使用系列: linux環境下ActiveMQ 的安裝與使用(單節點)

activemq使用系列: linux環境下ActiveMQ 的安裝與使用(單節點)

1、 安裝 JDK 並配置環境變數(略)

2   下載 Linux 版的 ActiveMQ

<span style="white-space:pre">	</span>wget http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz

3、 解壓

4、 防火牆中開啟activemq的使用埠

一個是連線broker的埠

我們可以在amq的安裝目錄包的conf目錄下的activemq.xml中看到

61616就是我們的broker連線埠

一個是amq控制檯的埠

我們可以在conf/jetty.xml中看到

開啟linux防火牆檔案,增加如下配置,允許這兩個埠被訪問

<span style="white-space:pre">		</span>vi /etc/sysconfig/iptables

重新啟動防火牆服務

<span style="white-space:pre">		</span>service iptables restart

進入amq的安裝目錄下/bin/ 啟動amq執行指令碼

<span style="white-space:pre">		</span>./activemq start

開啟瀏覽器輸入地址

<span style="white-space:pre">		</span>http://192.168.1.10:8161/

這裡的IP地址可以通過linux命令 ipconfig檢視,我這裡是192.168.1.10

接下們寫個小程式使用一下

建立一個maven工程用

建立一個訊息傳送類

/**
 * 


 */
package com.pcx.amqproducer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * 
 * @author scarletbullet
 * @version $Id: MyProducer.java
 */
public class MyProducer {
        public static void main(String[] args) {
            //連線broker,就是我們剛剛防火牆裡面開啟的那個埠
          ConnectionFactory cf=  new ActiveMQConnectionFactory("tcp://192.168.1.10:61616");
            //建立了一個佇列名稱為 "user.queue"
          Destination  destination=new ActiveMQQueue("user.queuer");
          Connection conn=null;
         
          try {
              //從連線工程裡面獲取一個新連線
            conn=cf.createConnection();
            //自動確認訊息的傳送
            Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個訊息生產者,並且配置傳送訊息的地址
            MessageProducer producer=session.createProducer(destination);
            //建立一個map型別的訊息
            MapMessage message=session.createMapMessage();
            message.setString("userId", "123456");
            message.setString("userName", "李四");
            message.setInt("age", 18);
            //傳送
            producer.send(message);
            //關閉
            session.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }finally{
            if(conn!=null){
                try {
                    conn.close();
                } catch (Exception e2) {
                    // TODO: handle exception
                }
            }
        }
        
        }
}
執行一下,我們重新整理一下amq的控制檯可以看見

我們可以看到訊息傳送成功

接下來寫一下訊息消費者

一樣,我們建立一個maven工程

建立一個訊息消費者的類

package com.pcx.amqconsumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * 
 * @author scarletbullet
 * @version $Id: MyConsumer.java, 
 */
public class MyConsumer {

    public static void main(String[] args) {
        //連線broker,就是我們剛剛防火牆裡面開啟的那個埠
        ConnectionFactory cf=  new ActiveMQConnectionFactory("tcp://192.168.1.10:61616");
        //建立了一個佇列名稱為 "user.queue"
        Destination  destination=new ActiveMQQueue("user.queuer");
        Connection conn=null;
       
       
        try {
            //從連線工程裡面獲取一個新連線
          conn=cf.createConnection();
        //自動確認訊息的接受
          Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          //建立一個消費者
          MessageConsumer consumer=session.createConsumer(destination);
          conn.start();
          //同步阻塞等待接受訊息
          MapMessage message=(MapMessage)consumer.receive();
         
        
          System.out.println(message.getInt("age") +" "+ message.getString("userId")+" "+message.getString("userName"));
          session.close();
      } catch (Exception e) {
          throw new RuntimeException(e);
      }finally{
          if(conn!=null){
              try {
                  conn.close();
              } catch (Exception e2) {
                  // TODO: handle exception
              }
          }
      }
      
      }

}
run一下這個java類

我們可以在控制檯看到