1. 程式人生 > >譯: 2. RabbitMQ Spring AMQP 之 Work Queues

譯: 2. RabbitMQ Spring AMQP 之 Work Queues

在上一篇博文中,我們寫了程式來發送和接受訊息從一個佇列中。

在這篇博文中我們將建立一個工作佇列,用於在多個工作人員之間分配耗時的任務。

Work Queues 工作佇列(又稱:任務佇列背後的主要思想是避免立即執行資源密集型任務,並且必須等待它完成。相反,我們安排任務稍後完成。我們將任務封裝 為訊息並將其傳送到佇列。在後臺執行的工作程序將彈出任務並最終執行作業。當您執行許多工作程式時,它們之間將共享任務。

這個概念在Web應用程式中特別有用,因為在短的HTTP請求視窗中無法處理複雜的任務。

 我們沒有真實的業務場景,因此接下來我們將會用Thread.sleep()方法來模擬一個耗時比較久的任務。

編寫application.properties

我們將在生成的專案中找到application.properties檔案,其中沒有任何內容。

新增application.properties 配置如下:

spring.profiles.active=usage_message
logging.level.org=ERROR
tutorial.client.duration=10000
# 當declareExchange為true時,將持久標誌設定為此值
spring.rabbitmq.durable=true
# PERSISTENT或NON_PERSISTENT確定RabbitMQ是否應該保留訊息
spring.rabbitmq.deliveryMode=PERSISTENT
# 更多屬性設定 
https://docs.spring.io/spring-amqp/reference/htmlsingle/#_common_properties

 

編寫Java配置類

剛才配置檔案中我們配置了一個

tutorial.client.duration=10000

但是這個配置欄位不存在於任何框架jar包裡,因此我們需要編寫一個類來處理這個屬性

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner; import org.springframework.context.ConfigurableApplicationContext; public class RabbitAmqpTutorialsRunner implements CommandLineRunner { @Value("${tutorial.client.duration:0}") private int duration; @Autowired private ConfigurableApplicationContext ctx; @Override public void run(String... args) throws Exception { // TODO Auto-generated method stub System.out.println("Ready ... running for " + duration + "ms"); Thread.sleep(duration); ctx.close(); } }

 

我們仍然和之前教程一樣需要一個Java配置類:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

import com.xingyun.springamqp.business.Tut2Receiver;
import com.xingyun.springamqp.business.Tut2Sender;

@Profile({"tut2", "work-queues"})
@Configuration
public class Tut2Config {

    @Bean
    public Queue hello() {
        return new Queue("hello");
    }

    @Profile("receiver")
    private static class ReceiverConfig {

        @Bean
        public Tut2Receiver receiver1() {
            return new Tut2Receiver(1);
        }

        @Bean
        public Tut2Receiver receiver2() {
            return new Tut2Receiver(2);
        }
    }

    @Profile("sender")
    @Bean
    public Tut2Sender sender() {
        return new Tut2Sender();
    }
}

 

通過上面這個配置類,我們做了四件事

  1. 首先通過 @Profile 註解,定義了 兩個配置檔案字首別名,tut2 或者 work-queues
  2. 通過@Configuration 註解來讓Spring 知道這是一個Java 配置檔案
  3. 定義了 一個佇列,名字叫做hello
  4. 另外定義了兩個配置檔案,一個叫做sender,一個叫做receiver

為什麼要有這兩個配置檔案? 因為我們待會執行生產者和消費者的時候,可以通過動態載入不同的配置檔案來啟動不同的類。

比如我們啟動生產者釋出資訊就可以呼叫這個配置:

--spring.profiles.active=tut2,sender
當我們想啟動消費者就動態呼叫這個配置
--spring.profiles.active=tut2,receiver

接下來我們需要修改下整個應用程式的啟動類:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner;

@SpringBootApplication
@EnableScheduling
public class RabbitMq0x02SpringAmqpWorkQueuesSampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMq0x02SpringAmqpWorkQueuesSampleApplication.class, args);
    }
    
    @Profile("usage_message")
    @Bean
    public CommandLineRunner usage() {
        return new CommandLineRunner() {

            @Override
            public void run(String... arg0) throws Exception {
                System.out.println("This app uses Spring Profiles to control its behavior.\n");
                System.out.println("Sample usage: java -jar "
                        + "RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar "
                        + "--spring.profiles.active=work-queues,sender");
            }
        };
    }
    
    @Profile("!usage_message")
    @Bean
    public CommandLineRunner tutorial() {
        return new RabbitAmqpTutorialsRunner();
    }
}

 

當執行這個專案的jar 檔案時會自動載入這個usage_message 配置,列印用法資訊。

我們在啟動類上新增@EnableScheduling,以便於開啟對定時任務的支援.

生產者

我們將修改傳送方以通過在RabbitTemplate上使用相同的方法釋出訊息convertAndSend,以非常人為的方式在訊息中附加一個點來提供識別其是否為更長時間執行任務的方法。該文件將此定義為“將Java物件轉換為Amqp訊息並將其傳送到具有預設路由金鑰的預設交換”。

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut2Sender {

        @Autowired
        private RabbitTemplate template;

        @Autowired
        private Queue queue;

        int dots = 0;
        int count = 0;

        @Scheduled(fixedDelay = 1000, initialDelay = 500)
        public void send() {
            StringBuilder builder = new StringBuilder("Hello");
            if (dots++ == 3) {
                dots = 1;
            }
            for (int i = 0; i < dots; i++) {
                builder.append('.');
            }

            builder.append(Integer.toString(++count));
            String message = builder.toString();
            template.convertAndSend(queue.getName(), message);
            System.out.println(" [x] Sent '" + message + "'");
        }
}

消費者

我們的接收器Tut2Receiver模擬doWork()方法中偽造任務的任意長度,其中點數轉換為工作所需的秒數。同樣,我們利用“hello”佇列上的@RabbitListener和@RabbitHandler來接收訊息。消耗該訊息的例項將新增到我們的監視器中,以顯示處理訊息的例項,訊息和時間長度。

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

@RabbitListener(queues = "hello")
public class Tut2Receiver {

    private final int instance;

    public Tut2Receiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + this.instance +
            " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + this.instance +
            " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

檢視用法

java -jar RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar

執行生產者

java -jar RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=work-queues,sender

 

執行消費者

java -jar RabbitMQ_0x02_SpringAMQP_WorkQueues_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=work-queues,receiver