1. 程式人生 > >訊息佇列技術-activeMQ

訊息佇列技術-activeMQ

1.訊息

訊息佇列技術中(後文也簡稱MQ )有一個基本概念需要在開篇前進行討論:訊息和訊息
協議。訊息即是資訊的載體,這個描述相信各位讀者都能夠明白。訊息傳送者需要知道如何構
造訊息:訊息接收者需要知道如何解析訊息。所以為了讓訊息傳送者和訊息接收者都能夠明白
訊息所承載的資訊,訊息本身就需要按照一種統一的格式描述訊息,這種統一的格式稱之為
訊息協議。有效的訊息一定具有某一種格式,而沒有格式的訊息是沒有意義的。


訊息從傳送者到接收者的方式也有兩種。一種我們可以稱為直接訊息通訊,也就是說訊息
從一端發出後(訊息傳送者)直接傳送訊息給最終接收者,並得到處理後的回執一一無論是阻
塞方式還是非阻塞方式。這種通訊方式的一種具體實現就是我們已經介紹過的RPC ; 另一種
方式可以稱為中轉訊息通訊,即訊息從某一端發出後,首先會進入一個容器進行處理和臨時
儲存,當訊息接收者準備好或者達到某種傳輸條件後,再由這個容器傳送給另一端,這種容
器的一種具體實現就是訊息佇列。

2.訊息協議

瞭解下就是了

JMS 不是訊息佇列,更不是某種訊息佇列協議。而1S 是Java 訊息服務介面,是一套規範
的Java API 介面

3.ActiveMQ 基本概念和使用

windows 的安裝

https://blog.csdn.net/clj198606061111/article/details/38145597

4.demo

package test108MQ;

import java.net.Socket;

import org.apache.activemq.transport.stomp.StompConnection;

/**使用API 向ActiveMQ 傳送Stomp 協議訊息(生產者端)
 * @Description
 * @Author zengzhiqiang
 * @Date 2018年10月8日
 */

public class TestProducer {

    public static void main(String[] args) {
        try {
            //建立Stomp 協議的連線(不同的協議預設埠是不同的,stomp61613)
            StompConnection con = new StompConnection();
            Socket so = new Socket("10.10.1.227",61613);
            con.open(so);
            con.setVersion("1.2");
            con.connect("admin", "admin");
            //以下發送一條資訊
            con.send("/test","now send a message "+Math.random());
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
}

 

package test108MQ;

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

/**
 * @Description
 * @Author zengzhiqiang
 * @Date 2018年10月8日
 */

public class TestConsumer {

    public static void main(String[] args) throws Exception {
        //建立連線
        StompConnection  con = new StompConnection();
        Socket so = new Socket("10.10.1.227",61613);
        con.open(so);
        con.setVersion("1.2");
        con.connect("admin", "admin");
        String ack = "client";
        con.subscribe("/test","client");
        //接收訊息
        for(;;){
            StompFrame frame = null;
            try {
                ///注意,如果沒有接收到訊息,那麼這個消費者執行緒會停在這裡,直到本次等待超時
                frame = con.receive();
            } catch (Exception e) {
                // TODO: handle exception
                continue;
            }
            ///列印本次接收到的訊息
            System.out.println("frame.getAction()= "+frame.getAction());
            Map<String, String> headers = frame.getHeaders();
            String message_id =  headers.get("message-id");
            System.out.println("frame.getBody()= "+frame.getBody());
            System.out.println("frame.getCommandId()= "+ frame.getCommandId());
            //在ack 是client 標記的情況下, 確認訊息
            if("client".equals(ack)){
                con.ack(message_id);
            }
        }
             
    }
}

 

 結果:

 

 5.ActiveMQ 中的Queue 和Topics

JMS-Topic 佇列基於“訂閱一發布”模式,當操作者釋出一條訊息後,所
有對這條訊息感興趣的訂閱者都可以收到它一一也就是說這條訊息會被複製成多份然後進行分
發。只有當前“活動的”訂閱者能夠收到訊息,換句話說如果當前JMS-To pic 佇列中沒有訂閱
者,這條訊息將被丟棄

 

JMS-Queue 是一種“負載均衡”模式的實現。一個訊息能且只能被一個消
費者接收。如果當前幾1S-Queue 中沒有任何消費者,那麼這條訊息將會被Queue 儲存起來
(實際應用中可以儲存在磁碟上,也可以儲存在資料庫中,完全是看ActiveMQ 如何配置〉,
直到有一個消費者連線上。另外,當消費者在接收到訊息後,如果在它斷開與JMS-Queue 連
接之前沒有傳送ACK 資訊(可以是客戶端手動傳送,也可以是自動傳送),那麼這條訊息將
被髮送給其他消費者

 

 由於Queue 模式的
佇列是要進行訊息狀態儲存的,所以無論你是先執行“消費者”端,還是先執行“生產者”端,
最後“消費者”都會收到一條訊息。

以上是我摘取的比較重要的筆記。

                                                                                                                                                    --------------------------yhy record。