1. 程式人生 > >RabbitMQ指南(二) 基本概念和開發

RabbitMQ指南(二) 基本概念和開發

2.1 基本概念

  下圖是RabbitMQ的基本模型,模型中包括以下部分:生產者、交換機、佇列和消費者。   生產者產生訊息,並將訊息傳送至交換機,交換機根據一定的路由規則將訊息傳送至一個或多個訊息佇列中,訊息的消費者從相應的訊息佇列中取資料,進行處理。   其中,交換機和佇列位於RabbitMQ服務端,生產者和消費者屬於RabbitMQ的客戶端。 RabbitMQ基本模型   RabbitMQ的客戶端建立與服務端的Socket長連線(Connection),並在其上建立輕量級的連線——通道(Channel),大部分的業務操作是在通道中進行的。有文章有形象的比喻:若連線是一根光纜,則通道就是光纜中的光纖。   RabbitMQ安裝部署完畢,會已經建立好一些交換機,進入Web管理介面->Exchanges頁籤,可看到下圖這些預設建立的交換機: 預設建立的交換機

  其中,預設交換機“(AMQP default)”最為特殊,它的名字是一個空字串(””),不能被刪除。所有建立的佇列都會與之連線(稱為“繫結”),且不能解綁。繫結使用的路由鍵(Routing Key)即為佇列的名稱。   本節的基本內容,即建立一個訊息的傳送方(生產者)、接收方(消費者)和與預設交換機繫結的佇列,傳送方通過預設交換機向該佇列中傳送一條訊息,接收方從該佇列中取出訊息。   RabbitMQ支援多種程式語言的客戶端,本文主要使用Java、C#和Python。在開發之前,需要做一定的準備工作。

2.2 Java開發

2.2.1 準備工作

  新建一個Maven專案,對於Java程式,需引入RabbitMQ客戶端所需的jar包。包括RabbitMQ客戶端的jar包(本文版本5.4.3),以及日誌所需的jar包(缺少該包會報錯):

	<dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.4.3</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-nop</artifactId
>
<version>1.7.25</version> </dependency> </dependencies>

2.2.2 傳送方

  訊息傳送方的程式碼如下。傳送方建立一個名為“Queue_Java”的佇列,並向其中傳送訊息“This is Java’s message”:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Sender {

	// RabbitMQ服務端地址
	private static final String ADDRESS = "10.176.65.172";
	// RabbitMQ預設監聽埠為5672
	private static final int PORT = 5672;
	// 使用第一章建立的使用者mqtester進行登入
	private static final String USERNAME = "mqtester";
	private static final String PASSWORD = "mqtester";
	// 預設交換機的名字為空字串
	private static final String DEFAULT_EXCHANGE = "";
	// 所要建立的佇列名稱
	private static final String QUEUE_NAME = "Queue_Java";


	public static void main(String[] args) throws Exception {
		// 1.建立工廠類,設定服務端的地址、埠、使用者名稱和密碼
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(ADDRESS);
		factory.setPort(PORT);
		factory.setUsername(USERNAME);
		factory.setPassword(PASSWORD);
		// 2.建立連線
		Connection connection = factory.newConnection();
		// 3.建立通道
		Channel channel = connection.createChannel();
		// 4.宣告佇列,該操作是冪等的,如果佇列不存在則建立佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		// 5.傳送訊息
		byte[] message = "This is Java's message".getBytes();
		channel.basicPublish(DEFAULT_EXCHANGE, QUEUE_NAME, null, message);
		// 6.關閉連線
		channel.close();
		connection.close();
	}
}

  為簡潔起見,程式直接在主方法上丟擲異常,實際使用時應當以try…catch…塊處理異常。   從程式碼中可以看到,傳送訊息最後呼叫的是Channel類的basicPublish方法,該方法第一個引數為交換機的名稱,第二個引數為路由鍵,第四個引數為所要傳送的訊息(以位元組陣列形式)。前文提到,預設交換機與佇列繫結的路由鍵即為佇列名稱,故程式中以佇列名稱作為路由鍵入參,即可將訊息傳送至宣告的佇列中。   執行該段程式碼後,進入RabbitMQ的Web管理頁面->Queues頁籤,可看到出現了建立的佇列“Queue_Java”,並且該佇列中有1條訊息: Java佇列建立   點選該佇列名稱,進入該佇列的管理介面,在“Get Messages”下拉框中,點選獲取訊息“Get Message(s)”按鍵,可以看到這條訊息就是程式傳送的訊息“This is Java’s message”: Java訊息傳送

