1. 程式人生 > >詳解RocketMQ中的consumer

詳解RocketMQ中的consumer

上一篇部落格著重講解了一下RocketMQ中的Producer,那麼接下來這篇部落格來帶大家來了解一下RocketMQ中的Consumer角色

 

 上述就是MQ中有關Consumer的類圖,下面來介紹一下每個類

 1.MQAdmin:底層類,上篇部落格已經提過,就不再此重提

 2.MQConsumer:Consumer公共的介面,常用的方法如下

 如果消費失敗的話,訊息將會返回到broker中,並且延遲一會消費的時間

   void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

 3.MQPushConsumer:Consumer的一種,應用通常向Consumer物件註冊一個Listener介面,一旦收到訊息,Consumer物件立刻回撥Listener介面方法

4.MQPullConsumer:Consumer的一種,應用通常主動呼叫Consumer的拉訊息方法從Broker拉訊息,主動權由應用控制

 在上圖中出現了兩類的消費者分別是PushConsumer和PullConsumer,下面來看一下

PushConsumer:通過註冊監聽的方式來消費資訊

[java] view plain copy print?
  1. <span style="font-family:Comic Sans MS;font-size:18px;"
    >/**      
  2.  * @FileName: Consumer.java    
  3.  * @Package:com.test    
  4.  * @Description: TODO   
  5.  * @author: LUCKY     
  6.  * @date:2015年12月28日 下午2:43:23    
  7.  * @version V1.0      
  8.  */
  9. package com.test;  
  10. import java.util.List;  
  11. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  12. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  13. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  14. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  15. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  16. import com.alibaba.rocketmq.common.message.Message;  
  17. import com.alibaba.rocketmq.common.message.MessageExt;  
  18. /** 
  19.  * @ClassName: Consumer 
  20.  * @Description: 模擬消費者 
  21.  * @author: LUCKY 
  22.  * @date:2015年12月28日 下午2:43:23 
  23.  */
  24. publicclass ConsumerTest {  
  25.     publicstaticvoid main(String[] args) {  
  26.         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");  
  27.         consumer.setNamesrvAddr("100.66.154.81:9876");  
  28.         try {  
  29.             // 訂閱PushTopic下Tag為push的訊息,都訂閱訊息
  30.             consumer.subscribe("PushTopic""push");  
  31.             // 程式第一次啟動從訊息佇列頭獲取資料
  32.             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  33.             //可以修改每次消費訊息的數量,預設設定是每次消費一條
  34.             // consumer.setConsumeMessageBatchMaxSize(10);
  35.             //註冊消費的監聽
  36.             consumer.registerMessageListener(new MessageListenerConcurrently() {  
  37.                //在此監聽中消費資訊,並返回消費的狀態資訊
  38.                 public ConsumeConcurrentlyStatus consumeMessage(  
  39.                         List<MessageExt> msgs,  
  40.                         ConsumeConcurrentlyContext context) {  
  41.                     // msgs中只收集同一個topic,同一個tag,並且key相同的message
  42.                     // 會把不同的訊息分別放置到不同的佇列中
  43.                     for(Message msg:msgs){  
  44.                         System.out.println(new String(msg.getBody()));  
  45.                     }     
  46.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  47.                 }  
  48.             });  
  49.             consumer.start();  
  50.             Thread.sleep(5000);  
  51.             //5秒後掛載消費端消費
  52.             consumer.suspend();  
  53.         } catch (Exception e) {  
  54.             e.printStackTrace();  
  55.         }  
  56.     }  
  57. }  
  58. </span>  

 PullConsumer:通過拉去的方式來消費訊息

[java] view plain copy print?
  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
  2.  * @FileName: Consumer.java    
  3.  * @Package:com.test    
  4.  * @Description: TODO   
  5.  * @author: LUCKY     
  6.  * @date:2015年12月28日 下午2:43:23    
  7.  * @version V1.0      
  8.  */
  9. package com.test;  
  10. import java.util.Set;  
  11. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;  
  12. import com.alibaba.rocketmq.client.consumer.MessageQueueListener;  
  13. import com.alibaba.rocketmq.common.message.MessageQueue;  
  14. /** 
  15.  * @ClassName: Consumer 
  16.  * @Description: 模擬消費者 
  17.  * @author: LUCKY 
  18.  * @date:2015年12月28日 下午2:43:23 
  19.  */
  20. publicclass ConsumerPullTest {  
  21.     publicstaticvoid main(String[] args) {  
  22.         DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();  
  23.         consumer.setNamesrvAddr("100.66.154.81:9876");  
  24.        consumer.setConsumerGroup("broker");  
  25.         try {  
  26.             consumer.start();  
  27.         Set<MessageQueue> messageQueues=  consumer.fetchSubscribeMessageQueues("PushTopic");        
  28.         for(MessageQueue messageQueue:messageQueues){  
  29.             System.out.println(messageQueue.getTopic());  
  30.         }  
  31.         //訊息佇列的監聽
  32.         consumer.registerMessageQueueListener(""new MessageQueueListener() {  
  33.             @Override
  34.             //訊息佇列有改變,就會觸發
  35.             publicvoid messageQueueChanged(String topic, Set<MessageQueue> mqAll,  
  36.                     Set<MessageQueue> mqDivided) {  
  37.                 // TODO Auto-generated method stub
  38.             }  
  39.         });  
  40.         } catch (Exception e) {  
  41.             e.printStackTrace();  
  42.         }  
  43.     }  
  44. }  
  45. </span>  

一般在應用中都會採用push的方法來自動的消費資訊