1. 程式人生 > >springboot-rabbitmq-docker

springboot-rabbitmq-docker

1.安裝rabbitmq

//下載映象,我下載的這個lastest映象沒有web功能,也就是15672埠沒有反應
docker pull rabbitmq
//啟動映象,這裡15672埠沒辦法用,使用的話下載其他映象
 docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq

2.配置springboot

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring.rabbitmq.host=192.168.134.132
spring.rabbitmq.username=user
spring.rabbitmq.password=password
spring.rabbitmq.port=5672

3.進行簡單的佇列傳送,路由傳送,topic模式傳送

(1)對指定佇列進行傳送和接收:

package com.example.demo.web;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

/**
 * @Auther: gaoyang
 * @Date: 2018/12/5 14:13
 * @Description:
 */
@RestController
@Configuration
public class MyWeb {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //註冊一個佇列
    @Bean
    public Queue queue1() {
        return new Queue("myQueue");
    }


     @GetMapping("/b")
    public Object b(){
        rabbitTemplate.convertAndSend("myQueue","測試");
        return "ok";
    }

}

(2)對指定路由傳送資料

    //第二個佇列    
    @Bean
    public Queue queue2() {
        return new Queue("test222");
    }
 
   //註冊一個交換機
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fan");
    }
 
    //將兩個佇列繫結到該路由器   
    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
    @Bean
    public Binding binding4(){
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }
    //第二個佇列名稱忽略,這樣就會向兩個佇列同時傳送資訊
    @GetMapping("/c")
    public Object c(){
        rabbitTemplate.convertAndSend("fan",null,"測試");
        return "ok";
    }

(3)topic動態交換機名稱傳送

    //註冊一個動態交換機
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("jiaohuan");
    }
    

    //這裡繫結佇列後路由名稱為topic.msg
    @Bean
    public Binding binding(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue1).to(topicExchange).with("topic.msg");
    }

    //這裡的名字使用.#,#代表0到多個字元,*代表一個字元.所以這裡會正則的匹配
    @Bean
    public Binding binding2(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue2).to(topicExchange).with("topic.#");
    }

//所以這裡傳送的話,路由名稱為topic.msg會像兩個佇列同時傳送,如果為topic.aaa其他等則只會正則的匹配一個佇列;

4.消費類

@Component
public class Consumer {

    @RabbitListener(queues = "test111")
    public void listen(List list){
        System.out.println(list.get(0));
    }
    @RabbitListener(queues = "test222")
    public void listen2(String list){
        System.out.println(list);
    }

}

 

4.ack模式

spring.rabbitmq.host=192.168.134.132
spring.rabbitmq.username=user
spring.rabbitmq.password=password
spring.rabbitmq.port=5672

# 開啟發送確認
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

配置傳送訊息的非同步結果:(該處可以做業務處理,重新發送等),此處是傳送方到mq的結果

@Component
public class MyConfirm implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("Confirm 訊息傳送失敗" + cause + correlationData.toString());
        } else {
            System.out.println("Confirm 訊息傳送成功 ");
        }
    }
}

mq到接收方的結果:如果失敗可以使用重新發送

@Component
public class MyCallBack implements RabbitTemplate.ReturnCallback {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("sender return " + message.toString()+"==="+i+"==="+s1+"==="+s2);
        rabbitTemplate.send(message);
    }
}

 

 

傳送資訊:

    @GetMapping("/d")
    public Object d() {
        rabbitTemplate.setReturnCallback(myCallBack);
        rabbitTemplate.setConfirmCallback(myConfirm);
        rabbitTemplate.convertAndSend("test333", "測試");
        return "ok";
    }

訊息接收者:

 @RabbitListener(queues = "test333")
    public void listen3(String list, Channel channel, Message message) throws IOException {
        System.out.println(list);
        try {
            int a= 100/0;
            //告訴伺服器收到這條訊息 已經被我消費了 可以在佇列刪掉 這樣以後就不會再發了 否則訊息伺服器以為這條訊息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("receiver success");
        } catch (Exception e) {
            e.printStackTrace();
            //丟棄這條訊息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            System.out.println("receiver fail");
        }
    }

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

以上第一個引數是該條資料的index標識,後面引數是是否全部處理小於當前index值得資訊為接收成功;

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);

以上第一個和第二個引數與上面同理,這裡是是否全部置於接收失敗,第三個引數為是否重新接收資料;

具體的方法含義可以引數這個---連結;