RabbitMQ基礎教程之基本使用篇
RabbitMQ基礎教程之基本使用篇
最近因為工作原因使用到RabbitMQ,之前也接觸過其他的mq消息中間件,從實際使用感覺來看,卻不太一樣,正好趁著周末,可以好好看一下RabbitMQ的相關知識點;希望可以通過一些學習,可以搞清楚以下幾點
- 基礎環境搭建
- 可以怎麽使用
- 實現原理是怎樣的
- 實際工程中的使用(比如結合SpringBoot可以怎麽玩)
相關博文,歡迎查看:
- 《RabbitMq基礎教程之安裝與測試》
- 《RabbitMq基礎教程之基本概念》
I. 前提準備
在開始之前,先得搭建基本的環境,因為個人主要是mac進行的開發,所有寫了一篇mac上如何安裝rabbitmq的教程,可以通過 《mac下安裝和測試rabbitmq》 查看
1. Centos安裝過程
下面簡單說一下Linux系統下,可以如何安裝
Centos 系統:
# 安裝erlang
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
yum install erlang
# 安裝RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
啟動和查看的命令
# 完成後啟動服務:
service rabbitmq-server start
# 可以查看服務狀態:
service rabbitmq-server status
2. 註意
- 安裝完畢之後,可以開啟控制臺,主要就是
rabbitmq-plugins enable rabbitmq_management
, 默認的端口號為15672 - 默認分配的用戶/密碼為: guest/guest, 只允許本地訪問;如果跨應用讀寫數據時,請添加賬號和設置對應的權限(推薦參考上面mac安裝的博文,裏面有介紹)
II. 基本使用篇
直接使用amqp-client客戶端做基本的數據讀寫,先不考慮Spring容器的場景,我們可以怎樣進行塞數據,然後又怎樣可以從裏面獲取數據;
在實際使用之前,有必要了解一下RabbitMQ的幾個基本概念,即什麽是Queue,Exchange,Binding,關於這些基本概念,可以參考博文:
- 《RabbitMq基礎教程之基本概念》
1. 基本使用姿勢
首先是建立連接,一般需要設置服務器的IP,端口號,用戶名密碼之類的,公共代碼如下
public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
//創建連接工程,下面給出的是默認的case
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}
a. 生產者
要使用,基本的就需要一個消息投遞和一個消息消費兩方,線看消息生產者的一般寫法
public class MsgProducer {
public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
channel.close();
connection.close();
}
}
針對上面的代碼,結合RabbitMQ的基本概念進行分析
基本結構- 不管是幹啥,第一步都是獲取連接,也就是上面的Connection
- 從《RabbitMq基礎教程之基本概念》直到,生產者消費者都是借助Channel與Exchange或者Queue打交道,接下來就是通過Connection創建數據流通信道Channel
- Channel準備完畢之後,生產者就可以向其中投遞數據
- 投遞完畢之後,回收現場資源
疑問:
- 在聲明Exchange時,是否就需要選擇消息綁定策略?
- 不聲明時,默認是什麽策略?
b. 消費者
結合上面的代碼和分析,大膽的預測下消費者的流程
- 獲取連接Connection
- 創建Channel
- 將Channel與Queue進行綁定
- 創建一個Consumer,從Queue中獲取數據
- 消息消費之後,ack
下面給出一個mq推數據的消費過程
public class MsgConsumer {
public static void consumerMsg(String exchange, String queue, String routingKey)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊列到交換機
channel.queueBind(queue, exchange, routingKey);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [x] Received ‘" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
}
}
2. Direct方式
a. Producer
直接在前面的基礎上進行測試,我們定義一個新的exchange名為direct.exchange
,並且制定ExchangeType為直接路由方式 (先不管這種寫法的合理性)
public class DirectProducer {
private static final String EXCHANGE_NAME = "direct.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
DirectProducer directProducer = new DirectProducer();
String[] routingKey = new String[]{"aaa", "bbb"};
String msg = "hello >>> ";
for (int i = 0; i < 30; i++) {
directProducer.publishMsg(routingKey[i % 2], msg + i);
}
System.out.println("----over-------");
}
}
上面的代碼執行一遍之後,看控制臺會發現新增了一個Exchange
exchangeb. consumer
同樣的我們寫一下對應的消費者,一個用來消費aaa,一個消費bbb
public class DirectConsumer {
private static final String exchangeName = "direct.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb"};
String[] queueNames = new String[]{"qa", "qb"};
for (int i = 0; i < 2; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 10);
}
}
執行上面的代碼之後,就會多兩個Queue,且增加了Exchange到Queue的綁定
binding queue當上面兩個代碼配合起來使用時,就可以看到對於消費者而言,qa一直消費的是偶數,qb一直消費的是奇數,一次輸出如下:
[qa] Waiting for message. To exist press CTRL+C
[qb] Waiting for message. To exist press CTRL+C
[qa] Received ‘hello >>> 0
[qb] Received ‘hello >>> 1
[qa] Received ‘hello >>> 2
[qb] Received ‘hello >>> 3
[qa] Received ‘hello >>> 4
...
3. Fanout方式
有了上面的case之後,這個的實現和測試就比較簡單了
a. Producer
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String[] routingKey = new String[]{"aaa", "bbb"};
String msg = "hello >>> ";
for (int i = 0; i < 30; i++) {
directProducer.publishMsg(routingKey[i % 2], msg + i);
}
System.out.println("----over-------");
}
}
b. consumer
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String[] routingKey = new String[]{"aaa", "bbb"};
String msg = "hello >>> ";
for (int i = 0; i < 30; i++) {
directProducer.publishMsg(routingKey[i % 2], msg + i);
}
System.out.println("----over-------");
}
}
這個的輸出就比較有意思了,fa,fb兩個隊列都可以接收到發布的消息,而且單獨的執行一次上面的投遞數據之後,發現fa/fb兩個隊列的數據都是30條
30然後消費的結果如下
[qa] Waiting for message. To exist press CTRL+C
[qb] Waiting for message. To exist press CTRL+C
[qa] Received ‘hello >>> 0
[qb] Received ‘hello >>> 0
[qa] Received ‘hello >>> 1
[qb] Received ‘hello >>> 1
[qb] Received ‘hello >>> 2
[qa] Received ‘hello >>> 2
[qa] Received ‘hello >>> 3
[qb] Received ‘hello >>> 3
[qb] Received ‘hello >>> 4
[qa] Received ‘hello >>> 4
...
4. Topic方式
代碼和上面差不多,就不重復拷貝了,接下來卡另外幾個問題
III. 基礎進階
在上面的基礎使用中,會有幾個疑問如下:
- Exchange聲明的問題(是否必須聲明,如果不聲明會怎樣)
- Exchange聲明的幾個參數(durable, autoDelete)有啥區別
- 當沒有隊列和Exchange綁定時,直接往隊列中塞數據,好像不會有數據增加(即先塞數據,然後創建queue,建立綁定,從控制臺上看這個queue裏面也不會有數據)
- 消息消費的兩種姿勢(一個主動去拿數據,一個是rabbit推數據)對比
- ack/nack怎麽用,nack之後消息可以怎麽處理
以上內容,留待下一篇進行講解
IV. 其他
1. 相關博文
- 《RabbitMq基礎教程之安裝與測試》
- 《RabbitMq基礎教程之基本概念》
2. 一灰灰Blog: https://liuyueyi.github.io/hexblog
一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛
3. 聲明
盡信書則不如,已上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激
- 微博地址: 小灰灰Blog
- QQ: 一灰灰/3302797840
4. 掃描關註
QrCodeRabbitMQ基礎教程之基本使用篇