1. 程式人生 > >ActiveMQ的使用與遇到的相關坑(點對點,釋出與訂閱,resreq)

ActiveMQ的使用與遇到的相關坑(點對點,釋出與訂閱,resreq)

1、介紹

ActiveMQ是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

MQ可以是不同應用之間進行溝通的橋樑,

支援多種語言和協議編寫客戶端,如: Java,C,C++,C#,Ruby,Perl,Python,PHP等;

應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

簡單的說,如果一個專案裡分多個應用,每個應用分別處理著各自的事,MQ就是他們互相聯絡的工具,一個應用處理好某件事,向MQ傳送一個訊息,MQ通過點對點、釋出訂閱或request-response的方式來處理訊息,將訊息轉發出去,另一個應用接收到訊息,並作出相應的處理。

2、下載與安裝

a、在ActiveMQ的官網下載最新版本,如圖

b、解壓檔案,執行\apache-activemq-5.5.1\bin\activemq.bat,正常的話就會有這樣的提示“Started [email protected]:8161”,出錯了也不用擔心,報的錯通過google可以很輕易的解決。

c、開啟網址http://localhost:8161/admin/,進入MQ的管理介面如下圖

到此,MQ的下載與安裝就完成了。

3、MQ的通訊方式

MQ中需要用的一些類

  • ConnectionFactory :連線工廠,JMS 用它建立連線
  • Connection :JMS 客戶端到JMS Provider 的連線
  • Session: 一個傳送或接收訊息的執行緒
  • Destination :訊息的目的地;訊息傳送給誰.
  • MessageConsumer / MessageProducer: 訊息接收者,消費者

a、釋出-訂閱

釋出訂閱模式有點類似於我們日常生活中訂閱報紙。每年到年尾的時候,郵局就會發一本報紙集合讓我們來選擇訂閱哪一個。在這個表裡頭列了所有出版發行的報紙,那麼對於我們每一個訂閱者來說,我們可以選擇一份或者多份報紙。比如北京日報、瀟湘晨報等。那麼這些個我們訂閱的報紙,就相當於釋出訂閱模式裡的topic。有很多個人訂閱報紙,也有人可能和我訂閱了相同的報紙。那麼,在這裡,相當於我們在同一個topic裡註冊了。對於一份報紙發行方來說,它和所有的訂閱者就構成了一個1對多的關係。

突破目的佇列地理指向的限制,使訊息按照特定的主題甚至內容進行分發,使用者或應用程式可以根據主題或內容接收到所需要的訊息。釋出/訂閱功能使得傳送者和接收者之間的耦合關係變得更為鬆散,傳送者不必關心接收者的目的地址,而接收者也不必關心訊息的傳送地址,而只是根據訊息的主題進行訊息的收發。

這種關係如下圖所示:

java程式碼實現如下圖,程式碼最後有分享

這裡注意

i、session.createTopic(Stirng),引數要保持一致,原因你訂閱了某個報紙,如果寫錯,當然郵局就不會認為你訂閱了這個報紙;

ii、先執行receved,在執行sender,你必須先訂閱報紙,郵局才會發給你,什麼時候訂閱,就從那個時間以後發報紙,也就是說,你在這個點啟動receved,接收到的訊息就是這個點以後的,之前的是收不到(這是與點對點的區別,很重要),你可以測試兩者啟動的順序;

b、p2p(點對點)

p2p的過程則理解起來更加簡單。它好比是兩個人打電話,這兩個人是獨享這一條通訊鏈路的。一方傳送訊息,另外一方接收,就這麼簡單。在實際應用中因為有多個使用者對使用p2p的鏈路;

點對點方式是最為傳統和常見的通訊方式,它支援一對一、一對多、多對多、多對一等多種配置方式,支援樹狀、網狀等多種拓撲結構。

它的通訊場景如下圖所示:

java程式碼如下:

注意:

i、session.createQueue(String),和訂閱釋出不同,其他的都相同,也不要忘了引數要一直,相當於一個佇列

ii、無論Receiver什麼時候啟動,只要佇列裡有訊息,全部都可以接收到,可以更換兩者的啟動方式來測試

c、request-response

和前面兩種方式比較起來,request-response的通訊方式很常見,但是不是預設提供的一種模式。在前面的兩種模式中都是一方負責傳送訊息而另外一方負責處理。而我們實際中的很多應用相當於一種一應一答的過程,需要雙方都能給對方傳送訊息。於是請求-應答的這種通訊方式也很重要。它也應用的很普遍。      請求-應答方式並不是JMS規範系統預設提供的一種通訊方式,而是通過在現有通訊方式的基礎上稍微運用一點技巧實現的。

個人覺得這樣比較合理,就像tcp/ip協議一樣,有請求就要有響應,我傳送給你訊息,你至少給我個反饋吧,收到還是沒收到,出現了什麼問題,都說一聲,我這邊可以繼續下一個訊息的轉發

相關程式碼:

Client:

<span style="font-family:Microsoft YaHei;font-size:14px;">package com.mq.reqres;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Random;
 
