1. 程式人生 > >譯: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 釋出和訂閱

譯: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 釋出和訂閱

第一篇教程中,我們展示瞭如何使用start.spring.io來利用Spring Initializr建立一個具有RabbitMQ starter dependency的專案來建立spring-amqp應用程式。

在上一個教程中,我們建立了一個新的包(tut2)來放置我們的配置,傳送者和接收者,並建立了一個包含兩個使用者的工作佇列。工作佇列背後的假設是每個任務都交付給一個工作者。

在這部分中,我們將實現扇出模式,以向多個消費者傳遞訊息。此模式稱為 Publish/Subscribe “釋出/訂閱”,並通過在Tut3Config檔案中配置多個bean來實現。

基本上,已釋出的訊息將被廣播給所有接收者。

Exchanges

在本教程的前幾部分中,我們向佇列傳送訊息和從佇列接收訊息。現在是時候在Rabbit中引入完整的訊息傳遞模型了。

讓我們快速回顧一下前面教程中介紹的內容:

  • 生產者是傳送訊息的使用者的應用程式。
  • 佇列是儲存訊息的緩衝器。
  • 消費者是接收訊息的使用者的應用程式。

RabbitMQ中訊息傳遞模型的核心思想是生產者永遠不會將任何訊息直接傳送到佇列。實際上,生產者通常甚至不知道訊息是否會被傳遞到任何佇列。

相反,生產者只能向交易所傳送訊息。Exchanges交換是一件非常簡單的事情。一方面,它接收來自生產者的訊息,另一方面將它們推送到佇列。交易所必須確切知道如何處理收到的訊息。

它應該附加到特定佇列嗎?它應該附加到許多佇列嗎?或者它應該被丟棄。其規則由交換型別定義 

 

 有幾種交換型別可供選擇:

  • direct
  • topic
  • headers 
  • fanout 

我們將專注於最後一個 - fanout。讓我們配置一個bean來描述這種型別的交換,並將其命名為tut.fanout

 Tut3Config.java

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import com.xingyun.springamqp.business.Tut3Receiver; import com.xingyun.springamqp.business.Tut3Sender; @Profile({ "tut3", "pub-sub", "publish-subscribe" }) @Configuration public class Tut3Config { @Bean public FanoutExchange fanout() { return new FanoutExchange("tut.fanout"); } @Profile("receiver") private static class ReceiverConfig { @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } @Bean public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); } @Bean public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2).to(fanout); } @Bean public Tut3Receiver receiver() { return new Tut3Receiver(); } } @Profile("sender") @Bean public Tut3Sender sender() { return new Tut3Sender(); } }

 

我們遵循與前兩個教程相同的方法。我們建立了三個配置檔案,即教程(“tut3”,“pub-sub”或“publish-subscribe”)。它們都是執行fanout 配置檔案教程的同義詞。

接下來,我們將FanoutExchange配置為bean。

在“接收器”(Tut3Receiver)檔案中,我們定義“四個bean;

  •    兩個autoDeleteQueues或AnonymousQueues
  •    以及兩個繫結來將這些佇列繫結到交換機。

fanout交換非常簡單。正如您可能從名稱中猜到的那樣,它只是將收到的所有訊息廣播到它知道的所有佇列中。而這正是我們傳播資訊所需要的。

列出交換

要列出伺服器上的交換,您可以執行有用的rabbitmqctl

sudo rabbitmqctl list_exchanges

 

在此列表中將有一些amq。*交換和預設(未命名)交換。這些是預設建立的,但目前您不太可能需要使用它們。

Nameless exchange 無名交換

在本教程的前幾部分中,我們對交換一無所知,但仍能夠向佇列傳送訊息。這是可能的,因為我們使用的是預設交換,我們通過空字串(“”)來識別

回想一下我們之前是如何釋出訊息的:

 template.convertAndSend(fanout.getName(),“”,message);

第一個引數是自動裝入發件人的交換的名稱。空字串表示預設或無名交換:訊息被路由到具有routingKey指定名稱的佇列(如果存在)。

現在,我們可以釋出到我們的命名交換:

@Autowired
private RabbitTemplate template;

@Autowired
private FanoutExchange fanout;   // configured in Tut3Config above

template.convertAndSend(fanout.getName(), "", message);

從現在開始,fanout交換會將訊息附加到我們的佇列中。

臨時佇列

您可能還記得以前我們使用過具有特定名稱的佇列(記住你好)。能夠命名佇列對我們來說至關重要 - 我們需要將工作人員指向同一個佇列。

當您想要在生產者和消費者之間共享佇列時,為佇列命名很重要。但我們的粉絲示例並非如此。

我們希望瞭解所有訊息,而不僅僅是它們的一部分。我們也只對目前流動的訊息感興趣,而不是舊訊息。要解決這個問題,我們需要兩件事。

首先,每當我們連線到Rabbit時,我們都需要一個新的空佇列。為此,我們可以使用隨機名稱建立佇列,或者更好 - 讓伺服器為我們選擇隨機佇列名稱。

其次,一旦我們斷開消費者,就應該自動刪除佇列。為了使用spring-amqp客戶端,我們定義了AnonymousQueue,它建立了一個帶有生成名稱的非持久的獨佔自動刪除佇列:

@Bean
public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
    return new AnonymousQueue();
}

此時,我們的佇列名稱包含隨機佇列名稱。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg

繫結

我們已經建立了一個扇出交換和一個佇列。現在我們需要告訴交換機將訊息傳送到我們的佇列。交換和佇列之間的關係稱為繫結

在上面的Tut3Config中,您可以看到我們有兩個繫結,每個AnonymousQueue一個。

@Bean
public Binding binding1(FanoutExchange fanout,
        Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

列出繫結

您可以使用,您猜對了,列出現有繫結

rabbitmqctl list_bindings

把它們放在一起

發出訊息的生產者程式與前一個教程沒有太大的不同。

最重要的變化是我們現在想要將訊息釋出到我們的扇出交換而不是無名交換。

我們需要在傳送時提供routingKey,但是對於扇出交換,它的值會被忽略這裡是tut3.Sender.java程式的程式碼 

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut3Sender {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    int dots = 0;

    int count = 0;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots++ == 3) {
            dots = 1;
        }
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(Integer.toString(++count));
        String message = builder.toString();
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

如您所見,我們利用Tut3Config檔案中的bean以及RabbitTemplate中的自動裝配以及我們配置的FanoutExchange這一步是必要的,因為禁止釋出到不存在的交換。

如果沒有佇列繫結到交換機,訊息將會丟失,但這對我們沒有問題; 如果沒有消費者在聽,我們可以安全地丟棄該訊息。

消費者

Tut3Receiver.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut3Receiver {
    
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")

    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    public void receive(String in, int receiver) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + receiver + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

 檢視用法

java -jar RabbitMQ_0x03_SpringAMQP_PublishSubscribe_Sample-0.0.1-SNAPSHOT.jar

 

這次和之前有所不同,這次消費者和生產者必須同時執行才得行。

消費者和生產者等待時間都是60秒

啟動消費者

java -jar RabbitMQ_0x03_SpringAMQP_PublishSubscribe_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=pub-sub,receiver

 顯示效果如下:

啟動生產者

java -jar RabbitMQ_0x03_SpringAMQP_PublishSubscribe_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=pub-sub,sender

 顯示效果如下: