1. 程式人生 > >7_rabbitmq訂閱模式 PublishSubscribe

7_rabbitmq訂閱模式 PublishSubscribe

rabbitmq訂閱模式 PublishSubscribe

更多幹貨

例子程式碼地址

模型圖

我們之前學習的都是一個訊息只能被一個消費者消費,那麼如果我想發一個訊息 能被多個消費者消費,這時候怎麼辦? 這時候我們就得用到了訊息中的釋出訂閱模型

image

  • 在前面的教程中,我們建立了一個工作佇列,都是一個任務只交給一個消費者。
  • 這次我們做 將訊息傳送給多個消費者。這種模式叫做“釋出/訂閱”。

舉列:

類似微信訂閱號 釋出文章訊息 就可以廣播給所有的接收者。(訂閱者)

那麼咱們來看一下圖,我們學過前兩種有一些不一樣,work 模式 是不是同一個佇列 多個消費者,而 ps 這種模式呢,是一個佇列對應一個消費者,pb 模式還多了一個 X(交換機 轉發器) ,這時候我們要獲取訊息 就需要佇列繫結到交換機上,交換機把訊息傳送到佇列 , 消費者才能獲取佇列的訊息

解讀:

  • 1、1 個生產者,多個消費者
  • 2、每一個消費者都有自己的一個佇列
  • 3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機(轉發器)
  • 4、每個佇列都要繫結到交換機
  • 5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的

註冊完 發簡訊 發郵件

生產者

後臺註冊 ->郵件->簡訊

public class Send {
	 private final static String EXCHANGE_NAME = "test_exchange_fanout";
	 public static void main(String[] argv) throws Exception
{ // 獲取到連線以及mq通道 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 宣告exchange 交換機 轉發器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂 // 訊息內容 String message = "Hello PB"; channel.basicPublish(EXCHANGE_NAME, "", null
, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }

那麼先看一下控制檯 是不是有這個交換機

image

但是這個傳送的訊息到哪了呢? 訊息丟失了!!!因為交換機沒有儲存訊息的能力,在 rabbitmq 中只有佇列儲存訊息的能力.因為這時還沒有佇列,所以就會丟失;

小結:訊息傳送到了一個沒有繫結佇列的交換機時,訊息就會丟失!

那麼我們再來寫消費者

消費者 1

郵件傳送系統

public class Recv {
	private final static String QUEUE_NAME = "test_queue_fanout_email";
	private final static String EXCHANGE_NAME = "test_exchange_fanout";
	public static void main(String[] argv) throws Exception {
		// 獲取到連線以及mq通道
		Connection connection = ConnectionUtils.getConnection();
		final Channel channel = connection.createChannel();
		// 宣告佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 繫結佇列到交換機
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		//------------下面邏輯和work模式一樣-----
		// 同一時刻伺服器只會發一條訊息給消費者
		channel.basicQos(1);
		// 定義一個消費者
		Consumer consumer = new DefaultConsumer(channel) {
			// 訊息到達 觸發這個方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
			BasicProperties properties, byte[] body) throws IOException {
				String msg = new String(body, "utf-8");
				System.out.println("[1] Recv msg:" + msg);
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					System.out.println("[1] done ");
					// 手動回執
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		boolean autoAck = false;
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
	}
}

消費者 2

類似簡訊傳送系統

public class Recv2 {

	private final static String QUEUE_NAME = "test_queue_fanout_2";
	private final static String EXCHANGE_NAME = "test_exchange_fanout";
	
	public static void main(String[] argv) throws Exception {
		// 獲取到連線以及mq通道
		Connection connection = ConnectionUtils.getConnection();
		final Channel channel = connection.createChannel();
		// 宣告佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 繫結佇列到交換機
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		// 同一時刻伺服器只會發一條訊息給消費者
		// 定義一個消費者
		Consumer consumer = new DefaultConsumer(channel) {
			// 訊息到達 觸發這個方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
			BasicProperties properties, byte[] body) throws IOException {
				String msg = new String(body, "utf-8");
				System.out.println("[2] Recv msg:" + msg);
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					System.out.println("[2] done ");
					// 手動回執
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		boolean autoAck = false;
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
	}
}

測試

一個訊息 可以被多個消費者獲取 image