1. 程式人生 > >ActiveMQ的queue以及topic兩種訊息處理機制分析

ActiveMQ的queue以及topic兩種訊息處理機制分析

Q來作為jms匯流排,並且給大家介紹了activeMQ的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的專案需求,更好地使用activeMQ的兩種訊息處理模式。

1    queue與topic的技術特點對比

Topic

Queue

概要

Publish Subscribe messaging 釋出訂閱訊息

Point-to-Point 點對點

有無狀態

topic資料預設不落地,是無狀態的。

Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB儲存。

完整性保障

並不保證publisher釋出的每條資料,Subscriber都能接受到。

Queue保證每條資料都能被receiver接收。

訊息是否會丟失

一般來說publisher釋出訊息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到訊息;如果沒有sub在監聽,該topic就丟失了。

Sender傳送訊息到目標Queue,receiver可以非同步接收這個Queue上的訊息。Queue上的訊息如果暫時沒有receiver來取,也不會丟失。

訊息釋出接收策略

一對多的訊息釋出接收策略,監聽同一個topic地址的多個sub都能收到publisher傳送的訊息。Sub接收完通知mq伺服器

一對一的訊息釋出接收策略,一個sender傳送的訊息,只能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的訊息採取刪除或其他操作。

          Topic和queue的最大區別在於topic是以廣播的形式,通知所有線上監聽的客戶端有新的訊息,沒有監聽的客戶端將收不到訊息;而queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。

2    topic和queue方式的訊息處理效率比較

        通過增加監聽客戶端的併發數來驗證,topic的訊息推送,是否會因為監聽客戶端的併發上升而出現明顯的下降,測試環境的伺服器為ci環境的ActiveMQ,客戶端為我的本機。

        從實測的結果來看,topic方式傳送的訊息,傳送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(執行緒)併發的前提下,效率差異很明顯(由於500執行緒併發的情況下,我本機的cpu佔用率已高達70-90%,所以無法確認是我本機測試造成的效能瓶頸還是topic訊息傳送方式存在效能瓶頸,造成效率下降如此明顯)。

        Topic方式傳送的訊息與queue方式傳送的訊息,傳送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者併發的前提下,topic方式的效率明顯低於queue。

        Queue方式傳送的訊息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,傳送和接收的效率沒有明顯變化。

Topic實測資料:

傳送者傳送的訊息總數

所有訂閱者接收到訊息的總數

訊息傳送和接收平均耗時

單訂閱者

100

100

101ms

100訂閱者

100

10000

103ms

500訂閱者

100

50000

14162ms

Queue實測資料:

傳送者傳送的訊息總數

所有訂閱者接收到訊息的總數

訊息傳送和接收平均耗時

單訂閱者

100

100

96ms

100訂閱者

100

100

96ms

500訂閱者

100

100

100ms

3     topic方式的訊息處理示例
3.1     通過客戶端程式碼呼叫來發送一個topic的訊息:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

publicclass SendTopic {

    privatestaticfinalint SEND_NUMBER = 5;

    publicstaticvoid sendMessage(Session session, MessageProducer producer)

            throws Exception {

         for ( int i = 1; i <= SEND_NUMBER; i++) {

            TextMessage message = session

                    .createTextMessage("ActiveMq傳送的訊息" + i);

            //傳送訊息到目的地方

            System. out.println("傳送訊息:" + "ActiveMq 傳送的訊息" + i);

            producer.send(message);

        }

    }

    publicstaticvoid main(String[] args) {

        // ConnectionFactory:連線工廠,JMS用它建立連線

        ConnectionFactory connectionFactory;

        // Connection:JMS客戶端到JMS Provider的連線

        Connection connection = null;

        // Session:一個傳送或接收訊息的執行緒

        Session session;

        // Destination:訊息的目的地;訊息傳送給誰.

        Destination destination;

        // MessageProducer:訊息傳送者

        MessageProducer producer;

        // TextMessage message;

        //構造ConnectionFactory例項物件,此處採用ActiveMq的實現jar

        connectionFactory = new ActiveMQConnectionFactory(

                ActiveMQConnection. DEFAULT_USER,

                ActiveMQConnection. DEFAULT_PASSWORD,

                "tcp://10.20.8.198:61616");

        try {

            //構造從工廠得到連線物件

            connection = connectionFactory.createConnection();

            //啟動

            connection.start();

            //獲取操作連線

            session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);

            //獲取session注意引數值FirstTopic是一個伺服器的topic(與queue訊息的傳送相比,這裡是唯一的不同)

            destination = session.createTopic("FirstTopic");

            //得到訊息生成者【傳送者】

            producer = session.createProducer(destination);

            //設定不持久化,此處學習,實際根據專案決定

            producer.setDeliveryMode(DeliveryMode. PERSISTENT);

            //構造訊息,此處寫死,專案就是引數,或者方法獲取

            sendMessage(session, producer);

            session.commit();

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                if ( null != connection)

                    connection.close();

            } catch (Throwable ignore) {

            }

        }

    }

}

