1. 程式人生 > >RocketMQ讀書筆記2——生產者

RocketMQ讀書筆記2——生產者

ddr name 負載均衡策略 選中 找到 slave 配置 提高 實現

【生產者的不同寫入策略】

生產者向消息隊列裏寫入數據,不同的業務需要生產者采用不同的寫入策略:

同步發送、異步發送、延遲發送、發送事務消息等。

【DefaultMQProduce示例】

public class ProducerQuickStart {

    public static void main(String[] args) throws MQClientException,InterruptedException {
        /**1.設置Producer的GroupName**/
        DefaultMQProducer producer 
= new DefaultMQProducer("GROUP_B"); /**2.設置Instance**/ producer.setInstanceName("instanceB"); /**3.設置發送失敗的重試次數**/ producer.setRetryTimesWhenSendFailed(3); /**4.設置NameServer的地址**/ producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876"); /**5.啟動Producer*
*/
producer.start(); for (int i = 0; i < 10; i++) { try{ /**6.組裝消息並發送**/ Message msg = new Message("TopicTest","TagA", ("Hello HigginCui:"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg,
new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("sendResultStatus:" + sendResult.getSendStatus()); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); }catch (Exception e){ e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }

[ 提示:設置Instance ]

當一個JVM需要啟動多個Producer的時候,通過設置不同的InstanceName來區分,不設置的話系統使用默認名稱“DEFAULT”。

[ 提示:設置發送失敗的重試次數 ]

當網絡出現異常的時候,這個次數影響消息的重復投遞次數。想保證消息不丟失,可以設置多重試幾次。

【消息發送的返回值】

FLUSH_DISK_TIMEOUT
刷盤超時(需要Broker設置為SYNC_FLUSH同步刷盤才會報這個錯)
FLUSH_SLAVE_TIMEOUT
主從同步超時(在主備方式,且Broker設置為SYNC_MASTER情況下)
SLAVE_NOT_AVALIABLE
沒有找到被設置成SLAVE的Broker。(在主備方式,且Broker設置成SYNC_MASTER的情況下)
SEND_OK
發送成功(需要結合所配置的 刷盤策略、主從策略來定)

【延遲消息】

RocketMQ支持延遲消息,Broker收到這類消息後,延遲一段時間再處理,使消息在規定的一段時間內生效。

延遲消息使用方法:

在創建Message對象時,調用setDelayTimeLevel(int level)方法設置延遲時間,然後再把這個新消息發送出去。

目前延遲消息不支持任意設置,僅支持預設值的時間長度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。如setDelayTimeLevel(3)表示延遲10s。

【自定義消息發送規則】

  一個Topic下會有多個MessageQueue,如果使用Producer的默認配置,這個Producer會輪流向各個MessageQueue發送消息。Consumer消費的時候,會根據負載均衡策略,消費分配到的MessageQueue。不經過特定設置,某條消息發往哪個MessageQueue,被哪個Consumer消費都是未知的,

[ 如果把同一類型的消息發往相同的MessageQueue? ]

想把同一類型的消息發往相同的MessageQueue,可以用MessageQueueSelector。

代碼示例:

public class OrderMessageQueueSelector implements MessageQueueSelector {

    /**
     * 根據訂單的id值平均分配對應的MessageQueue
     * @param mqs 消息隊列
     * @param msg 消息
     * @param orderKey
     * @return
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object orderKey) {
        int id = Integer.parseInt(orderKey.toString());
        int idMainIndex = id/100;
        int size = mqs.size(); //MessageQueue的總數
        int index= idMainIndex /size ;
        return mqs.get(index);  //返回選中的MessageQueue
    }
}

在發送消息的時候,把MessageQueueSelector的對象作為參數,使用

MQProducer接口的自定義發送方法:
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)

在MessageQueueSelector的實現中,根據傳入的Object參數,或者根據Message消息內容確定把消息發往哪個MessageQueue,返回被選中的MessageQueue。

【事務消息】

RocketMQ的事務消息:發送消息事件和其他事件需要同時成功或失敗。

如轉賬操作:A賬戶轉賬1W到B賬戶,A發送"B賬戶加1W"的消息,要和"A賬戶扣除1W"的這個操作同時成功或失敗。

[ 關鍵詞 ]

兩階段提交。

[ 大致流程 ]

RocketMQ采用兩階段提交的方式實現事務消息。

TransactionMQProducer處理流程如下:

1.先發一個"B賬戶增加1W"的待確認消息。

2.發送成功後做"A賬戶扣除1W"的操作

3.根據"A賬戶扣除1W"的操作成功與否,決定之前"B賬戶增加1W"的消息是commit還是rollback。

[ 具體流程 ]

1.Producer向MQ發送"B賬戶增加1W"的待確認消息。

2.RocketMQ將這個待確認消息持久化成功後,向Producer回復消息發送成功,此時第一階段消息發送完成。

3.執行本地事件邏輯,即"A賬戶扣除1W"的操作。

4.Producer根據本地事件執行結果向RocketMQ發送二次確認(Commit或RollBack)消息:

如果收到Commit狀態則將第一階段的待確認消息標記為“可投遞”,Consumer將收到該消息;

如果收到RocketBack狀態則刪除第一階段的待確認消息,Consumer無法收到該消息。

5.若中途出現異常,步驟4提交的二次確認最終未到達RocketMQ,服務器在經過固定的時間會對“待確認”消息發起回查請求。

6.Producer收到回查請求後,通過檢查本地對應消息的本地事件執行結果返回Commit或RockBack狀態(如果發送第一階段待確認消息的Producer不能工作,回查請求將被發送到和Producer在同一個Group裏的其他Producer)。

【為什麽RocketMQ4.x版本刪除事務消息】

雖然上述的方案很好地實現了事務消息功能,也是RocketMQ之前的版本實現事務消息的邏輯,因為RocketMQ依賴將數據順序寫到磁盤的這個特征來提高性能,步驟4需要更改第一階段待確認消息的狀態,這樣會導致磁盤Catch的臟頁過多,降低了系統性能,所以RocketMQ在4.x版本將這部分功能去除了。

RocketMQ讀書筆記2——生產者