RabbitMQ指南(二) 基本概念和開發
2.1 基本概念
下圖是RabbitMQ的基本模型,模型中包括以下部分:生產者、交換機、佇列和消費者。 生產者產生訊息,並將訊息傳送至交換機,交換機根據一定的路由規則將訊息傳送至一個或多個訊息佇列中,訊息的消費者從相應的訊息佇列中取資料,進行處理。 其中,交換機和佇列位於RabbitMQ服務端,生產者和消費者屬於RabbitMQ的客戶端。 RabbitMQ的客戶端建立與服務端的Socket長連線(Connection),並在其上建立輕量級的連線——通道(Channel),大部分的業務操作是在通道中進行的。有文章有形象的比喻:若連線是一根光纜,則通道就是光纜中的光纖。 RabbitMQ安裝部署完畢,會已經建立好一些交換機,進入Web管理介面->Exchanges頁籤,可看到下圖這些預設建立的交換機:
2.2 Java開發
2.2.1 準備工作
新建一個Maven專案,對於Java程式,需引入RabbitMQ客戶端所需的jar包。包括RabbitMQ客戶端的jar包(本文版本5.4.3),以及日誌所需的jar包(缺少該包會報錯):
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId >
<version>1.7.25</version>
</dependency>
</dependencies>
2.2.2 傳送方
訊息傳送方的程式碼如下。傳送方建立一個名為“Queue_Java”的佇列,並向其中傳送訊息“This is Java’s message”:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
// RabbitMQ服務端地址
private static final String ADDRESS = "10.176.65.172";
// RabbitMQ預設監聽埠為5672
private static final int PORT = 5672;
// 使用第一章建立的使用者mqtester進行登入
private static final String USERNAME = "mqtester";
private static final String PASSWORD = "mqtester";
// 預設交換機的名字為空字串
private static final String DEFAULT_EXCHANGE = "";
// 所要建立的佇列名稱
private static final String QUEUE_NAME = "Queue_Java";
public static void main(String[] args) throws Exception {
// 1.建立工廠類,設定服務端的地址、埠、使用者名稱和密碼
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 2.建立連線
Connection connection = factory.newConnection();
// 3.建立通道
Channel channel = connection.createChannel();
// 4.宣告佇列,該操作是冪等的,如果佇列不存在則建立佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 5.傳送訊息
byte[] message = "This is Java's message".getBytes();
channel.basicPublish(DEFAULT_EXCHANGE, QUEUE_NAME, null, message);
// 6.關閉連線
channel.close();
connection.close();
}
}
為簡潔起見,程式直接在主方法上丟擲異常,實際使用時應當以try…catch…塊處理異常。 從程式碼中可以看到,傳送訊息最後呼叫的是Channel類的basicPublish方法,該方法第一個引數為交換機的名稱,第二個引數為路由鍵,第四個引數為所要傳送的訊息(以位元組陣列形式)。前文提到,預設交換機與佇列繫結的路由鍵即為佇列名稱,故程式中以佇列名稱作為路由鍵入參,即可將訊息傳送至宣告的佇列中。 執行該段程式碼後,進入RabbitMQ的Web管理頁面->Queues頁籤,可看到出現了建立的佇列“Queue_Java”,並且該佇列中有1條訊息: 點選該佇列名稱,進入該佇列的管理介面,在“Get Messages”下拉框中,點選獲取訊息“Get Message(s)”按鍵,可以看到這條訊息就是程式傳送的訊息“This is Java’s message”:
2.2.3 接收方
接收方建立連線、通道、宣告佇列的過程與傳送方相同。RabbitMQ服務端的佇列收到訊息後,非同步將訊息推送給消費者,故消費者通過回撥方法處理接收到的訊息。接收方程式碼如下:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Reciver {
private static final String ADDRESS = "10.176.65.172";
private static final int PORT = 5672;
private static final String QUEUE_NAME = "Queue_Java";
private static final String USERNAME = "mqtester";
private static final String PASSWORD = "mqtester";
public static void main(String[] args) throws Exception {
// 1.建立工廠類,設定服務端地址、埠、使用者名稱和密碼
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 2.建立連線
Connection connection = factory.newConnection();
// 3.建立通道
Channel channel = connection.createChannel();
// 4.宣告佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 5.建立消費者,從佇列中消費訊息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Received: " + message);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
在執行basicConsume()方法後,消費者開始監聽佇列,佇列中有訊息時,將回調消費者重寫的handleDelivery()方法。執行該程式,可以看到控制檯打印出生產者傳送的“This is Java’s message”訊息。再次檢視Web管理端的佇列,可看到佇列Queue_Java中的訊息數變為0了,之前傳送的訊息已經被消費:
2.3 C#開發
2.3.1 準備工作
下載RabbitMQ客戶端C#版本,下載地址: http://www.rabbitmq.com/releases/rabbitmq-dotnet-client 版本:rabbitmq-dotnet-client-3.6.14-dotnet-4.5.zip 壓縮包中bin/RabbitMQ.Client.dll是C#開發需要的動態連結庫檔案。在Visual Studio中新建C#控制檯程式,在專案的引用中新增RabbitMQ.Client.dll。
2.3.2 傳送方
C#版本的傳送方程式碼與Java流程相同,由於C#有using關鍵字,省去了關閉連線的程式碼。如下:
using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQDemoCSharp
{
public class Sender
{
// RabbitMQ服務端地址
private const string ADDRESS = "10.176.65.227";
// RabbitMQ預設監聽埠為5672
private const int PORT = 5672;
// 使用第一章建立的使用者mqtester進行登入
private const string USERNAME = "mqtester";
private const string PASSWORD = "mqtester";
// 預設交換機的名字為空字串
private const string DEFAULT_EXCHANGE = "";
// 所要建立的佇列名稱
private const string QUEUE_NAME = "Queue_C#";
public static void Main(string[] args)
{
// 1.建立工廠類,設定服務端地址、埠、使用者名稱和密碼
var factory = new ConnectionFactory()
{
HostName = ADDRESS,
Port = PORT,
UserName = USERNAME,
Password = PASSWORD
};
// 2.建立連線
using (var connection = factory.CreateConnection())
{
// 3.建立通道
using (var channel = connection.CreateModel())
{
// 4.宣告佇列
channel.QueueDeclare(queue: QUEUE_NAME,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// 5.傳送訊息
byte[] message = Encoding.UTF8.GetBytes("This is C#'s message");
channel.BasicPublish(exchange: DEFAULT_EXCHANGE,
routingKey: QUEUE_NAME,
basicProperties: null,
body: message);
}
Console.ReadLine();
}
}
}
}
2.3.3 接收方
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQDemoCSharp
{
public class Receiver
{
private const string ADDRESS = "10.176.65.182";
private const int PORT = 5672;
private const string USERNAME = "mqtester";
private const string PASSWORD = "mqtester";
private const string QUEUE_NAME = "Queue_C#";
public static void Main(string[] args)
{
// 1.建立工廠類,設定服務端地址、埠、使用者名稱和密碼
var factory = new ConnectionFactory()
{
HostName = ADDRESS,
Port = PORT,
UserName = USERNAME,
Password = PASSWORD
};
// 2.建立連線
using (var connection = factory.CreateConnection())
{
// 3.建立通道
using (var channel = connection.CreateModel())
{
// 4.宣告佇列
channel.QueueDeclare(queue: QUEUE_NAME,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// 5.建立消費者,接收訊息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine("Received: " + message);
};
channel.BasicConsume(queue: QUEUE_NAME,
noAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
}
2.4 Python開發
2.4.1 準備工作
使用pip命令安裝pika包:
pip install pika
2.4.2 傳送方
import pika
# RabbitMQ服務端地址
ADDRESS = '10.176.65.172'
# RabbitMQ預設監聽埠為5672
PORT = 5672
# 使用第一章建立的使用者mqtester進行登入
USERNAME = 'mqtester'
PASSWORD = 'mqtester'
# 預設交換機的名字為空字串
DEFAULT_EXCHANGE = ''
# 所要建立的佇列名稱
QUEUE_NAME = 'Queue_Python'
# 1.設定服務端地址、埠、使用者名稱和密碼
credentials = pika.PlainCredentials(username=USERNAME, password=PASSWORD)
# 2.建立連線
connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(host=ADDRESS, port=PORT, credentials=credentials))
# 3.建立通道
channel = connection.channel()
# 4.宣告佇列
channel.queue_declare(queue=QUEUE_NAME, durable=False, exclusive=False, auto_delete=False, arguments=None)
# 5.傳送訊息
channel.basic_publish(exchange=DEFAULT_EXCHANGE, routing_key=QUEUE_NAME, body="This is Python's message")
# 6.關閉連線
channel.close()
connection.close()
2.4.3 接收方
import pika
# RabbitMQ服務端地址
ADDRESS = '10.176.65.172'
# RabbitMQ預設監聽埠為5672
PORT = 5672
# 使用第一章建立的使用者mqtester進行登入
USERNAME = 'mqtester'
PASSWORD = 'mqtester'
# 預設交換機的名字為空字串
DEFAULT_EXCHANGE = ''
# 所要建立的佇列名稱
QUEUE_NAME = 'Queue_Python'
# 1.設定服務端地址、埠、使用者名稱和密碼
credentials = pika.PlainCredentials(username=USERNAME, password=PASSWORD)
# 2.建立連線
connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(host=ADDRESS, port=PORT, credentials=credentials))
# 3.建立通道
channel = connection.channel()
# 4.宣告佇列
channel.queue_declare(queue=QUEUE_NAME, durable=False, exclusive=False, auto_delete=False, arguments=None)
# 5.定義回撥方法,消費訊息
def callback(ch, method, properties, body):
print('Received: ' + body.decode('utf8'))
channel.basic_consume(consumer_callback=callback, queue=QUEUE_NAME, no_ack=True)
channel.start_consuming()