3.2     啟動多個客戶端監聽來接收topic的訊息:

publicclass ReceiveTopic implements Runnable {

      private StringthreadName;

      ReceiveTopic(String threadName) {

           this.threadName = threadName;

      }

      publicvoid run() {

           // ConnectionFactory:連線工廠,JMS用它建立連線

           ConnectionFactory connectionFactory;

           // Connection:JMS客戶端到JMS Provider的連線

           Connection connection = null;

           // Session:一個傳送或接收訊息的執行緒

           Session session;

           // Destination:訊息的目的地;訊息傳送給誰.

           Destination destination;

           //消費者,訊息接收者

           MessageConsumer consumer;

           connectionFactory = new ActiveMQConnectionFactory(

                      ActiveMQConnection. DEFAULT_USER,

                      ActiveMQConnection. DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");

           try {

                 //構造從工廠得到連線物件

                 connection = connectionFactory.createConnection();

                 //啟動

                 connection.start();

                 //獲取操作連線,預設自動向伺服器傳送接收成功的響應

                 session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);

                 //獲取session注意引數值FirstTopic是一個伺服器的topic

                 destination = session.createTopic("FirstTopic");

                 consumer = session.createConsumer(destination);

                 while ( true) {

                      //設定接收者接收訊息的時間,為了便於測試,這裡設定為100s

                      TextMessage message = (TextMessage) consumer

                                  .receive(100 * 1000);

                      if ( null != message) {

                            System. out.println("執行緒"+threadName+"收到訊息:" + message.getText());

                      } else {

                            continue;

                      }

                 }

           } catch (Exception e) {

                 e.printStackTrace();

           } finally {

                 try {

                      if ( null != connection)

                            connection.close();

                 } catch (Throwable ignore) {

                 }

           }

      }

      publicstaticvoid main(String[] args) {

            //這裡啟動3個執行緒來監聽FirstTopic的訊息,與queue的方式不一樣三個執行緒都能收到同樣的訊息

           ReceiveTopic receive1= new ReceiveTopic("thread1");

           ReceiveTopic receive2= new ReceiveTopic("thread2");

           ReceiveTopic receive3= new ReceiveTopic("thread3");

           Thread thread1= new Thread(receive1);

           Thread thread2= new Thread(receive2);

           Thread thread3= new Thread(receive3);

           thread1.start();

           thread2.start();

           thread3.start();

      }

}

4     queue方式的訊息處理示例

         參考上一期文章:開源jms服務ActiveMQ的負載均衡+高可用部署方案探索。

相關推薦

ActiveMQ的queue以及topic訊息處理機制分析

Q來作為jms匯流排,並且給大家介紹了activeMQ的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的專案需求,更好地使用activeMQ的兩種訊息處理模式。 1    queue與topic的技術特點對比 Topic Queue 概要 Publish Subscribe mes

Android應用程式鍵盤(Keyboard)訊息處理機制分析

        在Android系統中,鍵盤按鍵事件是由WindowManagerService服務來管理的,然後再以訊息的形式來分發給應用程式處理,不過和普通訊息不一樣,它是由硬體中斷觸發的;在上一篇文章《Android應用程式訊息處理機制(Looper、Handler)分

ActiveMQ訊息模式以及為什麼使用MQ

1.為什麼使用MQ  a.高併發 在高併發分散式環境下,由於來不及同步處理,請求往往發生堵塞;通過訊息佇列,可以非同步處理請求,緩解系統的壓力; b.鬆耦合性 一個應用傳送訊息到MQ之後並不關係訊息如何或者什麼時候被傳遞,同樣的訊息的接收者也不關係訊息從哪裡來的。在不同的環

SpringBoot 整合 RabbitMQ(包含三訊息確認機制以及消費端限流)

目錄 說明 生產端 消費端 說明 本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認

Android學習筆記(36):Android的事件處理方式

post gravity cal log 基於 處理方法 hang mil 重寫 Android提供了兩種事件處理的方式:基於回調的事件處理 和 基於監聽的事件處理。 我們來說的easy理解一點: (1)基於回調的事件處理就是繼承GUI組件,並重寫該組件的

簡單的實現圖片預覽, 通過原生ajax以及 jQuery方法實現圖片預覽,有更好的辦法可以留言喔................

XML HP OS image end php代碼 append sda ext 1.原生寫ajax實現圖片預覽:   結構:     <input type="file">       <img src="" > JavaScri

[js]js中6錯誤處理機制

ram java throw mage 代碼執行 class 錯誤處理機制 code catch js中6種錯誤 http://javascript.ruanyifeng.com/grammar/error.html#toc5 https://www.jianshu.co

android 非同步訊息處理機制 — AHandler

