rabbitmq--釋出訂閱模式
rabbitmq-----釋出訂閱模式
模型組成
一個消費者Producer,一個交換機Exchange,多個訊息佇列Queue,多個消費者Consumer
一個生產者,多個消費者,每一個消費者都有自己的一個佇列,生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機,每個佇列繫結交換機,生產者傳送的訊息經過交換機,到達佇列,實現一個訊息被多個消費者獲取的目的。需要注意的是,如果將訊息傳送到一個沒有佇列繫結的exchange上面,那麼該訊息將會丟失,這是因為在rabbitMQ中exchange不具備儲存訊息的能力,只有佇列具備儲存訊息的能力。
Exchange
相比較於前兩種模型Hello World和Work,這裡多一個一個Exchange。其實Exchange是RabbitMQ的標配組成部件之一,前兩種沒有提到Exchange是為了簡化模型,即使模型中沒有看到Exchange的宣告,其實還是聲明瞭一個預設的Exchange。
RabbitMQ中實際傳送訊息並不是直接將訊息傳送給訊息佇列,訊息佇列也沒那麼聰明知道這條訊息從哪來要到哪去。RabbitMQ會先將訊息傳送個Exchange,Exchange會根據這條訊息打上的標記知道該條訊息從哪來到哪去。
Exchange憑什麼知道訊息的何去何從,因為Exchange有幾種型別:direct,fanout,topic和headers。這裡說的訂閱者模式就可以認為是fanout模式了。
RabbitMQ
中,所有生產者提交的訊息都由Exchange
來接受,然後Exchange
按照特定的策略轉發到Queue
進行儲存 RabbitMQ
提供了四種Exchange
:fanout,direct,topic,header .釋出/訂閱模式就是是基於
fanout Exchange
實現的。
fanout
這種模式不需要指定佇列名稱,需要將Exchange
和queue
繫結,他們之間的關係是‘多對多’的關係
任何傳送到fanout Exchange
的訊息都會被轉發到與該Exchange
繫結的queue
上面。
訂閱者模式有何不同
訂閱者模式相對前面的Work模式有和不同?Work也有多個消費者,但是隻有一個訊息佇列,並且一個訊息只會被某一個消費者消費。但是訂閱者模式不一樣,它有多個訊息佇列,也有多個消費者,而且一條訊息可以被多個消費者消費,類似廣播模式。下面通過例項程式碼看看這種模式是如何收發訊息的。
1 package com.maozw.mq.pubsub; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.rabbit.connection.Connection; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.web.bind.annotation.PathVariable; 11 import org.springframework.web.bind.annotation.RequestMapping; 12 import org.springframework.web.bind.annotation.RestController; 13 14 import java.io.IOException; 15 import java.util.concurrent.TimeoutException; 16 17 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK; 18 19 /** 20 * work 模式 21 * 兩種分發: 輪詢分發 + 公平分發 22 * 輪詢分發:消費端:自動確認訊息;boolean autoAck = true; 23 * 公平分發: 消費端:手動確認訊息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false); 24 * 25 * @author MAOZW 26 * @Description: ${todo} 27 * @date 2018/11/26 15:06 28 */ 29 @RestController 30 @RequestMapping("/publish") 31 public class PublishProducer { 32 private static final Logger LOGGER = LoggerFactory.getLogger(PublishProducer.class); 33 @Autowired 34 RabbitConfig rabbitConfig; 35 36 37 @RequestMapping("/send/{exchangeName}/{queueName}") 38 public String send(@PathVariable String exchangeName, @PathVariable String queueName) throws IOException, TimeoutException { 39 Connection connection = null; 40 Channel channel= null; 41 try { 42 ConnectionFactory connectionFactory = rabbitConfig.connectionFactory(); 43 connection = connectionFactory.createConnection(); 44 channel = connection.createChannel(false); 45 46 /** 47 * 申明交換機 48 */ 49 channel.exchangeDeclare(exchangeName,"fanout"); 50 51 /** 52 * 傳送訊息 53 * 每個消費者 傳送確認訊息之前,訊息佇列不會發送下一個訊息給消費者,一次只處理一個訊息 54 * 自動模式無需設定下面設定 55 */ 56 int prefetchCount = 1; 57 channel.basicQos(prefetchCount); 58 59 String Hello = ">>>> Hello Simple <<<<"; 60 for (int i = 0; i < 5; i++) { 61 String message = Hello + i; 62 channel.basicPublish(RabbitConfig.EXCHANGE_AAAAA, "", null, message.getBytes()); 63 LOGGER.info("生產訊息: " + message); 64 } 65 return "OK"; 66 }catch (Exception e) { 67 68 } finally { 69 connection.close(); 70 channel.close(); 71 return OK; 72 } 73 } 74 }
訂閱1
1 package com.maozw.mq.pubsub; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class SubscribeConsumer { 22 private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer.class); 23 24 public static void main(String[] args) throws IOException { 25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 26 Connection connection = connectionFactory.createConnection(); 27 Channel channel = connection.createChannel(false); 28 /** 29 * 建立佇列申明 30 */ 31 boolean durable = true; 32 channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT, durable, false, false, null); 33 /** 34 * 繫結佇列到交換機 35 */ 36 channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT, RabbitConfig.EXCHANGE_AAAAA,""); 37 38 /** 39 * 改變分發規則 40 */ 41 channel.basicQos(1); 42 DefaultConsumer consumer = new DefaultConsumer(channel) { 43 @Override 44 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 45 super.handleDelivery(consumerTag, envelope, properties, body); 46 System.out.println("[2] 介面資料 : " + new String(body, "utf-8")); 47 try { 48 Thread.sleep(300); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } finally { 52 System.out.println("[2] done!"); 53 //訊息應答:手動回執,手動確認訊息 54 channel.basicAck(envelope.getDeliveryTag(),false); 55 } 56 } 57 }; 58 //監聽佇列 59 /** 60 * autoAck 訊息應答 61 * 預設輪詢分發開啟:true :這種模式一旦rabbitmq將訊息傳送給消費者,就會從記憶體中刪除該訊息,不關心客戶端是否消費正常。 62 * 使用公平分發需要關閉autoAck:false 需要手動傳送回執 63 */ 64 boolean autoAck = false; 65 channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT,autoAck, consumer); 66 } 67 68 }
1 package com.maozw.mq.pubsub; 2 3 import com.maozw.mq.config.RabbitConfig; 4 import com.rabbitmq.client.AMQP; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import org.slf4j.Logger; 9 import org.slf4j.LoggerFactory; 10 import org.springframework.amqp.rabbit.connection.Connection; 11 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 12 13 import java.io.IOException; 14 15 /** 16 * @author MAOZW 17 * @Description: ${todo} 18 * @date 2018/11/26 15:06 19 */ 20 21 public class SubscribeConsumer2 { 22 private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeConsumer2.class); 23 24 public static void main(String[] args) throws IOException { 25 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 26 Connection connection = connectionFactory.createConnection(); 27 Channel channel = connection.createChannel(false); 28 /** 29 * 建立佇列申明 30 */ 31 boolean durable = true; 32 channel.queueDeclare(RabbitConfig.QUEUE_PUBSUB_FANOUT2, durable, false, false, null); 33 /** 34 * 繫結佇列到交換機 35 */ 36 channel.queueBind(RabbitConfig.QUEUE_PUBSUB_FANOUT2, RabbitConfig.EXCHANGE_AAAAA,""); 37 38 /** 39 * 改變分發規則 40 */ 41 channel.basicQos(1); 42 DefaultConsumer consumer = new DefaultConsumer(channel) { 43 @Override 44 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 45 super.handleDelivery(consumerTag, envelope, properties, body); 46 System.out.println("[2] 介面資料 : " + new String(body, "utf-8")); 47 try { 48 Thread.sleep(400); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } finally { 52 System.out.println("[2] done!"); 53 //訊息應答:手動回執,手動確認訊息 54 channel.basicAck(envelope.getDeliveryTag(),false); 55 } 56 } 57 }; 58 //監聽佇列 59 /** 60 * autoAck 訊息應答 61 * 預設輪詢分發開啟:true :這種模式一旦rabbitmq將訊息傳送給消費者,就會從記憶體中刪除該訊息,不關心客戶端是否消費正常。 62 * 使用公平分發需要關閉autoAck:false 需要手動傳送回執 63 */ 64 boolean autoAck = false; 65 channel.basicConsume(RabbitConfig.QUEUE_PUBSUB_FANOUT2,autoAck, consumer); 66 } 67 }