1. 程式人生 > >RabbitMQ知識盤點【壹】_訊息佇列介紹及三種訊息路由模式

RabbitMQ知識盤點【壹】_訊息佇列介紹及三種訊息路由模式

最近在看訊息佇列的書籍,把一些收穫總結一下。

首先說說什麼是訊息佇列。這裡就不說那種教科書的定義了,以我的理解,訊息佇列就是通過接收和傳送訊息,使不同的應用系統連線起來。實現了業務系統的解耦,也跨越了系統編寫語言的限制。總結來說,訊息佇列在當下分散式系統中的應用場景可歸納如下:

1.非同步RPC;

2.增強效能拓展性,並行處理不同業務;

3.構建日誌告警系統,針對不同日誌級別傳送不同告警;

目前比較流行的訊息佇列框架有ActiveMQ、RabbitMQ、RocketMQ和Kafka等等。本篇先寫寫RabbitMQ,按照慣例還是推薦下面這本《RabbitMQ實戰》,本文大部分內容總結於此(但此書是基於python語言實現的,特此說明):


RabbitMQ的介紹

RabbitMQ的設計之初就是為了解決金融業系統的分發訊息問題的,這就奠定了它安全穩定的基礎。而且在當下流行的開源訊息佇列解決方案中,它也可能是唯一一個遵循AMQP規範實現的(AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制)。

先說一下RabbitMQ的大體模型吧。RabbitMQ主要由生產者、消費者、訊息、通道(channel)、交換器(exchange)和佇列(queue)組成。

生產者先通過tcp連線到RabbitMQ伺服器,然後在tcp連線中建立一條AMQP通道,通道中可以宣告交換器、佇列及它們之間的繫結關係(bind)。當通道的各種資訊設定好後,就可以生產訊息併發布到通道中了。

消費者也是同理,先連線到RabbitMQ伺服器,建立一條tcp連線。當tcp連線開啟後,消費者就可以建立一條AMQP通道(channel)。設定好通道上的各種資訊後就可以開始接收訊息了。

AMQP訊息包含兩部分,有效載荷(payload)和標籤(label)。有效載荷就是想傳輸的資料,標籤是用來決定誰將獲得訊息。標籤通常包括一個交換器的名稱和可選的主體標記,rabbit會根據標籤把訊息傳送給接收方,不過在路由的過程中則不會帶上標籤。

通道是建立在真實的tcp連線內的虛擬連線,AMQP命令都是通過通道傳送出去的。每條通道都會被指派一個唯一id(AMQP會記住)。因為建立和銷燬tcp連線開銷很大,所以有通道的存在。不同執行緒可以使用不同的通道,而且互不影響。

AMQP訊息路由有三個部分:交換器(exchange)、佇列(queue)和繫結(bind)。生產者把訊息釋出到交換器上,訊息最終到達佇列並被消費者接收;繫結決定了訊息如何從路由器路由到特定的佇列。

具體來說,繫結(Bind)是交換器(Exchange)和佇列(Queue)繫結的規則描述,可解析當交換器接收到的訊息中路由鍵(RoutingKey)這個欄位,根據這個路由鍵和當前交換器所有的繫結做匹配,如果滿足則向所繫結的佇列(Queue)傳送訊息。這樣我們就解決了我們向RabbitMQ傳送一次訊息,可以分發到不同的Queue的過程。

三種訊息路由模式

RabbitMQ共支援4種訊息路由模式,分別為direct、fanout、topic和head。其中head模式是基於訊息的head資訊進行投遞的,應用場景並不多,這裡著重介紹前面三種模式。


direct:交換器和佇列是一對一的關係,消費者在接收訊息的時候交換器和佇列的名稱必須和生產者定義的完全匹配。

參考程式碼示例

生產者:

ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		com.rabbitmq.client.Connection connection = cf.newConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare("direct_queue", true, false, false, null);
		
		for (int i = 0; i < 1000; i++) {
			channel.basicPublish("", "direct_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("test"+i).getBytes());
			System.out.println("訊息"+i+"已傳送");
		}
		
		channel.close();
		connection.close();

消費者:
ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		final Connection connection = cf.newConnection();
		final Channel channel = connection.createChannel();
		channel.queueDeclare("xinzun_queue", true, false, false, null);
		channel.basicQos(1);
		
		Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息"+ message);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                	
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("xinzun_queue", false, consumer);

上面程式碼我們使用了一個名字為空字串的交換器。這是因為RabbitMQ要求伺服器必須實現direct型別交換器,包含一個空白字串名稱的預設交換器。當宣告一個佇列時會自動繫結到預設交換器,並以佇列名稱作為路由鍵。

fanout:把訊息投遞給所有繫結在此交換器上的佇列。

使用此模式時,生產者可以在釋出訊息的時候,只在通道上宣告fanout型別的交換器而不繫結佇列給交換器,繫結的操作留給消費者去完成。

程式碼示例

生產者:

ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		com.rabbitmq.client.Connection connection = cf.newConnection();
		Channel channel = connection.createChannel();
		channel.exchangeDeclare("fanout_exchange", "fanout",false,false,null);
		
		for (int i = 0; i < 1000; i++) {
			channel.basicPublish("fanout_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("test"+i).getBytes());
			System.out.println("訊息"+i+"已傳送");
		}
		
		channel.close();
		connection.close();


消費者:

ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		final Connection connection = cf.newConnection();
		final Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		channel.queueBind(QUEUE_NAME, "fanout_exchange", "");
		channel.basicQos(1);
		
		Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到訊息"+ message);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                	
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("fanout_exchange", false, consumer);


topic:交換器和佇列可以通過模糊繫結,“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。關鍵字之間通過“.”分隔。

示例程式碼

生產者:

ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		com.rabbitmq.client.Connection connection = cf.newConnection();
		Channel channel = connection.createChannel();
		channel.exchangeDeclare("topic_exchange", "topic",false,false,null);	

		for (int i = 0; i < 1000; i++) {
			channel.basicPublish("topic_exchange", "topic.info.msg", MessageProperties.PERSISTENT_TEXT_PLAIN, ("test"+i).getBytes());
			System.out.println("訊息"+i+"已傳送");
		}
		
		channel.close();
		connection.close();

消費者:

ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("localhost");
		final Connection connection = cf.newConnection();
		final Channel channel = connection.createChannel();
		channel.exchangeDeclare("topic_exchange", "topic",false,false,null);
		channel.queueDeclare("info_queue2", true, false, false, null);
		channel.queueBind("info_queue2", "topic_exchange", "#.info.#");
		channel.basicQos(1);
		
		Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(consumerTag+"收到訊息"+ message);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                	
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume("info_queue2", false, consumer);

下一篇文章:RabbitMQ知識盤點【貳】_實現原理及RabbitMQ叢集