1. 程式人生 > >RabbitMQ入門教程(七):主題交換機Topics

RabbitMQ入門教程(七):主題交換機Topics

簡介

本節主要演示交換機的另一種型別:主題型別topic,直連線型別direct必須是生產者釋出訊息指定的routingKey和消費者在佇列繫結時指定的routingKey完全相等時才能匹配到佇列上,與direct不同,topic可以進行模糊匹配,可以使用星號*和井號#這兩個萬用字元來進行模糊匹配,其中星號可以代替一個單詞;主題型別的轉發器的訊息不能隨意的設定選擇鍵(routing_key),必須是由點隔開的一系列的識別符號組成。識別符號可以是任何東西,但是一般都與訊息的某些特性相關。一些合法的選擇鍵的例子:”quick.orange.rabbit”,你可以定義任何數量的識別符號,上限為255個位元組。 #井號可以替代零個或更多的單詞,只要能模糊匹配上就能將訊息對映到佇列中。當一個佇列的繫結鍵為#的時候,這個佇列將會無視訊息的路由鍵,接收所有的訊息

這裡寫圖片描述

生產者

public class Producer {
    @Test
    public void testBasicPublish() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday"
); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Routing 的路由規則使用直連線 String EXCHANGE_NAME = "exchange.topic.x"; String[] routingKeys = {"quick.orange.rabbit", "lazy.orange.elephant"
, "mq.erlang.rabbit", "lazy.brown.fox", "lazy."}; for (String routingKey : routingKeys){ String message = "Hello RabbitMQ - " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); } // 關閉資源 channel.close(); connection.close(); } }

消費者1

public class Consumer1 {
    @Test
    public void testBasicConsumer1() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "exchange.topic.x";
        String QUEUE_NAME = "queue.topic.q1";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String[] routingKeys = {"*.orange.*"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received '" + message + "', 處理業務中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }

}

消費者2

public class Consumer2 {
    @Test
    public void testBasicConsumer2() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "exchange.topic.x";
        String QUEUE_NAME = "queue.topic.q2";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String[] routingKeys = {"*.*.rabbit", "lazy.#"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received '" + message + "', 處理業務中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }
}

執行效果

先啟動消費者,再啟動生產者。

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

有的訊息可以滿足多個模糊的路由鍵,具體會路由到哪個佇列,關於同時滿足條件以後再研究。

下面是一張topic的圖,描述的比較準確
這裡寫圖片描述

預定義交換器

系統預定義了一個日誌交換機,名稱為“amq.rabbitmq.log”,型別為topic, 可以通過宣告一個匿名佇列,然後通過路由鍵(error warning info)等分別與佇列進行繫結,從而來消費訊息。 可以針對不同級別的訊息進行處理,如error級別的可以發郵件給相關負責人。 消費者需要自己實現,往佇列中發訊息是RabbitMQ自己傳送的,當有日誌訊息時RabbitMQ會自動發到amq.rabbitmq.log交換機中。

我的微信公眾號:

相關推薦

RabbitMQ入門教程()主題交換機Topics

簡介 本節主要演示交換機的另一種型別:主題型別topic,直連線型別direct必須是生產者釋出訊息指定的routingKey和消費者在佇列繫結時指定的routingKey完全相等時才能匹配到佇列上,與direct不同,topic可以進行模糊匹配,可以使用星號

RabbitMQ入門教程(一)安裝和常用命令

一:Mac安裝 Mac安裝比Windows安裝更加方便,也不需要再額外配置Web外掛,因為在安裝的時候預設已經配置好了 // 在Updating Homebrew...時可能會卡一會,只需要等就行了 // 在安裝的過程中可能因為網路問題,可能會有部分會失

RabbitMQ入門教程(二)簡介和基本概念