2.2.3 接收方

  接收方建立連線、通道、宣告佇列的過程與傳送方相同。RabbitMQ服務端的佇列收到訊息後,非同步將訊息推送給消費者,故消費者通過回撥方法處理接收到的訊息。接收方程式碼如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class Reciver {

	private static final String ADDRESS = "10.176.65.172";
	private static final int PORT = 5672;
	private static final String QUEUE_NAME = "Queue_Java";
	private static final String USERNAME = "mqtester";
	private static final String PASSWORD = "mqtester";


	public static void main(String[] args) throws Exception {
		// 1.建立工廠類,設定服務端地址、埠、使用者名稱和密碼
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(ADDRESS);
		factory.setPort(PORT);
		factory.setUsername(USERNAME);
		factory.setPassword(PASSWORD);
		// 2.建立連線
		Connection connection = factory.newConnection();
		// 3.建立通道
		Channel channel = connection.createChannel();
		// 4.宣告佇列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		// 5.建立消費者,從佇列中消費訊息
		Consumer consumer = new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String message = new String(body);
				System.out.println("Received: " + message);
			}
		};
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
}

  在執行basicConsume()方法後,消費者開始監聽佇列,佇列中有訊息時,將回調消費者重寫的handleDelivery()方法。執行該程式,可以看到控制檯打印出生產者傳送的“This is Java’s message”訊息。再次檢視Web管理端的佇列,可看到佇列Queue_Java中的訊息數變為0了,之前傳送的訊息已經被消費: Java訊息消費

2.3 C#開發

2.3.1 準備工作

  下載RabbitMQ客戶端C#版本,下載地址:   http://www.rabbitmq.com/releases/rabbitmq-dotnet-client   版本:rabbitmq-dotnet-client-3.6.14-dotnet-4.5.zip   壓縮包中bin/RabbitMQ.Client.dll是C#開發需要的動態連結庫檔案。在Visual Studio中新建C#控制檯程式,在專案的引用中新增RabbitMQ.Client.dll。 C#新增引用

2.3.2 傳送方

  C#版本的傳送方程式碼與Java流程相同,由於C#有using關鍵字,省去了關閉連線的程式碼。如下:

