1. 程式人生 > >MQ的訂閱模式

MQ的訂閱模式

def mage 生產者 blog email ued don 場景 pre

一:介紹

1.模式

  技術分享圖片

2.使用場景

  一個生產者,多個消費者

  每一個消費者都有自己的隊列

  生產者沒有直接把消息發送給隊列,而是發送到了交換機

  每一個隊列都要綁定到交換機

  可以實現一個消息被多個消費者消費。

二:程序

1.生產者

 1 package com.mq.PubSubFanout;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 public class
FanoutSend { 8 private static final String EXCHANGE_NAME="text_exchange_fanout"; 9 public static void main(String[] args) throws Exception { 10 //獲取一個連接 11 Connection connection= ConnectionUtil.getConnection(); 12 //從連接中獲取一個通道 13 Channel channel=connection.createChannel();
14 //創建交換機 15 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 16 17 //消息 18 String msg="hello pubsub"; 19 20 //發送 21 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); 22 23 System.out.println("send msg:"+msg); 24 //關閉連接 25 channel.close();
26 connection.close(); 27 } 28 }

2.消費者一

 1 package com.mq.PubSubFanout;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class FanoutReceive1 {
 9     private static final String EXCHANGE_NAME="text_exchange_fanout";
10     private static final String QUENE_NAME="test_fanout_queue_email";
11     public static void main(String[] args)throws Exception{
12         //獲取一個連接
13         Connection connection = ConnectionUtil.getConnection();
14         //創建通道
15         final Channel channel = connection.createChannel();
16         //創建隊列聲明
17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
18 
19         //綁定交換機
20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
21 
22         //一次只能發送一個消息
23         channel.basicQos(1);
24 
25         //創建消費者
26         DefaultConsumer consumer=new DefaultConsumer(channel){
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 String msg=new String(body,"utf-8");
30                 System.out.println("[1]receive msg:"+msg);
31                 try {
32                     Thread.sleep(200);
33                 } catch (InterruptedException e) {
34                     e.printStackTrace();
35                 }finally {
36                     System.out.println("done");
37                     //手動應答
38                     channel.basicAck(envelope.getDeliveryTag(),false);
39                 }
40             }
41         };
42         //監聽隊列,不是自動應答
43         boolean autoAck=false;
44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
45     }
46 }

3.消費者二

 1 package com.mq.PubSubFanout;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class FanoutReceive2 {
 9     private static final String EXCHANGE_NAME="text_exchange_fanout";
10     private static final String QUENE_NAME="test_fanout_queue_ems";
11     public static void main(String[] args)throws Exception{
12         //獲取一個連接
13         Connection connection = ConnectionUtil.getConnection();
14         //創建通道
15         final Channel channel = connection.createChannel();
16         //創建隊列聲明
17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
18 
19         //綁定交換機
20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
21 
22         //一次只能發送一個消息
23         channel.basicQos(1);
24 
25         //創建消費者
26         DefaultConsumer consumer=new DefaultConsumer(channel){
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 String msg=new String(body,"utf-8");
30                 System.out.println("[2]receive msg:"+msg);
31                 try {
32                     Thread.sleep(200);
33                 } catch (InterruptedException e) {
34                     e.printStackTrace();
35                 }finally {
36                     System.out.println("done");
37                     //手動應答
38                     channel.basicAck(envelope.getDeliveryTag(),false);
39                 }
40             }
41         };
42         //監聽隊列,不是自動應答
43         boolean autoAck=false;
44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
45     }
46 }

4.效果

  send:

  技術分享圖片

  receive1:

  技術分享圖片

  receive2:

  技術分享圖片

5.運行註意點

  如果之間運行receive類,會發現報錯,因為沒有交換機。

  所以,可以先運行send類,雖然交換機不能存儲發送的消息,但是可以創建交換機。

  然後,就可以按照原來的方式。

  先啟動兩個消費者進行監聽,然後啟動生產者。

  現象:就是消費者都獲取到了生產者生產的消息。

MQ的訂閱模式