一:簡介 RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗,訊息佇列是一種應用系統之間的通訊方法,是通過讀寫出入佇列的訊息來通訊(RPC則是通

RabbitMQ入門教程(十三)虛擬主機vhost與許可權管理

虛擬主機vhost 每一個RabbitMQ伺服器都能建立虛擬訊息伺服器,我們稱之為虛擬主機。每一個vhost本質上是一個mini版的RabbitMQ伺服器,擁有自己的交換機、佇列、繫結等,擁有自己的許可權機制。vhost之於Rabbit就像虛擬機器之於物理機一

RabbitMQ入門教程(六)路由選擇Routing

簡介 本節主要演示使用直連線型別,將多個路由鍵繫結到同一個佇列上。也可以將同一個鍵繫結到多個佇列上(多重繫結multiple bindings),此時滿足鍵的佇列都能收到訊息,不滿足的直接被丟棄。

RabbitMQ入門教程(十五)普通叢集和映象叢集

普通叢集 推薦一篇優秀的文章: 映象叢集 映象叢集的特點:所有節點的訊息都會進行同步。RabbitMQ是沒有中心的。 Rabbit映象功能,需要基於rabbitmq策略來實現,政策是用來控制和修改群集範圍的某個vhost佇列行為和Exchan

RabbitMQ學習筆記交換機、佇列、訊息的持久化

一、概述     在生產過程中,難免會發生伺服器宕機的事情,RabbitMQ也不例外,可能由於某種特殊情況下的異常而導致RabbitMQ宕機從而重啟,那麼這個時候對於訊息佇列裡的資料,包括交換機、佇列以及佇列中存在訊息恢復就顯得尤為重要了。RabbitMQ本身帶有持久化機制

RabbitMQ入門教程(十四)RabbitMQ單機叢集搭建

叢集簡介 理解叢集先理解一下元資料 佇列元資料:佇列的名稱和宣告佇列時設定的屬性(是否持久化、是否自動刪除、佇列所屬的節點) 交換機元資料:交換機的名稱、型別、屬性(是否持久化等) 繫結元資料:一張簡單的表格展示瞭如何將訊息路由到佇列。包含的列有 交換機名

RabbitMQ入門教程(十一)訊息屬性Properties

簡介 傳送訊息可以為訊息指定一些引數 Delivery mode: 是否持久化,1 - Non-persistent,2 - Persistent Headers:Headers can have

RabbitMQ系列教程之一我們從最簡單的事情開始!Hello World

model 系列教程 退出 utf 忽略 是你 必須 using chan 一、簡介 RabbitMQ是一個消息的代理器,用於接收和發送消息,你可以這樣想,他就是一個郵局,當您把需要寄送的郵件投遞到郵筒之時,你可以確定的是郵遞員先生肯定會把郵件發送到需要接收郵件的

Spring Cloud 入門教程(三) 配置自動刷新

入門 stc pro 解決方案 con log clas ring color 之前講的配置管理, 只有在應用啟動時會讀取到GIT的內容, 之後只要應用不重啟,GIT中文件的修改,應用無法感知, 即使重啟Config Server也不行。 比如上一單元(Spring Clo

Spring Cloud 入門教程(四) 分布式環境下自動發現配置服務

.html article png discover ice conf label tail 註釋 前一章, 我們的Hello world應用服務,通過配置服務器Config Server獲取到了我們配置的hello信息“hello world”. 但自己的配置文件中必須配

Spring Boot系列教程Spring boot集成MyBatis

override fill sql water sso avi size logs index 一.創建項目 項目名稱為 “springboot_mybatis_demo”,創建過程中勾選 “Web”,“MyBatis”,“MySQL”,第一次創建Maven

spring cloud 入門系列基於Git存儲的分布式配置中心--Spring Cloud Config

入門 代碼結構 dev eas TP scope ict AI 新項目 我們前面接觸到的spring cloud組件都是基於Netflix的組件進行實現的,這次我們來看下spring cloud 團隊自己創建的一個全新項目:Spring Cloud Config.它用來為分

RabbitMQ入門教程

fanout color -i www 快速 單播 odi 區別 multi 1.引言 RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其為消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列

OPENCV入門教程十三GaussianBlur高斯平滑

OPENCV入門教程十三:GaussianBlur高斯平滑   一、目標 學習如何使用OpenCV中的函式,學習對影象的高斯平滑操作,學習GaussianBlur()函式的使用 二、函式說明 函式原型: void GaussianBlur(InputArray sr

protobuf入門教程(五)列舉(enum)、包(package)

列舉(enum) 訊息格式 當需要定義一個訊息型別的時候,可能想為一個欄位指定某“預定義值序列”中的一個值,這時候可以通過列舉實現。 syntax = "proto3";//指定版本資訊,不指定會報錯 message Person //message為關鍵字,作用為定義一種訊息型別 {

Spring Security教程()RememberMe功能

在之前的教程中一筆帶過式的講了下RememberMe記住密碼的功能,那篇的Remember功能是最簡易的配置,其功能和安全性都不強。這裡就配置下security中RememberMe的各種方式。 一、概述 RememberMe 是指使用者在網站上能夠在 Session 之間記住登入使用者的身份的憑證,通俗

OPENCV入門教程flip垂直水平映象

在該flip.cpp檔案中新增一下程式碼#include "cv.h"                             //  OpenCV 檔案頭#include "highgui.h"#include "cvaux.h"#include "cxcore.h"#include "opencv2/op

Spring Cloud 入門教程(五) Ribbon實現客戶端的負載均衡

接上節,假如我們的Hello world服務的訪問量劇增,用一個服務已經無法承載, 我們可以把Hello World服務做成一個叢集。  很簡單,我們只需要複製Hello world服務,同時將原來的埠8762修改為8763。然後啟動這兩個Spring Boot應用, 就可