using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQDemoCSharp
{
    public class Sender
    {
        // RabbitMQ服務端地址
        private const string ADDRESS = "10.176.65.227";
        // RabbitMQ預設監聽埠為5672
        private const int PORT = 5672;
        // 使用第一章建立的使用者mqtester進行登入
        private const string USERNAME = "mqtester";
        private const string PASSWORD = "mqtester";
        // 預設交換機的名字為空字串
        private const string DEFAULT_EXCHANGE = "";
        // 所要建立的佇列名稱
        private const string QUEUE_NAME = "Queue_C#";

        public static void Main(string[] args)
        {
            // 1.建立工廠類,設定服務端地址、埠、使用者名稱和密碼
            var factory = new ConnectionFactory()
            {
                HostName = ADDRESS,
                Port = PORT,
                UserName = USERNAME,
                Password = PASSWORD
            };
            // 2.建立連線
            using (var connection = factory.CreateConnection())
            {
                // 3.建立通道
                using (var channel = connection.CreateModel())
                {
                    // 4.宣告佇列
                    channel.QueueDeclare(queue: QUEUE_NAME,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
                    // 5.傳送訊息
                    byte[] message = Encoding.UTF8.GetBytes("This is C#'s message");
                    channel.BasicPublish(exchange: DEFAULT_EXCHANGE,
                                 routingKey: QUEUE_NAME,
                                 basicProperties: null,
                                 body: message);
                }
                Console.ReadLine();
            }
        }
    }
}

2.3.3 接收方

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQDemoCSharp
{
    public class Receiver
    {
        private const string ADDRESS = "10.176.65.182";
        private const int PORT = 5672;
        private const string USERNAME = "mqtester";
        private const string PASSWORD = "mqtester";
        private const string QUEUE_NAME = "Queue_C#";

        public static void Main(string[] args)
        {
            // 1.建立工廠類,設定服務端地址、埠、使用者名稱和密碼
            var factory = new ConnectionFactory()
            {
                HostName = ADDRESS,
                Port = PORT,
                UserName = USERNAME,
                Password = PASSWORD
            };
            // 2.建立連線
            using (var connection = factory.CreateConnection())
            {
                // 3.建立通道
                using (var channel = connection.CreateModel())
                {
                    // 4.宣告佇列
                    channel.QueueDeclare(queue: QUEUE_NAME,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
                    // 5.建立消費者,接收訊息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine("Received: " + message);
                    };
                    channel.BasicConsume(queue: QUEUE_NAME,
                                         noAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

2.4 Python開發

2.4.1 準備工作

  使用pip命令安裝pika包:

pip install pika

2.4.2 傳送方

import pika

# RabbitMQ服務端地址
ADDRESS = '10.176.65.172'
# RabbitMQ預設監聽埠為5672
PORT = 5672
# 使用第一章建立的使用者mqtester進行登入
USERNAME = 'mqtester'
PASSWORD = 'mqtester'
# 預設交換機的名字為空字串
DEFAULT_EXCHANGE = ''
# 所要建立的佇列名稱
QUEUE_NAME = 'Queue_Python'

# 1.設定服務端地址、埠、使用者名稱和密碼
credentials = pika.PlainCredentials(username=USERNAME, password=PASSWORD)
# 2.建立連線
connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(host=ADDRESS, port=PORT, credentials=credentials))
# 3.建立通道
channel = connection.channel()
# 4.宣告佇列
channel.queue_declare(queue=QUEUE_NAME, durable=False, exclusive=False, auto_delete=False, arguments=None)
# 5.傳送訊息
channel.basic_publish(exchange=DEFAULT_EXCHANGE, routing_key=QUEUE_NAME, body="This is Python's message")
# 6.關閉連線
channel.close()
connection.close()

2.4.3 接收方

import pika

# RabbitMQ服務端地址
ADDRESS = '10.176.65.172'
# RabbitMQ預設監聽埠為5672
PORT = 5672
# 使用第一章建立的使用者mqtester進行登入
USERNAME = 'mqtester'
PASSWORD = 'mqtester'
# 預設交換機的名字為空字串
DEFAULT_EXCHANGE = ''
# 所要建立的佇列名稱
QUEUE_NAME = 'Queue_Python'

# 1.設定服務端地址、埠、使用者名稱和密碼
credentials = pika.PlainCredentials(username=USERNAME, password=PASSWORD)
# 2.建立連線
connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(host=ADDRESS, port=PORT, credentials=credentials))
# 3.建立通道
channel = connection.channel()
# 4.宣告佇列
channel.queue_declare(queue=QUEUE_NAME, durable=False, exclusive=False, auto_delete=False, arguments=None)


# 5.定義回撥方法,消費訊息
def callback(ch, method, properties, body):
    print('Received: ' + body.decode('utf8'))


channel.basic_consume(consumer_callback=callback, queue=QUEUE_NAME, no_ack=True)
channel.start_consuming()