1. 程式人生 > >架構設計:系統間通訊(21)——ActiveMQ的安裝與使用

架構設計:系統間通訊(21)——ActiveMQ的安裝與使用

1、前言

之前我們通過兩篇文章(架構設計:系統間通訊(19)——MQ:訊息協議(上)架構設計:系統間通訊(20)——MQ:訊息協議(下))從理論層面上為大家介紹了訊息協議的基本定義,並花了較大篇幅向讀者介紹了三種典型的訊息協議:XMPP協議、Stomp協議和AMQP協議。本小節開始,我們基於之前的知識點講解這些協議在具體的“訊息佇列中介軟體”中是如何被我們操作的。由於本人在實際工作中經常使用ActiveMQ和RabbitMQ,所以就選取這兩個“訊息佇列中介軟體”進行講解。如果讀者可以補充其他“訊息佇列中介軟體”的使用,那當然是再好不過了。

2、ActiveMQ的安裝和使用

ActiveMQ是Apache軟體基金會的開源產品,支援AMQP協議、MQTT協議(和XMPP協議作用類似)、Openwire協議和Stomp協議等多種訊息協議。並且ActiveMQ完整支援JMS API介面規範(當然Apache也提供多種其他語言的客戶端,例如:C、C++、C#、Ruby、Perl)。

2-1、ActiveMQ的安裝

在本文釋出之時,ActiveMQ最新的版本號是5.13.2(版本號升級很快,不過並不推薦使用最新的版本)。由ActiveMQ的安裝是很簡單,所以這個過程並不值得我們花很大篇幅進行討論。具體的過程就是:下載->解壓->配置環境變數->執行:

  • 下載軟體
  • 解壓安裝

將下載的安裝包放置在root使用者的home目錄內,解壓即可(當然您可以根據自己的需要加壓到不同的檔案路徑下)。如下所示:

[root@localhost ~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz

以上解壓使用的是root使用者,這是為了演示方便。正式環境中還是建議禁用root使用者,為activeMQ的執行專門建立一個使用者和使用者組。

  • 配置環境變數(不是必須的)

如果您只是在測試環境使用Apache ActiveMQ,以便熟悉訊息中介軟體本身的特性和使用方式。那麼您無需對解壓後的軟體進行任何配置,所有可執行的命令都在軟體安裝目錄的./bin目錄下。為了使用方便,最好配置一下環境變數,如下所示(注意,根據您自己的軟體安裝位置,環境變數的設定是不一樣的,請不要盲目貼上複製):

設定該次會話的環境變數:
[root@localhost ~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;

永久設定環境變數:
[root@localhost ~]# echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile

在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 針對作業系統進行了更深入的優化,所以您可以看到./bin目錄下,有一個針對32位Linux執行命令的./linux-x86-32目錄,和針對64位Linux執行命令的./linux-x86-64目錄。請按照您自己的情況進行環境變數設定和命令執行。

  • 執行程式

現在您可以在任何目錄,執行activemq命令了。注意activemq命令一共有6個引數(console | start | stop | restart | status | dump),啟動Apache ActiveMQ使用的命令是activemq start:

[root@localhost ~]# activemq start

如果啟動成功,就可以在瀏覽器上訪問服務節點在8161埠的管理頁面了(例如http://localhost:8161):

這裡寫圖片描述

點選‘manage ActiveMQ broker’連線,可以進入管理主介面(預設的使用者和密碼都是admin)。以上就是Apache ActiveMQ訊息中介軟體最簡的安裝和執行方式。在後續的文章中,我們會陸續討論ActiveMQ的叢集和高效能優化,那時會介紹對應的ActiveMQ的配置問題。

2-2、ActiveMQ的其他命令引數

如同上文講到的,activemq命令除了start引數用於啟動activemq程式以外,還有另外5個引數可以使用:console | stop | restart | status | dump。他們代表的使用意義是:

  • stop:停止當前ActiveMQ節點的執行。

  • restart:重新啟動當前ActiveMQ節點。

  • status:檢視當前ActiveMQ節點的執行狀態。如果當前ActiveMQ節點沒有執行,那麼將返回“ActiveMQ Broker is not running”的提示資訊。注意,status命令只能告訴開發人員當前節點時停止的還是執行的,除此之外不能從status命令獲取更多的資訊。例如,ActiveMQ為什麼建立Queue失敗?當前ActiveMQ使用了多少記憶體?而要獲取這些資訊,需要使用以下引數啟動ActiveMQ節點。

  • console:使用控制檯模式啟動ActiveMQ節點;在這種模式下,開發人員可以除錯、監控當前ActivieMQ節點的實時情況,並獲取實時狀態。

  • dump:如果您採用console模式執行ActiveMQ,那麼就可以使用dump引數,在console控制檯上獲取當前ActiveMQ節點的執行緒狀態快照。

2-3、在ActiveMQ中傳遞Stomp訊息

好吧,既然我們已經討論過如何安裝和執行ActiveMQ,也討論了Stomp協議的組織結構,為什麼我們不立即動手試一試操作ActiveMQ承載Stomp協議的訊息呢?

下面我們使用ActiveMQ提供的JAVA 客戶端(實際上就是ActiveMQ對JMS規範的實現),向ActiveMQ中的Queue(示例程式碼中將這個Queue命名為’test’)傳送一條Stomp協議訊息,然後再使用JAVA語言的客戶端,從ActiveMQ上接受這條訊息:

  • 使用ActiveMQ的API傳送Stomp協議訊息:
package mq.test.stomp;

import java.net.Socket;
import java.util.Date;

import org.apache.activemq.transport.stomp.StompConnection;

// 訊息生產者
public class TestProducer {
    public static void main(String[] args) {
        try {
            // 建立Stomp協議的連線
            StompConnection con = new StompConnection();
            Socket so = new Socket("192.168.61.138", 61613);
            con.open(so);
            // 注意,協議版本可以是1.2,也可以是1.1
            con.setVersion("1.2");
            // 使用者名稱和密碼,這個不必多說了
            con.connect("admin", "admin");

            // 以下發送一條資訊(您也可以使用“事務”方式)
            con.send("/test", "234543" + new Date().getTime());
        } catch(Exception e) {
            e.printStackTrace(System.out);
        }
    }
}
  • 使用ActiveMQ的API接收Stomp協議訊息:
package mq.test.stomp;

import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        // 建立連線
        StompConnection con = new StompConnection();
        Socket so = new Socket("192.168.61.138", 61613);
        con.open(so);
        con.setVersion("1.2");
        con.connect("admin", "admin");

        String ack = "client";
        con.subscribe("/test", "client");
        // 接受訊息(使用迴圈進行)
        for(;;) {
            StompFrame frame = null;
            try {
                // 注意,如果沒有接收到訊息,
                // 這個消費者執行緒會停在這裡,直到本次等待超時
                frame = con.receive();
            } catch(SocketTimeoutException e) {
                continue;
            }

            // 列印本次接收到的訊息
            System.out.println("frame.getAction() = " + frame.getAction());
            Map<String, String> headers = frame.getHeaders();
            String meesage_id = headers.get("message-id");
            System.out.println("frame.getBody() = " + frame.getBody());
            System.out.println("frame.getCommandId() = " + frame.getCommandId());

            // 在ack是client標記的情況下,確認訊息
            if("client".equals(ack)) {
                con.ack(meesage_id);
            }
        }
    }
}

以上分別是使用Activie提供的Stomp協議的訊息生產端和Stomp協議的訊息消費端的程式碼(如果您不清楚Stomp協議的細節,可以參考我另一篇文章:《架構設計:系統間通訊(19)——MQ:訊息協議(上)》)。請注意在程式碼片段中,並沒有出現任何一個帶有jms名稱的包或者類——這是因為ActiveMQ為Stomp協議提供的JAVA API在內部進行了JMS規範的封裝。

您可以檢視activemq-stomp中關於協議轉換部分的原始碼:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父級介面:org.apache.activemq.transport.stomp.FrameTranslator來驗證這件事情(關於ActiveMQ對JMS規範的實現設計,如果後續有時間再回頭進行講解)。

以下是Stomp協議的消費者端的執行效果(在生產者端已經向ActiveMQ插入了一條訊息之後):

frame.getAction() = MESSAGE
frame.getBody() = 2345431458460073204
frame.getCommandId() = 0

注意,由於訊息體中插入了一個時間戳,所以您複製貼上程式碼後執行效果並不會和我的演示程式完全一致。

2-4、ActiveMQ中的Queue和Topics

如果您細心的話,在ActiveMQ提供的管理頁面上已經看到有兩個功能頁面:Queue和Topic。Queue和Topic是JMS為開發人員提供的兩種不同工作機制的訊息佇列。 在ActiveMQ官方的解釋是:

  • Topics

In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.

中文的可以譯做:JMS-Topic 佇列基於“訂閱-釋出”模式,當操作者釋出一條訊息後,所有對這條訊息感興趣的訂閱者都可以收到它——也就是說這條訊息會被拷貝成多份,進行分發。只有當前“活動的”訂閱者能夠收到訊息(換句話說,如果當前JMS-Topic佇列中沒有訂閱者,這條訊息將被丟棄)。

  • Queue

A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.

So Queues implement a reliable load balancer in JMS.

中文的可以譯做:JMS-Queue是一種“負載均衡模式”的實現。一個訊息能且只能被一個消費者接受。如果當前JMS-Queue中沒有任何的消費者,那麼這條訊息將會被Queue儲存起來(實際應用中可以儲存在磁碟上,也可以儲存在資料庫中,看軟體的配置),直到有一個消費者連線上。另外,如果消費者在接受到訊息後,在他斷開與JMS-Queue連線之前,沒有傳送ack資訊(可以是客戶端手動傳送,也可以是自動傳送),那麼這條訊息將被髮送給其他消費者。

以下表格摘自網際網路上的資料,基本上把Queue和Topic這兩種佇列的不同特性說清楚了:

比較專案 Topic 模式佇列 Queue 模式佇列
工作模式 “訂閱-釋出”模式,如果當前沒有訂閱者,訊息將會被丟棄。如果有多個訂閱者,那麼這些訂閱者都會收到訊息 “負載均衡”模式,如果當前沒有消費者,訊息也不會丟棄;如果有多個消費者,那麼一條訊息也只會傳送給其中一個消費者,並且要求消費者ack資訊。
有無狀態 無狀態 Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB儲存。
傳遞完整性 如果沒有訂閱者,訊息會被丟棄 訊息不會丟棄
處理效率 由於訊息要按照訂閱者的數量進行復制,所以處理效能會隨著訂閱者的增加而明顯降低,並且還要結合不同訊息協議自身的效能差異 由於一條訊息只發送給一個消費者,所以就算消費者再多,效能也不會有明顯降低。當然不同訊息協議的具體效能也是有差異的

2-5、JMS和協議間轉換

上文已經說到,JMS這套面向訊息通訊的 JAVA API 是一個和廠商無關的規範。通過JMS,我們能實現不同訊息中介軟體廠商、不同協議間的轉換和互動。這一小節我們就來討論一下這個問題。如果用一張圖來表示JMS在訊息中介軟體中的作用話,那麼就可以這麼來畫:

這裡寫圖片描述

首先您使用的MQ訊息中介軟體需要實現了JMS規範;那麼通過JMS規範,開發人員可以忽略各種訊息協議的細節,只要訊息在同一佇列中,就能夠保證各種訊息協議間實現互相轉換。下面我們首先來看一個使用JMS API在ActiveMQ中操作openwire協議訊息的簡單示例,然後再給出一個通過JMS,實現Stomp訊息協議和Openwire訊息協議間的互轉示例。

2-5-1、JMS操作

  • 以下程式碼使用向某個Queue(命名為test)中傳送一條訊息:
package jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 測試使用JMS API連線ActiveMQ
 * @author yinwenjie
 */
public class JMSProducer {
    /**
     * 由於是測試程式碼,這裡忽略了異常處理。
     * 正是程式碼可不能這樣做
     * @param args
     * @throws RuntimeException
     */
    public static void main (String[] args) throws Exception {
        // 定義JMS-ActiveMQ連線資訊(預設為Openwire協議)
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
        Session session = null;
        Destination sendQueue;
        Connection connection = null;

        //進行連線
        connection = connectionFactory.createQueueConnection();
        connection.start();

        //建立會話(設定一個帶有事務特性的會話)
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        //建立queue(當然如果有了就不會重複建立)
        sendQueue = session.createQueue("/test");
        //建立訊息傳送者物件
        MessageProducer sender = session.createProducer(sendQueue);
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("這是傳送的訊息內容");

        //傳送(JMS是支援事務的)
        sender.send(outMessage);
        session.commit();

        //關閉
        sender.close();
        connection.close();
    }
}

當以上程式碼執行到“start”的位置時,我們可以通過觀察ActiveMQ管理介面中connection列表中的連線資訊,發現訊息生產者已經建立了一個Openwire協議的連線:

這裡寫圖片描述

從而確定我們通過JMS API建立了一個openwire協議的通訊連線。接著我們使用以下程式碼,建立一個基於openwire協議的“消費者”。注意:訊息生產者和訊息消費者,對映的佇列必須一致。(在示例程式碼中,它們都對映名稱為test的JMS-Queue)

  • 以下程式碼使用JMS從某個Queue中接收訊息:
package jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import javax.jms.Session;


import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 測試使用JMS API連線ActiveMQ
 * @author yinwenjie
 */
public class JMSConsumer {
    /**
     * 由於是測試程式碼,這裡忽略了異常處理。
     * 正是程式碼可不能這樣做
     * @param args
     * @throws RuntimeException
     */
    public static void main (String[] args) throws Exception {
        // 定義JMS-ActiveMQ連線資訊
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
        Session session = null;
        Destination sendQueue;
        Connection connection = null;

        //進行連線
        connection = connectionFactory.createQueueConnection();
        connection.start();

        //建立會話(設定為自動ack)
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立Queue(當然如果有了就不會重複建立)
        sendQueue = session.createQueue("/test");
        //建立訊息傳送者物件
        MessageConsumer consumer = session.createConsumer(sendQueue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message arg0) {
                // 接收到訊息後,不需要再發送ack了。
                System.out.println("Message = " + arg0);
            }
        });

        synchronized (JMSConsumer.class) {
            JMSConsumer.class.wait();
        }

        //關閉
        consumer.close();
        connection.close();
    }
}

