1. 程式人生 > >【RabbitMQ】4、三種Exchange模式——訂閱、路由、通配符模式

【RabbitMQ】4、三種Exchange模式——訂閱、路由、通配符模式

message final 支持 sim 使用 完全 自己的 print ued

前兩篇博客介紹了兩種隊列模式,這篇博客介紹訂閱、路由和通配符模式,之所以放在一起介紹,是因為這三種模式都是用了Exchange交換機,消息沒有直接發送到隊列,而是發送到了交換機,經過隊列綁定交換機到達隊列。


一、訂閱模式(Fanout Exchange):

一個生產者,多個消費者,每一個消費者都有自己的一個隊列,生產者沒有將消息直接發送到隊列,而是發送到了交換機,每個隊列綁定交換機,生產者發送的消 息經過交換機,到達隊列,實現一個消息被多個消費者獲取的目的。需要註意的是,如果將消息發送到一個沒有隊列綁定的exchange上面,那麽該消息將會 丟失,這是因為在rabbitMQ中exchange不具備存儲消息的能力,只有隊列具備存儲消息的能力。

技術分享圖片

技術分享圖片

示例代碼:

生產者:

public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//從連接中創建通道
Channel channel = connection.createChannel();
// 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "商品已經新增,id = 1000"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); channel.close(); connection.close(); } }

消費者1:

public
class Recv { private final static String QUEUE_NAME = "test_queue_fanout_1"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 前臺系統: ‘" + message + "‘"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }


消費者2的代碼和消費者1的代碼大致相同,只是隊列的名稱不一樣,這樣兩個消費者有自己的隊列,都可以接收到生產者發送的消息


但是如果生產者有新增商品,修改商品,刪除商品的消息,消費者包快前臺系統和搜索系統,要求前臺系統接收修改和刪除商品的消息,搜索系統接收新增商品、修改商品和刪除商品的消息。所以使用這種訂閱模式實現商品數據的同步並不合理。因此我們介紹下一種模式:路由模式。

二、路由模式(Direct Exchange)

這種模式添加了一個路由鍵,生產者發布消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值,這樣就可以接收到需要接收的消息。

技術分享圖片

技術分享圖片

示例代碼:

生產者:

public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息內容
String message = "刪除商品, id = 1001";
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
channel.close();
connection.close();
}
}

消費者1:接收更新和刪除消息

public class Recv {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前臺系統: ‘" + message + "‘");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

消費者2:接收insert,update,delete的消息

public class Recv2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系統: ‘" + message + "‘");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

如果生產者發布了insert消息,那麽消費者2可以收到,消費者 1收不到,如果發布了update或者delete消息,兩個消費者都可以收到。如果發布ABC消息兩個消費者都收不到,因為沒有綁定這個鍵值。這種模式 基本滿足了我們的需求,但是還不夠靈活,下面介紹另外一個模式。

三、通配符模式(Topic Exchange)

基本思想和路由模式是一樣的,只不過路由鍵支持模糊匹配,符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞

技術分享圖片

技術分享圖片

示例代碼:

生產者:

public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息內容
String message = "刪除商品,id = 1001";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
channel.close();
connection.close();
}
}


消費者1:

public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前臺系統: ‘" + message + "‘");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}



消費者2:

public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系統: ‘" + message + "‘");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}


消費者1是按需索取,並沒有使用通配符模式,而是用的完全匹配,消費者2使用通配符模式,這樣以item.開頭的消息都會全部接收。

小結:

1.與簡單模式和work模式對比,前面兩種同一個消息只能被一個消費者獲取,而今天的這三種模式,可以實現一個消息被多個消費者 獲取。

2.fanout這種模式沒有加入路由器,隊列與exchange綁定後,就會接收到所有的消息,其余兩種增加了路由鍵,並且第三種增加通配符,更加便利。

【RabbitMQ】4、三種Exchange模式——訂閱、路由、通配符模式