1. 引入 ALooper、AHandler、AMessage 在 android multimedia stagefright 的框架程式碼中,通篇都是這幾個類的身影,所以熟悉 android 多媒體框架的第一步必須理解這幾個類的含義。 這幾個類是為了實現非同步訊息機制而設計的

Android非同步訊息處理機制詳解及原始碼分析

PS一句:最終還是選擇CSDN來整理髮表這幾年的知識點,該文章平行遷移到CSDN。因為CSDN也支援MarkDown語法了,牛逼啊! 【工匠若水 http://blog.csdn.net/yanbober 轉載煩請註明出處,尊重分享成果】 最近相對來說比較閒,加上養病,所

Redis的訊息模式

Redis的兩種訊息模式 佇列模式 釋出訂閱模式 佇列模式 佇列模式下每個消費者可以同時從多個伺服器讀取訊息,但是每個訊息只能被一個消費者讀取。 在佇列模式下其實每次插入的資料都是載入在最前面的,而先插入的資料在後面,列表中始終維持了一個佇列故稱之為佇

ActiveMQ的訊息形式。

一、訊息的傳遞型別 點對點:即一個生產者和一個消費者一一對應 PTP的過程好比是兩個人打電話,這兩個人獨享這一條通訊鏈路。一方傳送訊息,另外一方接收 訊息 。在實際應用中因為有多個使用者對使用 PTP 的鏈路,它的通訊場景如下圖所示:

【演算法模板】二叉樹的三遍歷方式,以及根據遍歷方式建樹

前言:今年九月份的PAT考試就栽在這“兩種遍歷建樹”上了,剛好沒看,剛好考到。作為自己的遺憾,今日碼完,貼在這裡留個紀念,希望能給自己警醒與警鐘。 簡要概括: 1、二叉樹的三種遍歷方式分別是 先序(先根)遍歷PreOrder,中序(中根)遍歷InOrder,後序(後根

Android之訊息處理機制(二)Handler的本質-Message和Looper到底是什麼?

目錄 Android之訊息處理機制(二) 以下皆為乾貨,比較幹,需要讀者細細理解。  前面(一)已經解釋了Handler的基本機制了,下面來概括一下本質。 一、MessageQueue        MessageQueue其實就

C# 訊息處理機制及自定義過濾方式

一、訊息概述 Windows 下應用程式的執行是通過訊息驅動的。訊息是整個應用程式的工作引擎,我們需要理解掌握我們使用的程式語言是如何封裝訊息的原理。1. 什麼是訊息(Message) 訊息就是通知和命令。在.NET框架類庫中的System.Windows.Forms名稱

ActiveMQ的幾訊息持久化機制

為了避免意外宕機以後丟失資訊,需要做到重啟後可以恢復訊息佇列,訊息系統一般都會採用持久化機制。 ActiveMQ的訊息持久化機制有JDBC,AMQ,KahaDB和LevelDB四種方式,無論使用哪種持久化方式,訊息的儲存邏輯都是一致的。 就是在傳送者將訊息傳送出

Android非同步訊息處理機制:Looper、Handler、Message

1 簡介 Handler,Looper,Message這三者都與Android非同步訊息處理執行緒相關, Looper:負責建立一個MessageQueue,然後進入一個無限迴圈體不斷從該MessageQueue中讀取訊息; Handler:訊息建立者,一個或者多個

ActiveMQ(二)——訊息處理機制

一、前言 上文中,小編提到安裝ActiveMQ,但是對於ActiveMQ中訊息是用什麼樣的形式儲存的?下面小編就向大家介紹一下。 二、訊息型別 對於訊息的傳遞有兩種型別: 1.點對點的,即一個生產者和一個消費者一一對應; 2.釋出/訂閱模式,即一個生產者產生訊

淺談Android的訊息處理機制--Handler

1.為什麼有Handler? 在UI執行緒中不能進行耗時操作,例如資料讀寫、網路請求、圖片載入等,所以這些操作被放在子執行緒裡,Handler便是子執行緒和UI執行緒之間通訊的橋樑之一。 2.幹什麼用的? 進行非同步訊息處理,即上述內容。 3.Handler類裡面有什麼是必須知道

android的訊息處理機制——Looper,Handler,Message (原理圖、原始碼)

轉自:http://my.oschina.net/u/1391648/blog/282892 在開始討論android的訊息處理機制前,先來談談一些基本相關的術語。   通訊的同步(Synchronous):指向客戶端傳送請求後,必須要在服務端有迴應後客戶端才繼續傳送

Android Handler 非同步訊息處理機制的妙用 建立強大的圖片載入類

                最近建立了一個群,方便大家交流,群號:55032675上一篇部落格介紹了Android非同步訊息處理機制,如果你還不瞭解,可以看:Android 非同步訊息處理機制 讓你深入理解 Looper、Handler、Message三者關係 。那篇部落格的最後,提出可以把非同步訊息處理