1. 程式人生 > >rabbitmq--釋出訂閱模式

rabbitmq--釋出訂閱模式

rabbitmq-----釋出訂閱模式

 模型組成

一個消費者Producer,一個交換機Exchange,多個訊息佇列Queue,多個消費者Consumer

一個生產者,多個消費者,每一個消費者都有自己的一個佇列,生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機,每個佇列繫結交換機,生產者傳送的訊息經過交換機,到達佇列,實現一個訊息被多個消費者獲取的目的。需要注意的是,如果將訊息傳送到一個沒有佇列繫結的exchange上面,那麼該訊息將會丟失,這是因為在rabbitMQ中exchange不具備儲存訊息的能力,只有佇列具備儲存訊息的能力。

 

Exchange

相比較於前兩種模型Hello World和Work,這裡多一個一個Exchange。其實Exchange是RabbitMQ的標配組成部件之一,前兩種沒有提到Exchange是為了簡化模型,即使模型中沒有看到Exchange的宣告,其實還是聲明瞭一個預設的Exchange。

RabbitMQ中實際傳送訊息並不是直接將訊息傳送給訊息佇列,訊息佇列也沒那麼聰明知道這條訊息從哪來要到哪去。RabbitMQ會先將訊息傳送個Exchange,Exchange會根據這條訊息打上的標記知道該條訊息從哪來到哪去。

Exchange憑什麼知道訊息的何去何從,因為Exchange有幾種型別:direct,fanout,topic和headers。這裡說的訂閱者模式就可以認為是fanout模式了。

RabbitMQ中,所有生產者提交的訊息都由Exchange來接受,然後Exchange按照特定的策略轉發到Queue進行儲存 
RabbitMQ提供了四種Exchangefanout,direct,topic,header .釋出/訂閱模式就是是基於fanout Exchange實現的。

    • fanout這種模式不需要指定佇列名稱,需要將Exchangequeue繫結,他們之間的關係是‘多對多’的關係 
      任何傳送到fanout Exchange的訊息都會被轉發到與該Exchange繫結的queue上面。

訂閱者模式有何不同


訂閱者模式相對前面的Work模式有和不同?Work也有多個消費者,但是隻有一個訊息佇列,並且一個訊息只會被某一個消費者消費。但是訂閱者模式不一樣,它有多個訊息佇列,也有多個消費者,而且一條訊息可以被多個消費者消費,類似廣播模式。下面通過例項程式碼看看這種模式是如何收發訊息的。

 1 package com.maozw.mq.pubsub;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.rabbit.connection.Connection;
 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.web.bind.annotation.PathVariable;
