7_rabbitmq訂閱模式 PublishSubscribe
阿新 • • 發佈:2019-02-01
rabbitmq訂閱模式 PublishSubscribe
更多幹貨
例子程式碼地址
模型圖
我們之前學習的都是一個訊息只能被一個消費者消費,那麼如果我想發一個訊息 能被多個消費者消費,這時候怎麼辦? 這時候我們就得用到了訊息中的釋出訂閱模型
- 在前面的教程中,我們建立了一個工作佇列,都是一個任務只交給一個消費者。
- 這次我們做 將訊息傳送給多個消費者。這種模式叫做“釋出/訂閱”。
舉列:
類似微信訂閱號 釋出文章訊息 就可以廣播給所有的接收者。(訂閱者)
那麼咱們來看一下圖,我們學過前兩種有一些不一樣,work 模式 是不是同一個佇列 多個消費者,而 ps 這種模式呢,是一個佇列對應一個消費者,pb 模式還多了一個 X(交換機 轉發器) ,這時候我們要獲取訊息 就需要佇列繫結到交換機上,交換機把訊息傳送到佇列 , 消費者才能獲取佇列的訊息
解讀:
- 1、1 個生產者,多個消費者
- 2、每一個消費者都有自己的一個佇列
- 3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機(轉發器)
- 4、每個佇列都要繫結到交換機
- 5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的
註冊完 發簡訊 發郵件
生產者
後臺註冊 ->郵件->簡訊
public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception{ // 獲取到連線以及mq通道 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 宣告exchange 交換機 轉發器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂 // 訊息內容 String message = "Hello PB"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
那麼先看一下控制檯 是不是有這個交換機
但是這個傳送的訊息到哪了呢? 訊息丟失了!!!因為交換機沒有儲存訊息的能力,在 rabbitmq 中只有佇列儲存訊息的能力.因為這時還沒有佇列,所以就會丟失;
小結:訊息傳送到了一個沒有繫結佇列的交換機時,訊息就會丟失!
那麼我們再來寫消費者
消費者 1
郵件傳送系統
public class Recv { private final static String QUEUE_NAME = "test_queue_fanout_email"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 繫結佇列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //------------下面邏輯和work模式一樣----- // 同一時刻伺服器只會發一條訊息給消費者 channel.basicQos(1); // 定義一個消費者 Consumer consumer = new DefaultConsumer(channel) { // 訊息到達 觸發這個方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done "); // 手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消費者 2
類似簡訊傳送系統
public class Recv2 { private final static String QUEUE_NAME = "test_queue_fanout_2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 繫結佇列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻伺服器只會發一條訊息給消費者 // 定義一個消費者 Consumer consumer = new DefaultConsumer(channel) { // 訊息到達 觸發這個方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[2] Recv msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done "); // 手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }