1. 程式人生 > >RocketMQ入門案例

RocketMQ入門案例

val ets max linu tco 入門 自動 line admin

  學習RocketMQ,先寫一個Demo演示一下看看效果。

一、服務端部署

  因為只是簡單的為了演示效果,服務端僅部署單Master模式 —— 一個Name Server節點,一個Broker節點。主要有一下過程。

    1. 下載RocketMQ源碼、編譯(也可以網上下載編譯好的文件),這裏使用最新的4.4.0版本,下載好之後放在Linux上通過一下命令解壓縮、編譯。
      unzip rocketmq-all-4.4.0-source-release.zip
      cd rocketmq-all-4.4.0/
      mvn -Prelease-all -DskipTests clean install –U

    2. 編譯之後到distribution/target/apache-rocketmq目錄,後續所有操作都是在該路徑下。
      cd distribution/target/apache-rocketmq

    3. 啟動Name Server,查看日誌確認啟動成功。
      nohup sh bin/mqnamesrv &
      tail 
      -f ~/logs/rocketmqlogs/namesrv.log

    4. 啟動Broker,查看日誌確認啟動成功。
      nohup sh bin/mqbroker -n localhost:9876 &
      tail -f ~/logs/rocketmqlogs/broker.log

  Name Server和Broker都成功啟動,服務器就部署完成了。更詳細的參考官方文檔手冊,裏面還包含在服務器上運行Producer、Customer示例,這裏主要是在項目中使用。

  官網手冊戳這裏:Quick Start

