1. 程式人生 > >Rocket MQ傳送訊息的三種方式初析

Rocket MQ傳送訊息的三種方式初析

前言

MQ 傳送訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。基於版本4.2.0+。注意:順序訊息只支援可靠同步傳送。

可靠同步傳送

原理:同步傳送是指訊息傳送方發出資料後,會在收到接收方發回響應之後才發下一個數據包的通訊方式。

場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名簡訊通知、營銷簡訊系統等。

sync-send

可靠非同步傳送

原理:非同步傳送是指傳送方發出資料後,不等接收方發回響應,接著傳送下個數據包的通訊方式。 MQ 的非同步傳送,需要使用者實現非同步傳送回撥介面(SendCallback)。訊息傳送方在傳送了一條訊息後,不需要等待伺服器響應即可返回,進行第二條訊息傳送。傳送方通過回撥介面接收伺服器響應,並對響應結果進行處理。

場景:非同步傳送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

async_send

單向(Oneway)傳送

原理:單向(Oneway)傳送特點為傳送方只負責傳送訊息,不等待伺服器迴應且沒有回撥函式觸發,即只發送請求不等待應答。 此方式傳送訊息的過程耗時非常短,一般在微秒級別。

場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。

oneway

三者的特點和主要區別。

傳送方式 傳送 TPS 傳送結果反饋 可靠性
同步傳送 不丟失
非同步傳送 不丟失
單向傳送 最快 可能丟失

code 實戰

