1. 程式人生 > >RabbitMQ交換器Exchange介紹與實踐

RabbitMQ交換器Exchange介紹與實踐

屬性 上傳 rri dem index 斷開 shutdown 不能 type

本章我們重點學習一下Rabbit裏面的exchange(交換器)的知識。 交換器分類 RabbitMQ的Exchange(交換器)分為四類:

direct(默認)
headers
fanout
topic
其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以我們本文也不做講解。

註意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途創建的隊列,獲取不到之前的消息。

1、direct交換器

direct為默認的交換器類型,也非常的簡單,如果路由鍵匹配的話,消息就投遞到相應的隊列,如圖:

技術分享圖片

圖片描述(最多50字)

使用代碼:channel.basicPublish("", QueueName, null, message)推送direct交換器消息到對於的隊列,空字符為默認的direct交換器,用隊列名稱當做路由鍵。

direct交換器代碼示例

發送端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接時是否刪除隊列;參數五:消息其他參數】

channel.queueDeclare(config.QueueName, false, false, false, null);
String message = String.format("當前時間:%s", new Date().getTime());
// 推送內容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其他屬性-路由的headers信息;參數四:消息主體】
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
接收端,持續接收消息:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨占模式;參數四:消費者斷開連接時是否刪除隊列;參數五:消息其他參數】
channel.queueDeclare(config.QueueName, false, false, false, null);
Consumer defaultConsumer = new DefaultConsumer(channel) {@Override
br/>@Override
byte[] body) throws IOException {
String message = new String(body, "utf-8"); // 消息正文
System.out.println(workName + "收到消息 => " + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量確認小於當前id的消息】
}
};
channel.basicConsume(config.QueueName, false, "", defaultConsumer);
接收端,獲取單條消息

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息確認
持續消息獲取使用:basic.consume;單個消息獲取使用:basic.get。

註意:不能使用for循環單個消息消費來替代持續消息消費,因為這樣性能很低;

消息的發後既忘特性

發後既往只的是接受者不知道消息的來源是誰發送的,如果想要指定消息的發送者,需要包含在發送內容裏面,這點就像我們在信件裏面註明自己的姓名一樣,只有這樣才能知道發送者是誰。

消息確認

看了上面的代碼我們可以知道,消息接收到之後必須使用channel.basicAck()方法手動確認(非自動確認刪除模式下),那麽問題來了。

消息收到未確認會怎麽樣?

如果應用程序接收了消息,因為bug忘記確認接收的話,消息在隊列的狀態會從“Ready”變為“Unacked”,如圖:

技術分享圖片

圖片描述(最多50字)

如果消息收到卻未確認,Rabbit將不會再給這個應用程序發送更多的消息了,這是因為Rabbit認為你沒有準備好接收下一條消息。

此條消息會一直保持Unacked的狀態,直到你確認了消息,或者斷開與Rabbit的連接,Rabbit會自動把消息改完Ready狀態,分發給其他訂閱者。

當然你可以利用這一點,讓你的程序延遲確認該消息,直到你的程序處理完相應的業務邏輯,這樣可以有效的防治Rabbit給你過多的消息,導致程序崩潰。

消息確認Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
channel.basicAck(long deliveryTag, boolean multiple)為消息確認,參數1:消息的id;參數2:是否批量應答,true批量確認小於次id的消息。

總結:消費者消費的每條消息都必須確認。

消息拒絕

消息在確認之前,可以有兩個選擇:

選擇1:斷開與Rabbit的連接,這樣Rabbit會重新把消息分派給另一個消費者;

選擇2:拒絕Rabbit發送的消息使用channel.basicReject(long deliveryTag, boolean requeue),參數1:消息的id;參數2:處理消息的方式,如果是true,Rabbib會重新分配這個消息給其他訂閱者,如果設置成false的話,Rabbit會把消息發送到一個特殊的“死信”隊列,用來存放被拒絕而不重新放入隊列的消息。

消息拒絕Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒絕
2、fanout交換器——發布/訂閱模式

fanout有別於direct交換器,fanout是一種發布/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到所有附加到這個交換器的隊列上。

比如用戶上傳了自己的頭像,這個時候圖片需要清除緩存,同時用戶應該得到積分獎勵,你可以把這兩個隊列綁定到圖片上傳的交換器上,這樣當有第三個、第四個上傳完圖片需要處理的需求的時候,原來的代碼可以不變,只需要添加一個訂閱消息即可,這樣發送方和消費者的代碼完全解耦,並可以輕而易舉的添加新功能了。

和direct交換器不同,我們在發送消息的時候新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器。

發送端:

final String ExchangeName = "fanoutec"; // 交換器名稱
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器
String message = "時間:" + new Date().getTime();
channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));
接收消息不同於direct,我們需要聲明fanout路由器,並使用默認的隊列綁定到fanout交換器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隊列
channel.queueBind(queueName, ExchangeName, "");
Consumer consumer = new DefaultConsumer(channel) {@Override
br/>@Override
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
}
};
channel.basicConsume(queueName, true, consumer);
fanout和direct的區別最多的在接收端,fanout需要綁定隊列到對應的交換器用於訂閱消息。

其中channel.queueDeclare().getQueue()為隨機隊列,Rabbit會隨機生成隊列名稱,一旦消費者斷開連接,該隊列會自動刪除。

註意:對於fanout交換器來說routingKey(路由鍵)是無效的,這個參數是被忽略的。

3、topic交換器——匹配訂閱模式

最後介紹的是topic交換器,topic交換器運行和fanout類似,但是可以更靈活的匹配自己想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。

假設我們現在有一個日誌系統,會把所有日誌級別的日誌發送到交換器,warning、log、error、fatal,但我們只想處理error以上的日誌,要怎麽處理?這就需要使用topic路由器了。

topic路由器的關鍵在於定義路由鍵,定義routingKey名稱不能超過255字節,使用“.”作為分隔符,例如:com.mq.rabbit.error。

消費消息的時候routingKey可以使用下面字符匹配消息:

"*"可以匹配所有內容;
"#"匹配0和多個字符;
例如發布了一個“com.mq.rabbit.error”的消息:

能匹配上的路由鍵:

cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
不能匹配上的路由鍵:

cn.mq.
.error
所以如果想要訂閱所有消息,可以使用“#”匹配。

註意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途創建的隊列,獲取不到之前的消息。

發布端:

String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器
String message = "時間:" + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));
接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隊列
String routingKey = "#.error";
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {@Override
br/>@Override
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(routingKey + "|接收消息 => " + message);
}
};
channel.basicConsume(queueName, true, consumer);
擴展部分—自定義線程池

如果需要更大的控制連接,用戶可自己設置線程池,代碼如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
其實看過源碼的同學可能知道,factory.newConnection本身默認也有線程池的機制,ConnectionFactory.class部分源碼如下:

private ExecutorService sharedExecutor;
public Connection newConnection() throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}
public void setSharedExecutor(ExecutorService executor) {
this.sharedExecutor = executor;
}
其中this.sharedExecutor就是默認的線程池,可以通過setSharedExecutor()方法設置ConnectionFactory的線程池,如果不設置則為null。

用戶如果自己設置了線程池,像本小節第一段代碼寫的那樣,那麽當連接關閉的時候,不會自動關閉用戶自定義的線程池,所以用戶必須自己手動關閉,通過調用shutdown()方法,否則可能會阻止JVM的終止。

官方的建議是只有在程序出現嚴重性能瓶頸的時候,才應該考慮使用此功能。

RabbitMQ交換器Exchange介紹與實踐