當以上“消費者”程式碼執行到start的位置時,我們通過ActiveMQ提供的管理介面可以看到,基於Openwire協議的連線增加到了兩條:

這裡寫圖片描述

注意,您在執行以上測試程式碼時,不用和我的執行順序一致。由於Queue模式的佇列是要進行訊息狀態儲存的,所以無論您是先執行“消費者”端,還是先執行“生產者”端,最後“消費者”都會收到一條訊息。類似如下的效果:

Message = ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1, destination = queue:///test, transactionId = TX:ID:yinwenjie-240-60482-1458616972423-1:1:1, expiration = 0, timestamp = 1458617840154, arrival = 0, brokerInTime = 1458617840166, brokerOutTime = 1458617840187, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 這是傳送的訊息內容}

2-5-2、協議間轉換

下面我們將Openwire協議的訊息通過JMS送入Queue佇列,並且讓基於Stomp協議的消費者接收到這條訊息。為了節約篇幅,基於Openwire協議的生產者的程式碼請參考上一小節2-5-1中“生產者”的程式碼片段。這裡只列出Stomp訊息的接受者程式碼(實際上這段程式碼在上文中也可以找到):

  • Stomp協議的訊息消費者(訊息接收者):
package mq.test.stomp;

import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        // 建立連線(注意,Stomp協議的連線埠是61613)
        StompConnection con = new StompConnection();
        Socket so = new Socket("192.168.61.138", 61613);
        con.open(so);
        con.setVersion("1.2");
        con.connect("admin", "admin");

        String ack = "client";
        con.subscribe("/test", "client");
        // 接受訊息(使用迴圈進行)
        for(;;) {
            StompFrame frame = null;
            try {
                // 注意,如果沒有接收到訊息,
                // 這個消費者執行緒會停在這裡,直到本次等待超時
                frame = con.receive();
            } catch(SocketTimeoutException e) {
                continue;
            }

            // 列印本次接收到的訊息
            System.out.println("frame.getAction() = " + frame.getAction());
            Map<String, String> headers = frame.getHeaders();
            String meesage_id = headers.get("message-id");
            System.out.println("frame.getBody() = " + frame.getBody());
            System.out.println("frame.getCommandId() = " + frame.getCommandId());

            // 在ack是client模式的情況下,確認訊息
            if("client".equals(ack)) {
                con.ack(meesage_id);
            }
        }
    }
}

當您同時執行Openwire訊息傳送者和Stomp訊息接收者時,您可以在ActiveMQ的管理介面看到這兩種協議的連線資訊:

這裡寫圖片描述

以下是Stomp協議消費者接收到的訊息內容(經過轉換的openwire協議訊息):

frame.getAction() = MESSAGE
frame.getBody() = 這是傳送的訊息內容
frame.getCommandId() = 0

接下文