1. 程式人生 > >rocketmq安裝、啟動、測試

rocketmq安裝、啟動、測試

rocketmq官網:http://rocketmq.apache.org

簡介:

     RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。具有以下特性:

  1. 支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型
  2. 在一個佇列中可靠的先進先出(FIFO)和嚴格的順序傳遞
  3. 支援拉(pull)和推(push)兩種訊息模式
  4. 單一佇列百萬訊息的堆積能力
  5. 支援多種訊息協議,如 JMS、MQTT 等
  6. 分散式高可用的部署架構,滿足至少一次訊息傳遞語義
  7. 提供 docker 映象用於隔離測試和雲集群部署
  8. 提供配置、指標和監控等功能豐富的 Dashboard

1.下載

wget http://mirrors.hust.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip

2.切換到下載目錄進行解壓到rocketmq-all-4.2.0目錄

unzip  rocketmq-all-4.2.0-source-release.zip -d rocketmq-all-4.2.0

3.啟動(切換到bin目錄

3.1 啟動namesrv:

      nohup sh mqnamesrv &

     檢視啟動日誌:tail -f ~/logs/rocketmqlogs/namesrv.log

     啟動成功:The Name Server boot success…

3.2 啟動mqbroker

       nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &

      檢視日誌:tail -f ~/logs/rocketmqlogs/broker.log

      啟動成功:register brok

4.專業術語介紹

producer

訊息生產者,生產者的作用就是將訊息傳送到 MQ,生產者本身既可以產生訊息,如讀取文字資訊等。也可以對外提供介面,由外部應用來呼叫介面,再由生產者將收到的訊息傳送到 MQ。

producer group

生產者組,簡單來說就是多個傳送同一類訊息的生產者稱之為一個生產者組。在這裡可以不用關心,只要知道有這麼一個概念即可。

consumer

訊息消費者,簡單來說,消費 MQ 上的訊息的應用程式就是消費者,至於訊息是否進行邏輯處理,還是直接儲存到資料庫等取決於業務需要。

consumer group

消費者組,和生產者類似,消費同一類訊息的多個 consumer 例項組成一個消費者組。

topic

Topic 是一種訊息的邏輯分類,比如說你有訂單類的訊息,也有庫存類的訊息,那麼就需要進行分類,一個是訂單 Topic 存放訂單相關的訊息,一個是庫存 Topic 儲存庫存相關的訊息。

message

Message 是訊息的載體。一個 Message 必須指定 topic,相當於寄信的地址。Message 還有一個可選的 tag 設定,以便消費端可以基於 tag 進行過濾訊息。也可以新增額外的鍵值對,例如你需要一個業務 key 來查詢 broker 上的訊息,方便在開發過程中診斷問題。

tag

標籤可以被認為是對 Topic 進一步細化。一般在相同業務模組中通過引入標籤來標記不同用途的訊息。

broker

Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的訊息,儲存以及為消費者拉取訊息的請求做好準備。

name server

Name Server 為 producer 和 consumer 提供路由資訊。

RocketMQ 架構

由這張圖可以看到有四個叢集,分別是 NameServer 叢集、Broker 叢集、Producer 叢集和 Consumer 叢集:

  1. NameServer: 提供輕量級的服務發現和路由。 每個 NameServer 記錄完整的路由資訊,提供等效的讀寫服務,並支援快速儲存擴充套件。
  2. Broker: 通過提供輕量級的 Topic 和 Queue 機制來處理訊息儲存,同時支援推(push)和拉(pull)模式以及主從結構的容錯機制。
  3. Producer:生產者,產生訊息的例項,擁有相同 Producer Group 的 Producer 組成一個叢集。
  4. Consumer:消費者,接收訊息進行消費的例項,擁有相同 Consumer Group 的
    Consumer 組成一個叢集。

簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行資料同步,即 Date Sync。同時每個 Broker 與
NameServer 叢集中的所有節
點建立長連線,定時註冊 Topic 資訊到所有 NameServer 中。

Producer 與 NameServer 叢集中的其中一個節點(隨機選擇)建立長連線,定期從 NameServer 獲取 Topic 路由資訊,並向提供 Topic 服務的 Broker Master 建立長連線,且定時向 Broker 傳送心跳。Producer 只能將訊息傳送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave
建立長連線,既可以從 Broker Master 訂閱訊息,也可以從 Broker Slave 訂閱訊息。

5.測試

 生產者:Producer.java

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

/**
 * rocketmq 生產者
 * create by zhaoyl at 2018-07-23
 */
@Service
public class Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class);

    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    /**
     * 宣告並初始化一個producer
     * 需要一個producerGroup 名字作為構造方法的引數,這裡為grampus-order
     */
    private final DefaultMQProducer producer = new DefaultMQProducer("grampus-order");

    /**
     * 啟動生產者
     */
    @PostConstruct
    public void start() {
        //生產者延在消費者之後執行
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    LOGGER.info("MQ:啟動生產者");
                    /**
                     * 設定NameServer地址
                     * 此處應改為實際NameServer地址,多個地址之間用;分隔
                     * NameServer的地址必須有,不一定非得寫死在程式碼裡,這裡通過配置檔案獲取
                     */
                    producer.setNamesrvAddr(namesrvAddr);
                    /**
                     * 傳送失敗重試次數
                     */
                    producer.setRetryTimesWhenSendFailed(10);
                    /**
                     * 呼叫start()方法啟動一個producer例項
                     */
                    producer.start();
                    timer.cancel();
                } catch (MQClientException e) {
                    LOGGER.error("MQ:啟動生產者失敗:{}-{}", e.getResponseCode(), e.getErrorMessage());
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        },2000);

    }

    /**
     * 傳送訊息
     *
     * @param data  訊息內容
     * @param topic 主題
     * @param tags  標籤 如不需要消費topic下面的所有訊息,通過tag進行訊息過濾
     * @param keys  唯一主鍵
     */
    public void sendMessage(String data, String topic, String tags, String keys) {
        try {
            byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);

            Message mqMsg = new Message(topic, tags, keys, messageBody);

            producer.send(mqMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    LOGGER.info("MQ: 生產者傳送訊息 {}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    LOGGER.error(throwable.getMessage(), throwable);
                }
            });
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * 傳送訊息根據orderId到指定佇列,解決訊息的順序問題
     * @param data
     * @param topic
     * @param tags
     * @param keys
     */
    public void sendMessageByOrderId(String data, String topic, String tags, String keys) {
        try {
            if(StringUtils.isBlank(data)){
                return;
            }
            JSONObject jsonObject = JSONObject.parseObject(data);

            //獲取訂單id
            String orderId = jsonObject.getString("orderId");
            byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message mqMsg = new Message(topic, tags, keys, messageBody);
            producer.send(mqMsg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {
                    //按照訂單號傳送到固定的佇列
                    int index = arg.hashCode() % list.size();
                    return list.get(index);
                }
            },orderId,new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    LOGGER.info("MQ: 生產者傳送訊息 {}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    LOGGER.error(throwable.getMessage(), throwable);
                }
            });

        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    @PreDestroy
    public void stop() {
        if (producer != null) {
            producer.shutdown();
            LOGGER.info("MQ:關閉生產者");
        }
    }
}

 消費者:Consumer.java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mryx.grampus.order.api.IGOrderDetailService;
