1. 程式人生 > >rocketmq實戰入門

rocketmq實戰入門

1.pom檔案

<dependencies>
	<dependency>
	<groupId>com.alibaba.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>3.2.6</version>
  </dependency>

  </dependencies>


生產者 :

package com.sun.rocketmq.qs;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;


/**
 * 
 * @ClassName: rocket生產者 
 * @Description: TODO
 * @author: sunqz
 * @date: 2017-6-3 下午8:11:50
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("hwp"); //引數為組名
        producer.setNamesrvAddr("192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876");//指定nameserver地址
        
        producer.setRetryTimesWhenSendFailed(5); //訊息傳送失敗重試次數
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic 
                    "TagA",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg,1000);//該訊息1秒沒傳送成功則重試
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
消費者0:
package com.sun.rocketmq.qs;

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;


/**
 * Consumer,訂閱訊息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
    	/**
    	 * PushConsumer  Consumer 的一種,應用通常吐 Consumer 物件註冊一個 Listener 介面,一旦收到訊息,Consumer 物件立
         *   刻回撥 Listener 介面方法。	
    	 */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hw");
        consumer.setNamesrvAddr("192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876");
        /**
         *  ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET一個新的訂閱組第一次啟動從佇列的最前位置開始消費,後續再啟動接著上次消費的進度開始消費
         *  ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一個新的訂閱組第一次啟動從佇列的最後位置開始消費,後續再啟動接著上次消費的進度開始消費
         *  ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST  @Deprecated
         *  ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET   @Deprecated
         *  ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET  @Deprecated
         *  ConsumeFromWhere.CONSUME_FROM_TIMESTAMP    一個新的訂閱組第一次啟動從指定時間點開始消費,後續再啟動接著上次消費的進度開始消費,
         *                                            時間點設定參見DefaultMQPushConsumer.consumeTimestamp引數
         *  
         *  
         *  MessageModel.CLUSTERING  叢集消費,一個 Consumer Group 中的 Consumer 例項平均分攤消費訊息。例如某個 Topic 有 9 條訊息,其中一個
			                         Consumer Group 有 3 個例項(可能是 3 個迕程,戒者 3 臺機器),那舉每個例項只消費其中的 3 條訊息
         *  MessageModel.BROADCASTING  廣播
         *  
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        /**
         * 主題Tpoic:第一級訊息型別,書的標題
         * 標籤Tags:第二級訊息型別,書的目錄,可以基於Tag做訊息過濾
         */
        consumer.subscribe("TopicTest", //指定消費主題是topicTest
        		"TagA||TagB");   //tag為TagA tagB的訊息
        
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
        	
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
            	/**
            	 * List<MessageExt> msgs  這裡雖然是list 但實際上基本都只會是一條,除非訊息堆積,
            	 * 且記住一定是先啟動消費者,再啟動生產者
            	 * 否則極有可能導致訊息的重複消費
            	 * 
            	 */
            	for(MessageExt mext : msgs) {
            		try {
						System.out.println("消費了一條訊息:"+new String(mext.getBody(),"utf-8"));	
					} catch (UnsupportedEncodingException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
				//消費失敗告訴mq重新發送繼續消費  如果多次消費仍不成功可以記錄在資料庫中,可以通過mext.getReconsumeTimes()獲取消費次數
						return ConsumeConcurrentlyStatus.RECONSUME_LATER;
					} 
            	}
            	 /*
            	  * 告訴mq消費成功 
            	  */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                
               
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

消費者1:
package com.sun.rocketmq.qs;

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;


/**
 * Consumer,訂閱訊息
 */
public class Consumer1 {

    public static void main(String[] args) throws InterruptedException, MQClientException {
    	/**
    	 * PushConsumer  Consumer 的一種,應用通常吐 Consumer 物件註冊一個 Listener 介面,一旦收到訊息,Consumer 物件立
         *   刻回撥 Listener 介面方法。	
    	 */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hw");
        consumer.setNamesrvAddr("192.168.1.244:9876;192.168.1.245:9876;192.168.1.242:9876;192.168.1.243:9876");
        /**
         *  ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET一個新的訂閱組第一次啟動從佇列的最前位置開始消費,後續再啟動接著上次消費的進度開始消費
         *  ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一個新的訂閱組第一次啟動從佇列的最後位置開始消費,後續再啟動接著上次消費的進度開始消費
         *  ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST  @Deprecated
         *  ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET   @Deprecated
         *  ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET  @Deprecated
         *  ConsumeFromWhere.CONSUME_FROM_TIMESTAMP    一個新的訂閱組第一次啟動從指定時間點開始消費,後續再啟動接著上次消費的進度開始消費,
         *                                            時間點設定參見DefaultMQPushConsumer.consumeTimestamp引數
         *  
         *  
         *  MessageModel.CLUSTERING  叢集消費,一個 Consumer Group 中的 Consumer 例項平均分攤消費訊息。例如某個 Topic 有 9 條訊息,其中一個
			                         Consumer Group 有 3 個例項(可能是 3 個迕程,戒者 3 臺機器),那舉每個例項只消費其中的 3 條訊息
         *  MessageModel.BROADCASTING  廣播  被每個消費組消費
         *  
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        /**
         * 主題Tpoic:第一級訊息型別,書的標題
         * 標籤Tags:第二級訊息型別,書的目錄,可以基於Tag做訊息過濾
         */
        consumer.subscribe("TopicTest", //指定消費主題是topicTest
        		"TagA||TagB");   //tag為TagA tagB的訊息

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
            	/**
            	 * List<MessageExt> msgs  這裡雖然是list 但實際上基本都只會是一條,除非訊息堆積,
            	 * 且記住一定是先啟動消費者,再啟動生產者
            	 * 否則極有可能導致訊息的重複消費
            	 * 
            	 */
            	for(MessageExt mext : msgs) {
            		try {
						System.out.println("消費了一條訊息:"+new String(mext.getBody(),"utf-8"));	
					} catch (UnsupportedEncodingException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
		//消費失敗告訴mq重新發送繼續消費  如果多次消費仍不成功可以記錄在資料庫中,可以通過mext.getReconsumeTimes()獲取消費次數
						return ConsumeConcurrentlyStatus.RECONSUME_LATER;
					} 
            	}
            	 /*
            	  * 告訴mq消費成功 
            	  */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

先啟動兩個消費者,然後啟動生產者,生產者將產生10條訊息,下圖是列印資訊:

通過列印資訊我們可以看到有的訊息被髮送到了broker-a ,有的訊息被髮送到了broker-b 實現了自動負載均衡

在叢集消費模式下,訊息會被消費組裡的負載均衡消費

customer0 ,列印訊息:


customer1,列印: