1. 程式人生 > >使用rabbitmq訊息中介軟體

使用rabbitmq訊息中介軟體

###rabbitmq介紹:

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業訊息系統。它可以用於大型軟體系統各個模組之間的高效通訊,支援高併發,支援可擴充套件。

###amqp介紹:

即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。

##思路

基於spring boot的特性連線rabbitmq,並作出如下樣例:

  • 配置

  • 釋出方樣例

  • 消費方樣例

##實現

###1.配置

引入maven依賴

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>
spring-boot-starter-amqp</artifactId> </dependency>

編寫config配置類(預設情況下是不用做任何配置的,這裡有配置是因為,它預設是用的二進位制做的訊息傳輸,這裡的配置是改為json傳輸)

@Configuration
public class RabbitMqConfig {

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryPlus(
      SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
      Jackson2JsonMessageConverter jackson2JsonMessageConverter)
{ rabbitListenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter); return rabbitListenerContainerFactory; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper xssObjectMapper) { return new Jackson2JsonMessageConverter(xssObjectMapper); } }

編寫配置檔案

spring.rabbitmq.host=192.168.134.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=dev_udf-sample
spring.rabbitmq.password=1qazxsw2
spring.rabbitmq.virtual-host=/dev_udf-sample
spring.rabbitmq.template.retry.enabled=true #傳送方是否重試
spring.rabbitmq.listener.retry.enabled=true #消費方是否重試

定義公共的訊息類

public classRabbitmqMessage<T> implementsSerializable{
  private static final long serialVersionUID = 1L;
  //訊息ID
  private String id;
  ....其他自定義
}

###2.釋出方樣例

建立Exchange,這裡使用的是DirectExchange,exchange主要是定義路由規則的,還有其他不同的路由規則實現,如:TopicExchange,他們都繼承至AbstractExchange

  @Bean
  public DirectExchange testExchange(){
    return new DirectExchange("test_exchange");
  }

使用AmqpTemplate傳送非同步訊息(RoutingKey則是指定訊息的路由鍵,不同的路由鍵可被不同的消費方消費)

  @Autowired
  private AmqpTemplate amqpTemplate;
  
  //然後呼叫傳送方法傳送訊息
  this.amqpTemplate.convertAndSend("test_exchange", "testRoutingKey", new RabbitmqMessage<String>("test"));

###3.消費方樣例

建立消費佇列,死信佇列,以及與exchange的繫結關係

  //消費佇列
  @Bean
  public Queue testConsume(){
    //死信exchange與上面的定義方式一樣
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange","test_exchange_dlx");
    args.put("x-dead-letter-routing-key","testRoutingKey_dlx");
    return new Queue("test_consume", true, false, false, args);
  }
  
  //死信消費佇列
  @Bean
  public Queue testConsumeDlx(){
    return new Queue("test_consume_dlx");
  }
  
  //消費佇列繫結
  @Bean
  public Binding testConsumeBinding(){
    return new Binding("test_consume", DestinationType.QUEUE,
        "test_exchange","testRoutingKey", null);
  }
  
  //死信消費佇列繫結
  @Bean
  public Binding testConsumeDlxBinding(){
    return new Binding("test_consume_dlx", DestinationType.QUEUE,
        "test_exchange_dlx","testRoutingKey_dlx", null);
  }

消費訊息

  @RabbitListener(queues = "test_consume")
  publicvoidprocess(Message<String> message){
    log.info(message);
  }

##結束