public class Client implements MessageListener {
    private static int ackMode;
    private static String clientQueueName;
 
    private boolean transacted = false;
    private MessageProducer producer;
 
    static {
        clientQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }
 
    public Client() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(transacted, ackMode);
            Destination adminQueue = session.createQueue(clientQueueName);
 
            //Setup a message producer to send message to the queue the server is consuming from
            this.producer = session.createProducer(adminQueue);
            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
            //Create a temporary queue that this client will listen for responses on then create a consumer
            //that consumes message from this temporary queue...for a real application a client should reuse
            //the same temp queue for each message to the server...one temp queue per client
            Destination tempDest = session.createTemporaryQueue();
            MessageConsumer responseConsumer = session.createConsumer(tempDest);
 
            //This class will handle the messages to the temp queue as well
            responseConsumer.setMessageListener(this);
 
            //Now create the actual message you want to send
            TextMessage txtMessage = session.createTextMessage();
            // 設定資訊
            txtMessage.setText("MyProtocolMessage");
 
            //Set the reply to field to the temp queue you created above, this is the queue the server
            //will respond to
            txtMessage.setJMSReplyTo(tempDest);
 
            //Set a correlation ID so when you get a response you know which sent message the response is for
            //If there is never more than one outstanding message to the server then the
            //same correlation ID can be used for all the messages...if there is more than one outstanding
            //message to the server you would presumably want to associate the correlation ID with this
            //message somehow...a Map works good
            String correlationId = this.createRandomString();
            txtMessage.setJMSCorrelationID(correlationId);
            this.producer.send(txtMessage);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }
 
    private String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }
 
    public void onMessage(Message message) {
        String messageText = null;
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                messageText = textMessage.getText();
                System.out.println("響應內容 = " + messageText);
            }
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }
 
    public static void main(String[] args) {
        new Client();
    }
}</span>
Server
<span style="font-family:Microsoft YaHei;font-size:14px;">package com.mq.reqres;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.*;
 
public class Server implements MessageListener {
    private static int ackMode;
    private static String messageQueueName;
    private static String messageBrokerUrl;
 
    private Session session;
    private boolean transacted = false;
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;
 
    static {
        messageBrokerUrl = "tcp://localhost:61616";
        messageQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }
 
    public Server() {
        try {
            //This message broker is embedded
            BrokerService broker = new BrokerService();
            broker.setPersistent(false);
            broker.setUseJmx(false);
            broker.addConnector(messageBrokerUrl);
            broker.start();
        } catch (Exception e) {
            //Handle the exception appropriately
        }
 
        //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
        //is ready to handle messages
        this.messageProtocol = new MessageProtocol();
        this.setupMessageQueueConsumer();
    }
 
    private void setupMessageQueueConsumer() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            this.session = connection.createSession(this.transacted, ackMode);
            Destination adminQueue = this.session.createQueue(messageQueueName);
 
            //Setup a message producer to respond to messages from clients, we will get the destination
            //to send to from the JMSReplyTo header field from a Message
            this.replyProducer = this.session.createProducer(null);
            this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(adminQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }
 
    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }
 
            //Set the correlation ID from the received message to be the correlation id of the response message
            //this lets the client identify which message this is a response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());
 
            //Send the response to the Destination specified by the JMSReplyTo field of the received message,
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }
 
    public static void main(String[] args) {
        new Server();
    }
}</span>
MessageProtocol
<span style="font-family:Microsoft YaHei;font-size:14px;">package com.mq.reqres;

public class MessageProtocol {
    public String handleProtocolMessage(String messageText) {
        String responseText;
        // 判斷是否是client傳過來的資訊,在這裡就可以做些解析
        if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
            responseText = "我收到了資訊";
        } else {
            responseText = "我不知道你傳的是什麼: " + messageText;
        }
         
        return responseText;
    }
}</span><span style="font-family:Microsoft YaHei;font-size:14px;">
</span>

結果
響應內容 = 我收到了資訊


相信大家都能看得懂,很容易理解。

這是我對MQ的理解,像Spinrg配置和負載均衡配置或者斷線重連等等問題,我會在以後遇到解決並跟新blog...

這裡說下我遇到的大坑,我們這邊的專案用的是Topic來處理訊號,然而外包那邊用的是Queue,結果就是訊號怎麼也過不來,我對MQ也不是很瞭解,我花了半天的時間檢查雙方的程式碼,確定是外包那邊的問題。外包一句感謝的話也沒說!!!當時就不開心了

我這裡想說的是,雙方之間通訊的方式要統一,一定要統一,一定一定要統一

參考Blog:http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html

相關推薦

卷積神經網路CNN1——影象卷積反捲積後卷積轉置卷積