import com.mryx.grampus.order.api.IGOrderService;
import com.mryx.grampus.order.pojo.param.OrderQueryBean;
import com.mryx.grampus.order.pojo.result.OrderBase;
import com.mryx.grampus.order.pojo.result.OrderInfo;
import com.mryx.grampus.saas.domain.SaasOrder;
import com.mryx.grampus.saas.rpc.ISaasCooperatorService;
import com.mryx.grampus.saas.rpc.ISaasOrderDetailService;
import com.mryx.grampus.saas.rpc.ISaasOrderService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * rocketmq 消費者
 * create by zhaoyl at 2018-07-23
 */
@Service
public class Consumer{
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Resource
    private IGOrderService gOrderService;

    @Resource
    private IGOrderDetailService gOrderDetailService;

    @Resource
    private ISaasOrderService saasOrderService;

    @Resource
    private ISaasOrderDetailService saasOrderDetailService;

    @Resource
    private ISaasCooperatorService saasCooperatorService;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("grampus-order");
    /**
     * 初始化
     *
     * @throws MQClientException
     */
    @PostConstruct
    public void start() {
        try {
            LOGGER.info("MQ:啟動消費者");
            /**
             * NameServer的地址和埠,多個逗號分隔開,達到消除單點故障的目的
             */
            consumer.setNamesrvAddr(namesrvAddr);
            /**
            *  批量消費的數量
            *  1.如果consumer先啟動,producer發一條consumer消費一條
            *  2.如果consumer後啟動,mq堆積資料,consumer每次消費設定的數量
            */
            consumer.setConsumeMessageBatchMaxSize(5);
            /**
             * consumer的消費策略
             * CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
             * CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
             * CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            /**
             * 消費模式:
             * 1.CLUSTERING:叢集,預設
             *   同一個Group裡每個consumer只消費訂閱訊息的一部分內容,也就是同一groupName,所有消費的內容加起來才是訂閱topic內容的整體,達到負載均衡的目的
             * 2.BROADCASTING:廣播模式
             *   同一個Group裡每個consumer都能消費到所訂閱topic的全部訊息,也就是一個訊息會被分發多次,被多個consumer消費
             *   廣播訊息只發送一次,沒有重試
             */
           consumer.setMessageModel(MessageModel.CLUSTERING);
            /**
             * 設定consumer所訂閱的Topic和Tag,*代表全部的Tag
             */
            consumer.subscribe(TopicEnum.ORDER.getValue(), "*");
            // 註冊訊息監聽器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * 消費訊息
                 * @param msgs
                 * @param context
                 * @return
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    int index = 0;
                    try {
                        for (; index < msgs.size(); index++) {
                            MessageExt msg = msgs.get(index);
                            String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            LOGGER.info("收到訊息:" + messageBody);
                            /*threadPool.execute(new Runnable() {
                                @Override
                                public void run() {
                                    if(TopicEnum.ORDER.getValue().equals(msg.getTopic())){
                                        //訂單訊息處理
                                        handelOrderMsg(messageBody);
                                    }
                                }
                            });*/
                        }
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage(), e);
                        /**
                         * 重試機制(consumer),僅限於CLUSTERING模式
                         * 1.exception的情況,一般重複16次 10s、30s、1分鐘、2分鐘、3分鐘等等
                         *   獲取重試次數:msgs.get(0).getReconsumeTimes()
                         * 2.超時的情況,這種情況MQ會無限制的傳送給消費端
                         *   就是由於網路的情況,MQ傳送資料之後,Consumer端並沒有收到導致超時。也就是消費端沒有給我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;這樣的就認為沒有到達Consumer端
                         */
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } finally {
                        if (index < msgs.size()) {
                            context.setAckIndex(index + 1);
                        }
                    }
                    /**
                     * 返回消費狀態:
                     * CONSUME_SUCCESS 消費成功
                     * RECONSUME_LATER 消費失敗,需要稍後重新消費
                     */
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 啟動消費端
            consumer.start();
        } catch (MQClientException e) {
            LOGGER.error("MQ:啟動消費者失敗:{}-{}", e.getResponseCode(), e.getErrorMessage());
            throw new RuntimeException(e.getMessage(), e);
        }

    }

    @PreDestroy
    public void stop() {
        if (consumer != null) {
            consumer.shutdown();
            LOGGER.error("MQ:關閉消費者");
        }
    }