同步傳送

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        Producer producer = ONSFactory.createProducer(properties);
        // 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可
        producer.start();
        //迴圈傳送訊息
        for (int i = 0; i < 100; i++){
            Message msg = new Message( //
                // Message 所屬的 Topic
                "TopicTestMQ",
                // Message Tag 可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
                "TagA",
                // Message Body 可以是任何二進位制形式的資料, MQ 不做任何干預,
                // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                "Hello MQ".getBytes());
            // 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一。
            // 以方便您在無法正常收到訊息情況下,可通過阿里雲伺服器管理控制檯查詢訊息並補發
            // 注意:不設定也不會影響訊息正常收發
            msg.setKey("ORDERID_" + i);
            try {
                SendResult sendResult = producer.send(msg);
                // 同步傳送訊息,只要不拋異常就是成功
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 訊息傳送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // 在應用退出前,銷燬 Producer 物件
        // 注意:如果不銷燬也沒有問題
        producer.shutdown();
    }
}

非同步傳送

    public static void main(String[] args) {
        Properties properties = new Properties();
        Producer producer = ONSFactory.createProducer(properties);
        // 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可。
        producer.start();
        Message msg = new Message(
                // Message 所屬的 Topic
                "TopicTestMQ",
                // Message Tag,可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
                "TagA",
                // Message Body,任何二進位制形式的資料,MQ 不做任何干預,需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                "Hello MQ".getBytes());
        // 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一。 以方便您在無法正常收到訊息情況下,可通過 MQ 控制檯查詢訊息並補發。
        // 注意:不設定也不會影響訊息正常收發
        msg.setKey("ORDERID_100");
        // 非同步傳送訊息, 傳送結果通過 callback 返回給客戶端。
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // 消費傳送成功
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }
            @Override
            public void onException(OnExceptionContext context) {
                // 訊息傳送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });
        // 在 callback 返回之前即可取得 msgId。
        System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
        // 在應用退出前,銷燬 Producer 物件。 注意:如果不銷燬也沒有問題
        producer.shutdown();
    }

單向(Oneway)傳送

 public static void main(String[] args) {
        Properties properties = new Properties();
        Producer producer = ONSFactory.createProducer(properties);
        // 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可。
        producer.start();
        //迴圈傳送訊息
        for (int i = 0; i < 100; i++){
            Message msg = new Message(
                    // Message 所屬的 Topic
                    "TopicTestMQ",
                    // Message Tag,
                    // 可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
                    "TagA",
                    // Message Body
                    // 任何二進位制形式的資料,MQ 不做任何干預,需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                    "Hello MQ".getBytes());
            // 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一。
            // 以方便您在無法正常收到訊息情況下,可通過阿里雲伺服器管理控制檯查詢訊息並補發。
            // 注意:不設定也不會影響訊息正常收發
            msg.setKey("ORDERID_" + i);
            // 由於在 oneway 方式傳送訊息時沒有請求應答處理,一旦出現訊息傳送失敗,則會因為沒有重試而導致資料丟失。若資料不可丟,建議選用可靠同步或可靠非同步傳送方式。
            producer.sendOneway(msg);
        }
        // 在應用退出前,銷燬 Producer 物件
        // 注意:如果不銷燬也沒有問題
        producer.shutdown();
    }

相關推薦

Rocket MQ傳送訊息方式

前言 MQ 傳送訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。基於版本4.2.0+。注意:順序訊息只支援可靠同步傳送。 可靠同步傳送 原理:同步傳送是指訊息傳送方發出

spring整合apache activemq實現訊息傳送方式程式碼配置例項

我們專案中傳送事件告警要用到訊息佇列,所以學習了下activemq,整理如下: activemq的介紹就不用說了,官網上大家可以詳細的看到。 1.下載並安裝activemq:地址http://activemq.apache.org/activemq-590-rel

php 模擬http傳送請求方式(curl,stream流的方式,)

一,curl cURL 是一個用來傳輸資料的工具,支援多種協議,如在 Linux 下用 curl 命令列可以傳送各種 HTTP 請求。PHP 的 cURL 是一個底層的庫,它能根據不同協議跟各種伺服器通訊,HTTP 協議是其中一種。 post請求 public

MQ傳送普通訊息方式

MQ 傳送普通訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。本文介紹了每種實現的原理、使用場景以及三種實現的異同,同時提供了程式碼示例以供參考。 可靠

RocketMQ(6)---傳送普通訊息方式

傳送普通訊息(三種方式) RocketMQ 傳送普通訊息有三種實現方式:可靠同步傳送、可靠非同步傳送、單向(Oneway)傳送。 注意 :順序訊息只支援可靠同步傳送。 GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog 一、概念 1、可靠同步

Python 傳送 email 的方式

Python傳送email的三種方式,分別為使用登入郵件伺服器、使用smtp服務、呼叫sendmail命令來發送三種方法 本文原文自米撲部落格:Python 傳送 email 的三種方式 Python傳送email比較簡單,可以通過登入郵件服務來發送,linux下也可以使用呼叫sendmail命令來發送,

向伺服器傳送請求的方式 and 轉發和重定向的區別

1.三種方式:      1.html超連結      2.form表單      3.AJAX技術 例子:不通過超連結和表單訪問伺服器,還有其他方式:可以通過Ajax技術訪問伺服器 js程式碼: <script type="text/javascript"

向伺服器傳送請求的方式

表單提交 兩種提交方式:get、post <form action="get.php" method="get"> 暱稱:<input type="text" na

django-Ajax傳送POST請求(csrf跨站請求的方式),檔案的上傳

<h3>Ajax上傳檔案</h3> <p><input type="text" name="username" id="username" placeholder="username"></p> <p><input type="

kafka的訊息傳送模式

1. At most once模式    消費者讀取訊息,更新訊息的offset,然後處理訊息。這種方式的風險是在更新訊息的offset之後,處理訊息結果的輸出之前消費者掛掉,消費者再啟動的時候,從新的offset開始消費訊息,導致處理訊息丟失 2. At least

Ajax之post請求跨站請求csrf_token傳送處理de方式

方式一: $.post({ url: '/get_result/', data: { value0: $('#v1').val(),

Android自定義View體驗,實現圓形TextView的方式

自定義view對我來說一直是比較恐懼的,但是萬事開頭難,今天總結一下自己實現圓形TextView的三種方式。 首先來說一下自定義view的三種方式: 一,自繪控制元件: 自繪控制元件就是說介面展示的內容就是我們在ondraw()方法中繪製出來的,繼承Vie

Java方式實現傳送xml引數的WebService介面呼叫

專案開發中與第三方系統資料對接遇到的問題,僅用作記錄。 1.使用cxf呼叫(聯調時沒有收到響應資訊) JaxWsDynamicClientFactory clientFactory = JaxWsDynamicClientFactory.newInstance(); lo

程式設計實現簡訊傳送方式

2014年09月05日⁄ 綜合⁄ 共 3138字 ⁄ 字號 小 中 大 ⁄ 評論關閉 方案一: 利用sina webservice傳送簡訊 通過程式設計實現簡訊息的傳送是一件比較繁瑣的事情,目前,解決方法是通過計算機和手機的連線,使用手機程式語言編寫相關的簡訊程式來實硬體

調用類的方式

set div sharp csharp true ren light setname clas 1.T t; Teacher teach ;//T t = new T t(); teach.SetName("lizl"); teach.Say(); 2 *t T

Java多線程實現的方式

get() warning 三種方式 方式 緩存 運行 了解 ren ava Java多線程實現方式主要有三種:繼承Thread類、實現Runnable接口、使用ExecutorService、Callable、Future實現有返回結果的多線程。其中前兩種方式線程執行完後

java數組擴增的方式

實現 arrays 數組復制 當我 自身 ++ new 復制 log java數組聲明的時候必須聲明其長度,但當我們想對數組進行擴增的時候該怎麽辦呢? 下面三種方式都可以進行擴增,最後一種也最為方便。 1 /** 2 * 手動循環擴增

java寫入文件的方式比較

all mem exc 操作 測試文件 nts sys output println 1.FileOutputStream方式 2.BufferedOutputStream方式 3.FileWriter方式 經過多次測試,發現不緩存的FileOutputStream會比較慢

JAVA實現Base64編碼的方式

ack ons static nts bstr clas [] ram trace 摘要: Javabase64編碼的三種方式 有如下三種方式: 方式一:commons-codec.jar Java代碼 1. String base64String="whuang12

SpringMVC返回json數據的方式

class error log under itl gmv nbsp sin pri SpringMVC返回json數據的三種方式:http://blog.csdn.net/shan9liang/article/details/42181345 上述第三種方法:可能會出