1.前言    傳統的CNN網路只能給出影象的LABLE,但是在很多情況下需要對識別的物體進行分割實現end to end,然後FCN出現了,給物體分割提供了一個非常重要的解決思路,其核心就是卷積與反捲積,所以這裡就詳細解釋卷積與反捲積。     對於1維的卷積,公式(離散

android studio 簽名遇到的apk無法安裝安裝後閃退

今天需要拿出一個版本提測,打包好以後發給測試,測試反饋安裝上以後無法開啟,一開啟就閃退。 在網上找了些資料,也有遇到同樣問題的小夥伴,在此學習總結一下; 1.android studio 打包簽名後無法安裝到手機,總是安裝失敗。這時可能是在打包的時候沒有勾選圖中兩個。 2

ActiveMQ的使用遇到的相關釋出訂閱,resreq

1、介紹 ActiveMQ是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位

SpringBoot整合ActiveMQ訊息佇列和雙向佇列、釋出訂閱

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。 &nbs

SpringBoot2.0之整合ActiveMQ模式

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/

SpringBoot+ActiveMq實現Queue訊息佇列

上篇博文主要分析了三種不同的請求方式,其中提到了基於訊息佇列的請求,當然只是從理論的角度去進行了分析,本篇博文就再次結合具體實現來說說訊息佇列。 一、什麼是訊息佇列? 作為中介軟體,訊息佇列是分散式應用間交換資訊的重要元件。訊息佇列可駐留在記憶體或磁碟上, 佇列可以儲存訊息直到它

activemq的幾種基本通訊方式總結-釋出訂閱

簡介      在前面一篇文章裡討論過幾種應用系統整合的方式,發現實際上面向訊息佇列的整合方案算是一個總體比較合理的選擇。這裡,我們先針對具體的一個訊息佇列Activemq的基本通訊方式進行探討。activemq是JMS訊息通訊規範的一個實現。總的來說,訊息規範裡面定義最常見的幾種訊息通訊模式主要有

Spring整合activeMq:模式

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:util="http://www.spring

《Pro Spring》學習筆記之Spring+ActiveMQ實現Queue通訊

spring配置檔案: <?xml version="1.0" encoding="UTF-8"?><beans    xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="h

ActiveMQ釋出-訂閱訊息模式模式的區別

點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題就是傳送到佇列的訊息能否重複消費(多訂閱) 點對點: 訊息生產者生產訊息傳送到queue中,然後訊息消費者從queue中取出並且消費訊息。這裡要注意:  訊息被消費以後,queue中不再有儲存,所以訊息消費

ActiveMQ 系統模型

        ActiveMQ 客戶端編寫支援Java, C, C++ 等多種語言,筆者使用Java 語言來實現。測試模型為:一個生產者生產訊息,兩個消費者消費訊息。 1. 引入jar 包   引入activemq-all-5.13.1.jar, 解壓apache-ac

分散式訊息中介軟體——ActiveMQ訊息模式

一、下載執行 1、官網下載      Windows版:apache-activemq-5.13.3-bin.zipLinux版:apache-activemq-5.13.3-bin.tar.gz 2、Windows下根據作業系統位數,執行activemq.bat檔案,啟動

ActiveMQ入門系列二:入門程式碼例項模式

在上一篇《ActiveMQ入門系列一:認識並安裝ActiveMQ(Windows下)》中,大致介紹了ActiveMQ和一些概念,並下載、安裝、啟動他,還訪問了他的控制檯頁面。 這篇,就用程式碼例項說下如何實現訊息的生產和消費。 一、理論基礎 同RabbitMQ一樣,ActiveMQ中也是有兩種模式:

JMS消息隊列ActiveMQ(模式)

jms activemq 消息隊列 生產者(producer)->消息隊列(message queue)package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactor

Web服務器之Tomcat的相關說明Web訪問中的角色協議問題和JavaWeb項目的程序結構問題

b/s架構 c/s架構 context.xml說明 server.xml說明 javaweb項目的程序結構 1、C/S架構和B/S架構的概念:a、C/S架構:- C/S,Client/Server,客戶端/服務器,客戶端需要安裝專用的客戶端軟件。客戶端是針對某以具體業務專門開發的軟件。-

activeMQ

ssa oca exceptio pac 開啟事務 ive mes 啟動 cal 摘要: ActiveMQ 點對點消息 Point-to-Point 是一對一 創建消息生產者 /** * 點對點消息生產者 * * @author Edward * */

訊息佇列模式&釋出訂閱

Java 訊息服務( Java  Message Service,JMS)應用程式介面是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。 點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問

spring boot中使用websocket實現通訊伺服器推送

WebSocket介紹    websocket是html中一種新的協議,它實現了真正的長連線,實現了瀏覽器與伺服器的全雙工通訊(指在通訊的任意時刻,線路上存在A到B和B到A的雙向訊號傳輸)。 現在我們接觸的協議大多是htttp協議,在瀏覽器中通過http協議實現了單向的通訊

activemq模式createQueue

1.生產者JMSProducer package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; i

TensorFlow-cifar訓練測試自己資料進行分類和測試

第一部分:測試軟硬體 硬體:NVIDIA-GTX1080 軟體:Windows7、python3.6.5、tensorflow-gpu-1.4.0 第二部分:資料下載 資料集下載連結 第三部分:程式碼分步展示 第一步:匯入tensorflow import os fr