11 import org.springframework.web.bind.annotation.RequestMapping;
12 import org.springframework.web.bind.annotation.RestController;
13 
14 import java.io.IOException;
15 import java.util.concurrent.TimeoutException;
16 
17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK;
18 
19 /**
20  * work 模式
21  * 兩種分發: 輪詢分發 + 公平分發
22  * 輪詢分發:消費端:自動確認訊息;boolean autoAck = true;
23  * 公平分發: 消費端:手動確認訊息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);
24  *
25  * @author MAOZW
26  * @Description: ${todo}
27  * @date 2018/11/26 15:06
28  */
29 @RestController
30 @RequestMapping("/publish")
31 public class PublishProducer {
32     private static final Logger LOGGER = LoggerFactory.getLogger(PublishProducer.class);
33     @Autowired
34     RabbitConfig rabbitConfig;
35 
36 
37     @RequestMapping("/send/{exchangeName}/{queueName}")
38     public String send(@PathVariable String exchangeName, @PathVariable String queueName) throws IOException, TimeoutException {
39         Connection connection = null;
40         Channel channel= null;
41         try {
42             ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();
43             connection = connectionFactory.createConnection();
44             channel = connection.createChannel(false);
45 
46             /**
47              * 申明交換機
48              */
49             channel.exchangeDeclare(exchangeName,"fanout");
50 
51             /**
52              * 傳送訊息
53              * 每個消費者 傳送確認訊息之前,訊息佇列不會發送下一個訊息給消費者,一次只處理一個訊息
54              * 自動模式無需設定下面設定
55              */
56             int prefetchCount = 1;
57             channel.basicQos(prefetchCount);
58 
59             String Hello = ">>>> Hello Simple <<<<";
60             for (int i = 0; i < 5; i++) {
61                 String message = Hello + i;
62                 channel.basicPublish(RabbitConfig.EXCHANGE_AAAAA, "", null, message.getBytes());
63                 LOGGER.info("生產訊息: " + message);
64             }
65             return "OK";
66         }catch (Exception e) {
67 
68         } finally {
69             connection.close();
70             channel.close();
71             return OK;
72         }
73     }
74 }

 

 訂閱1

 1 package com.maozw.mq.pubsub;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class SubscribeConsumer {
22     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         /**
29          * 建立佇列申明
30          */
31         boolean durable = true;
32         channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT, durable, false, false, null);
33         /**
34          * 繫結佇列到交換機
35          */
36         channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT, RabbitConfig.EXCHANGE_AAAAA,"");
37 
38         /**
39          * 改變分發規則
40          */
41         channel.basicQos(1);
42         DefaultConsumer consumer = new DefaultConsumer(channel) {
43             @Override
44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45                 super.handleDelivery(consumerTag, envelope, properties, body);
46                 System.out.println("[2] 介面資料 : " + new String(body, "utf-8"));
47                 try {
48                     Thread.sleep(300);
49                 } catch (InterruptedException e) {
50                     e.printStackTrace();
51                 } finally {
52                     System.out.println("[2] done!");
53                     //訊息應答:手動回執,手動確認訊息
54                     channel.basicAck(envelope.getDeliveryTag(),false);
55                 }
56             }
57         };
58         //監聽佇列
59         /**
60          * autoAck 訊息應答
61          *  預設輪詢分發開啟:true :這種模式一旦rabbitmq將訊息傳送給消費者,就會從記憶體中刪除該訊息,不關心客戶端是否消費正常。
62          *  使用公平分發需要關閉autoAck:false  需要手動傳送回執
63          */
64         boolean autoAck = false;
65         channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT,autoAck, consumer);
66     }
67     
68 }
 1 package com.maozw.mq.pubsub;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class SubscribeConsumer2 {
22     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer2.class);
23 
24     public static void main(String[] args) throws IOException {
25         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
26         Connection connection = connectionFactory.createConnection();
27         Channel channel = connection.createChannel(false);
28         /**
29          * 建立佇列申明
30          */
31         boolean durable = true;
32         channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT2, durable, false, false, null);
33         /**
34          * 繫結佇列到交換機
35          */
36         channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT2, RabbitConfig.EXCHANGE_AAAAA,"");
37 
38         /**
39          * 改變分發規則
40          */
41         channel.basicQos(1);
42         DefaultConsumer consumer = new DefaultConsumer(channel) {
43             @Override
44             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
45                 super.handleDelivery(consumerTag, envelope, properties, body);
46                 System.out.println("[2] 介面資料 : " + new String(body, "utf-8"));
47                 try {
48                     Thread.sleep(400);
49                 } catch (InterruptedException e) {
50                     e.printStackTrace();
51                 } finally {
52                     System.out.println("[2] done!");
53                     //訊息應答:手動回執,手動確認訊息
54                     channel.basicAck(envelope.getDeliveryTag(),false);
55                 }
56             }
57         };
58         //監聽佇列
59         /**
60          * autoAck 訊息應答
61          *  預設輪詢分發開啟:true :這種模式一旦rabbitmq將訊息傳送給消費者,就會從記憶體中刪除該訊息,不關心客戶端是否消費正常。
62          *  使用公平分發需要關閉autoAck:false  需要手動傳送回執
63          */
64         boolean autoAck = false;
65         channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT2,autoAck, consumer);
66     }
67 }