二、客戶端搭建:Spring Boot項目中使用

  客戶端分為消息生產者和消息消費者,這裏通過日誌打印輸出查看效果,為了看起來更清晰,我新建了兩個模塊分別作為消息生產者和消息消費者。

  1. 添加依賴,在兩個模塊的pom文件中添加以下配置。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
  2. 配置生產者模塊。
    • application.yml文件中增加用來初始化producer的相關配置,這裏只配了一部分,更詳細的配置參數可以查看官方文檔。
      # RocketMQ生產者
      rocketmq:
        producer:
          # Producer組名,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。默認DEFAULT_PRODUCER
          producerGroup: ${spring.application.name}
          # namesrv地址
          namesrvAddr: 192.168.101.213:9876
          # 客戶端限制的消息大小,超過報錯,同時服務端也會限制,需要跟服務端配合使用。默認4MB
          maxMessageSize: 4096
          # 發送消息超時時間,單位毫秒。默認10000
          sendMsgTimeout: 5000
          # 如果消息發送失敗,最大重試次數,該參數只對同步發送模式起作用。默認2
          retryTimesWhenSendFailed: 2
          # 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節。默認4096
          compressMsgBodyOverHowmuch: 4096
          # 在發送消息時,自動創建服務器不存在的topic,需要指定Key,該Key可用於配置發送消息所在topic的默認路由。
          createTopicKey: XIAO_LIU

    • 新增producer配置類,系統啟動時讀取yml文件的配置信息初始化producer。集群模式下,如果在同一個jvm中,要往多個的MQ集群發送消息,則需要創建多個的producer並設置不同的instanceName,默認不需要設置該參數。
      @Configuration
      public class ProducerConfiguration {
          private static final Logger LOGGER = LoggerFactory.getLogger(ProducerConfiguration.class);
      
          /**
           * Producer組名,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。默認DEFAULT_PRODUCER
           */
          @Value("${rocketmq.producer.producerGroup}")
          private String producerGroup;
          /**
           * namesrv地址
           */
          @Value("${rocketmq.producer.namesrvAddr}")
          private String namesrvAddr;
          /**
           * 客戶端限制的消息大小,超過報錯,同時服務端也會限制,需要跟服務端配合使用。默認4MB
           */
          @Value("${rocketmq.producer.maxMessageSize}")
          private Integer maxMessageSize;
          /**
           * 發送消息超時時間,單位毫秒。默認10000
           */
          @Value("${rocketmq.producer.sendMsgTimeout}")
          private Integer sendMsgTimeout;
          /**
           * 如果消息發送失敗,最大重試次數,該參數只對同步發送模式起作用。默認2
           */
          @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
          private Integer retryTimesWhenSendFailed;
          /**
           * 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節。默認4096
           */
          @Value("${rocketmq.producer.compressMsgBodyOverHowmuch}")
          private Integer compressMsgBodyOverHowmuch;
          /**
           * 在發送消息時,自動創建服務器不存在的topic,需要指定Key,該Key可用於配置發送消息所在topic的默認路由。
           */
          @Value("${rocketmq.producer.createTopicKey}")
          private String createTopicKey;
      
          @Bean
          public DefaultMQProducer getRocketMQProducer() {
      
              DefaultMQProducer producer = new DefaultMQProducer(this.producerGroup);
              producer.setNamesrvAddr(this.namesrvAddr);
              producer.setCreateTopicKey(this.createTopicKey);
      
              if (this.maxMessageSize != null) {
                  producer.setMaxMessageSize(this.maxMessageSize);
              }
              if (this.sendMsgTimeout != null) {
                  producer.setSendMsgTimeout(this.sendMsgTimeout);
              }
              if (this.retryTimesWhenSendFailed != null) {
                  producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
              }
              if (this.compressMsgBodyOverHowmuch != null) {
                  producer.setCompressMsgBodyOverHowmuch(this.compressMsgBodyOverHowmuch);
              }
              if (Strings.isNotBlank(this.createTopicKey)) {
                  producer.setCreateTopicKey(this.createTopicKey);
              }
      
              try {
                  producer.start();
      
                  LOGGER.info("Producer Started : producerGroup:[{}], namesrvAddr:[{}]"
                          , this.producerGroup, this.namesrvAddr);
              } catch (MQClientException e) {
                  LOGGER.error("Producer Start Failed : {}", e.getMessage(), e);
              }
              return producer;
          }
      
      }

    • 使用producer實例向MQ發送消息。
      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class ProducerServiceApplicationTests {
          private static final Logger LOGGER = LoggerFactory.getLogger(ProducerServiceApplicationTests.class);
          @Autowired
          private DefaultMQProducer defaultMQProducer;
      
          @Test
          public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
              for (int i = 0; i < 100; i++) {
                  User user = new User();
                  user.setUsername("用戶" + i);
                  user.setPassword("密碼" + i);
                  user.setSex(i % 2);
                  user.setBirthday(new Date());
                  Message message = new Message("user-topic", "user-tag", JSON.toJSONString(user).getBytes(RemotingHelper.DEFAULT_CHARSET));
                  SendResult sendResult = defaultMQProducer.send(message);
                  LOGGER.info(sendResult.toString());
              }
          }
      }

  3. 配置消費者模塊。
    • application.yml文件中增加用來初始化consumer的相關配置,同樣參數這裏只配了一部分,更詳細的配置參數可以查看官方文檔。
      # RocketMQ消費者
      rocketmq:
        consumer:
          # Consumer組名,多個Consumer如果屬於一個應用,訂閱同樣的消息,且消費邏輯一致,則應該將它們歸為同一組。默認DEFAULT_CONSUMER
          consumerGroup: ${spring.application.name}
          # namesrv地址
          namesrvAddr: 192.168.101.213:9876
          # 消費線程池最大線程數。默認10
          consumeThreadMin: 10
          # 消費線程池最大線程數。默認20
          consumeThreadMax: 20
          # 批量消費,一次消費多少條消息。默認1
          consumeMessageBatchMaxSize: 1
          # 批量拉消息,一次最多拉多少條。默認32
          pullBatchSize: 32
          # 訂閱的主題
          topics: user-topic

    • 新增consumer配置。
      @Configuration
      public class ConsumerConfiguration {
          private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerConfiguration.class);
      
          @Value("${rocketmq.consumer.consumerGroup}")
          private String consumerGroup;
          @Value("${rocketmq.consumer.namesrvAddr}")
          private String namesrvAddr;
          @Value("${rocketmq.consumer.consumeThreadMin}")
          private int consumeThreadMin;
          @Value("${rocketmq.consumer.consumeThreadMax}")
          private int consumeThreadMax;
          @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
          private int consumeMessageBatchMaxSize;
          @Value("${rocketmq.consumer.pullBatchSize}")
          private int pullBatchSize;
          @Value("${rocketmq.consumer.topics}")
          private String topics;
      
          private final ConsumeMsgListener consumeMsgListener;
      
          @Autowired
          public ConsumerConfiguration(final ConsumeMsgListener consumeMsgListener) {
              this.consumeMsgListener = consumeMsgListener;
          }
      
          @Bean
          public DefaultMQPushConsumer getRocketMQConsumer() {
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
              consumer.setNamesrvAddr(namesrvAddr);
              consumer.setConsumeThreadMin(consumeThreadMin);
              consumer.setConsumeThreadMax(consumeThreadMax);
              consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
              consumer.setPullBatchSize(pullBatchSize);
              consumer.registerMessageListener(consumeMsgListener);
      
              try {
                  /**
                   * 設置消費者訂閱的主題和tag。subExpression參數為*表示訂閱該主題下所有tag,
                   * 如果需要訂閱該主題下的指定tag,subExpression設置為對應tag名稱,多個tag以||分割,例如"tag1 || tag2 || tag3"
                   */
                  consumer.subscribe(topics, "*");
                  consumer.start();
      
                  LOGGER.info("Consumer Started : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr);
              } catch (Exception e) {
                  LOGGER.error("Consumer Start Failed : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr, e);
                  e.printStackTrace();
              }
              return consumer;
          }
      }

    • 新增消息監聽器,監聽到新消息後,執行對應的業務邏輯。

      @Component
      public class ConsumeMsgListener implements MessageListenerConcurrently {
          private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeMsgListener.class);
      
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
              if (CollectionUtils.isEmpty(msgs)) {
                  LOGGER.info("Msgs is Empty.");
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
              for (MessageExt msg : msgs) {
                  try {
                      if ("user-topic".equals(msg.getTopic())) {
                          LOGGER.info("{} Receive New Messages: {}", Thread.currentThread().getName(), new String(msg.getBody()));
                          // do something
                      }
                  } catch (Exception e) {
                      if (msg.getReconsumeTimes() == 3) {
                          // 超過3次不再重試
                          LOGGER.error("Msg Consume Failed.");
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      } else {
                          // 重試
                          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                      }
                  }
              }
      
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      }

三、總結

  Demo很簡單,但是裏面還有很多東西需要慢慢研究。

  代碼可以戳這裏:spring-cloud-learn

RocketMQ入門案例