1. 程式人生 > >RabbitMQ從在阿里雲安裝到在spring boot中如何使用這一篇就可以了

RabbitMQ從在阿里雲安裝到在spring boot中如何使用這一篇就可以了

//某個服務的具體情況
	ps -ef | grep XXX 
//殺死程序
	kill -9 程序ID,第一個
//檢視記憶體
	free或者top
//檢視磁碟使用情況
	df -l
//尋找檔案
	find -name  xxx
//檢視埠使用情況	
	 netstat -an | grep 15672

RabbitMQ安裝:

1.更新
sudo agt-get update
2. 安裝erlang環境
yum install erlang
3. 安裝rabbitMQ環境
sudo apt-get install rabbitmq-server
4. 啟用RabbitMQWeb管理外掛

不啟用你的http://ip+15672訪問不到

rabbitmq-plugins enable rabbitmq_management
5. 常用命令
啟動、停止、重啟、狀態rabbitMq命令:

啟動:sudo rabbitmq-server start
關閉: sudo rabbitmq-server stop
重啟: sudo rabbitmq-server restart
檢視狀態:sudo rabbitmqctl status

6. 檢視啟動狀態
[email protected]:/# service rabbitmq-server status
● rabbitmq-server.service - RabbitMQ Messaging Server
   Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
   Active: active (running) since Thu 2018-11-08 20:15:25 CST; 7min ago
  Process: 16987 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
  Process: 17074 ExecStartPost=/usr/lib/rabbitmq/bin/rabbitmq-server-wait (code=exited, status=0/SUCCESS)
 Main PID: 17073 (rabbitmq-server)
    Tasks: 70
   Memory: 39.6M
      CPU: 1.538s
   CGroup: /system.slice/rabbitmq-server.service
           ├─17073 /bin/sh /usr/sbin/rabbitmq-server
           ├─17084 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server
           ├─17171 /usr/lib/erlang/erts-7.3/bin/epmd -daemon
           ├─17211 /usr/lib/erlang/erts-7.3/bin/beam -W w -A 64 -P 1048576 -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/ra
           ├─17312 inet_gethost 4
           └─17313 inet_gethost 4

Nov 08 20:15:23 yan systemd[1]: Stopped RabbitMQ Messaging Server.
Nov 08 20:15:23 yan systemd[1]: Starting RabbitMQ Messaging Server...
Nov 08 20:15:23 yan rabbitmq[17074]: Waiting for 
[email protected]
... Nov 08 20:15:23 yan rabbitmq[17074]: pid is 17084 ... Nov 08 20:15:25 yan systemd[1]: Started RabbitMQ Messaging Server. lines 1-22/22 (END)
7 新增使用者

新增admin使用者,密碼設定為admin。

sudo rabbitmqctl add_user  admin  admin

賦予許可權

sudo rabbitmqctl set_user_tags admin administrator 

賦予virtual host中所有資源的配置、寫、讀許可權以便管理其中的資源

sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'
8. 訪問:



spring boot整合rabbitMQ

1、配置pom包,主要是新增spring-boot-starter-amqp的支援

<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置檔案

配置rabbitmq的安裝地址、埠以及賬戶資訊

  1. 一定不要把埠號設定成15672,因為那個已經被佔用了,所以你新設定一個5672

  2. 同時新增到阿里雲伺服器安全組配置

spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=101.200.55.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=你的密碼

重啟,並開啟http://ip+15672就能看到一個連線資訊了

如圖有一個admin連線上了

3. 新建一個rabbitMQ的包在你的專案下

新建如下幾個檔案

//佇列配置

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }
}
//  傳送者
//  rabbitTemplate是springboot 提供的預設實現

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "這是傳送的資訊 "+"---------------------" + new Date();
        System.out.println("=============================");
        System.out.println("Sender : " + context);
        System.out.println("=============================");
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("------------------1--------------------");
        System.out.println("Receiver  : " + hello);
        System.out.println("-----------------1---------------------");
    }
}
// 新建一個controller測試

import cn.nxcoder.blog.rabbit.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

	/*
        一個生產者和一個消費者
     */
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello(   ) {
        helloSender.send();
    }
}
//用postman本地測試
http://localhost:8088/rabbitHello
//測試結果


=============================
Sender : 這是傳送的資訊 ---------------------Fri Nov 09 15:09:01 CST 2018
=============================
15:09:01.188 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
15:09:01.188 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - Result:null
------------------1--------------------
Receiver  : 這是傳送的資訊 ---------------------Fri Nov 09 15:09:01 CST 2018
-----------------1---------------------


以上最簡單的MQ完成了


B: 一個生產者,多個消費者的情況

B:1

在我門的生產者裡面新家一個sendMsg方法,該方法需要傳參

看下面的,hello不能變,或者說是必須和你的消費者引用的名稱要一樣,這裡我就沒有改

this.rabbitTemplate.convertAndSend("hello", sendMsg);
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "這是傳送的資訊 "+"---------------------" 
        + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendMsg(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("hello", sendMsg);
    }

}
B:2

新加一個消費者,但是引用的還是hello

@Component
@RabbitListener(queues = "hello")
public class HelloReceiverTwo {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }
}

再看原來的消費者1,他們的@RabbitListener(queues = “hello”)

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}
B:3

在controller裡面新加一個一對多測試

@Controller
public class RabbitController {

    @Autowired
    private HelloSender helloSender;

    /*
        一個生產者和一個消費者
     */
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello(   ) {
        helloSender.send();
    }


    /**
     * 單生產者-多消費者
     */
    @PostMapping("/oneToMany")
    @ResponseBody
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("這是第二個生產者傳送的訊息:==="+i+"====個");
        }
    }
}
B:4

postman測試

http://localhost:8088/oneToMany

B:5

測試結果:

Sender2 : 這是第二個生產者傳送的訊息:===0====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===1====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===2====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===3====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===4====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===5====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===6====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===7====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===8====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===9====個Fri Nov 09 15:33:33 CST 2018
15:33:33.224 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
15:33:33.224 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - Result:null
Receiver2  : 這是第二個生產者傳送的訊息:===1====個Fri Nov 09 15:33:33 CST 2018
Receiver  : 這是第二個生產者傳送的訊息:===0====個Fri Nov 09 15:33:33 CST 2018
Receiver2  : 這是第二個生產者傳送的訊息:===3====個Fri Nov 09 15:33:33 CST 2018
Receiver  : 這是第二個生產者傳送的訊息:===2====個Fri Nov 09 15:33:33 CST 2018
Receiver2  : 這是第二個生產者傳送的訊息:===5====個Fri Nov 09 15:33:33 CST 2018
Receiver  : 這是第二個生產者傳送的訊息:===4====個Fri Nov 09 15:33:33 CST 2018
Receiver2  : 這是第二個生產者傳送的訊息:===7====個Fri Nov 09 15:33:33 CST 2018
Receiver  : 這是第二個生產者傳送的訊息:===6====個Fri Nov 09 15:33:33 CST 2018
Receiver2  : 這是第二個生產者傳送的訊息:===9====個Fri Nov 09 15:33:33 CST 2018
Receiver  : 這是第二個生產者傳送的訊息:===8====個Fri Nov 09 15:33:33 CST 2018

以上一對多的就完成了


C:多個消費者和多個生產者
  1. 在剛才我們實現了一個生產者和2個消費者

  2. 其實多對多更簡單,只需要基於一對多把生產者copy一份就可以了,裡面的東西不要變,換個名字

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSenderTwo {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "這是傳送的資訊 "+"
        ---------------------" + new Date();
        System.out.println("生產者2_Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendMsg(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("生產者2_Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("hello", sendMsg);
    }

}

controller新加一個方法

import cn.nxcoder.blog.rabbit.HelloSender;
import cn.nxcoder.blog.rabbit.HelloSenderTwo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class RabbitController {

    @Autowired
    private HelloSender helloSender;
    @Autowired
    private HelloSenderTwo helloSenderTwo;

    /*
        一個生產者和一個消費者
     */
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello(   ) {
        helloSender.send();
    }


    /**
     * 單生產者-多消費者
     */
    @PostMapping("/oneToMany")
    @ResponseBody
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("這是第二個生產者傳送的訊息:==="+i+"====個");
        }
    }


    /**
     * 多生產者-多消費者
     */
    @PostMapping("/manyToMany")
    @ResponseBody
    public void manyToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("hellomsg:"+i+"    ");
            helloSenderTwo.sendMsg("hellomsg:"+i+"      ");
        }

    }
}
	// postman測試
http://localhost:8088/manyToMany
//測試結果
Sender2 : hellomsg:0    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:0      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:1    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:1      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:2    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:2      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:3    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:3      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:4    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:4      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:5    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:5      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:6    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:6      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:7    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:7      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:8    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:8      Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:9    Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:9      Fri Nov 09 15:49:42 CST 2018
15:49:42.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
15:49:42.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - Result:null
Receiver  : hellomsg:0    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:0      Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:1      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:1    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:2      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:2    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:3      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:3    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:4      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:4    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:5      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:5    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:6      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:6    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:7      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:7    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:8      Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:8    Fri Nov 09 15:49:42 CST 2018
Receiver  : hellomsg:9    Fri Nov 09 15:49:42 CST 2018
Receiver2  : hellomsg:9      Fri Nov 09 15:49:42 CST 2018

C: 用實體類傳送訊息佇列

大部分的情況是資料是用物件封裝的,所以我們來測試一下實體類

C.1 新建一個實體類並實現序列化介面(必須)

springboot完美的支援物件的傳送和接收,不需要格外的配置。

實體類(必須實現序列化介面):

public class RabbitTest implements Serializable {

    private String name;
    private String pass;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getPass() {
        return pass;
    }
    public void setPass(String pass) {
        this.pass = pass;
    }
}

C.2 在我們的生產者裡面新建一個方法

需要更改一下他的名字

this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);

原來的生產者變成這樣;

import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "這是傳送的資訊 "+"---------------------" + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

    public void sendMsg(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("hello", sendMsg);
    }

    public void sendEntity() {
        RabbitTest rabbitTest =new RabbitTest();
        rabbitTest.setName("琬琬");
        rabbitTest.setPass("123456987");
        this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);
    }
}
C3 新建一個消費者指定他的名字為entityQueue

必須新建一個消費者,因為一個消費者只能有一個名字,剛才我們新家的消費者名字都是hello

現在我們給他定義一個新的名字entityQueue

import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "entityQueue")
public class EntityReceiver {
    @RabbitHandler
    public void process(RabbitTest rabbitTest) {
        System.out.println("rabbitTest receive  : " + 
        rabbitTest.getName()+"/"+rabbitTest.getPass());
    }
}

注意:這樣繼續下去時會報錯的

因為你一旦新定義一個名字,就必須往config檔案中新增這個名字

現在我們的配置類多了一個entityQueue

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }

    @Bean
    public Queue entityQueue() {
        return new Queue("entityQueue");
    }
}

C4 controller測試

  /**
     * 實體類傳輸測試
     */
    @PostMapping("/entityTest")
    @ResponseBody
    public void userTest() {
        helloSender.sendEntity();
    }

C5 postman測試


//測試結果
rabbitTest receive  : 琬琬/123456987


topic ExChange 示例

  1. topic 是RabbitMQ中最靈活的一種方式,可以根據binding_key自由的繫結不同的佇列

  2. 首先對topic規則配置,這裡使用兩個佇列來測試

  3. (也就是在Application類中建立和繫結的topic.message1和topic.message2兩個佇列)

    其中topic.message的bindting_key為

    “topic.message1”,topic.message2的binding_key為“topic.#”;

1. D 現在我們的config檔案中新增如下記錄

    //===============以下是驗證topic Exchange的佇列==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message1");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.message2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    /**
     * 將佇列topic.messages與exchange繫結,binding_key為topic.#,模糊匹配
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

2. D 在我們的生產者裡面新增如下方法:


3. D 新增兩個消費者

@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver1  : " +msg);
    }

}
@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver2  : " +msg);
    }

}

4. D controller測試

 /**
     * topic exchange型別rabbitmq測試
     */
    @PostMapping("/topicTest")
    @ResponseBody
    public void topicTest() {
        helloSender.sendTopic();
    }
測試:
http://localhost:8088/topicTest
//結果

sender1 : I am topic.mesaage msg======
sender2 : I am topic.mesaages msg########
16:50:10.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
16:50:10.455 [http-nio-8088-exec-2] INFO  c.n.blog.handler.LogAspectHandler - Result:null
topic.messageReceiver2  : I am topic.mesaage msg======
topic.messageReceiver2  : I am topic.mesaages msg########

6、fanout ExChange示例

Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout轉發器傳送訊息,綁定了這個轉發器的所有佇列都收到這個訊息。

這裡使用三個佇列來測試(也就是在config類中建立和繫結的fanout.A、fanout.B、fanout.C)這三個佇列都和config中建立的fanoutExchange轉發器繫結。

6.1 新增config檔案

    //===============以下是驗證Fanout Exchange的佇列==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
    //===============以上是驗證Fanout Exchange的佇列==========

6.2 新增生產者方法

 public void sendFanout() {
        String msgString="fanoutSender :hello i am hzb";
        System.out.println(msgString);
        this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msgString);
    }
6.3 新增三個消費者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }

}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }

}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }

}

6.4 controller方法

/**
     * fanout exchange型別rabbitmq測試
     */
    @PostMapping("/fanoutTest")
    @ResponseBody
    public void fanoutTest() {
        helloSender.sendFanout();
    }
6.5 測試
http://localhost:8088/fanoutTest
//結果
fanoutSender :hello i am hzb

17:35:14.911 [http-nio-8088-exec-1] INFO  c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
17:35:14.911 [http-nio-8088-exec-1] INFO  
c.n.blog.handler.LogAspectHandler - Result:null

FanoutReceiverB  : fanoutSender :hello i am hzb
FanoutReceiverA  : fanoutSender :hello i am hzb
FanoutReceiverC  : fanoutSender :hello i am hzb

6.6 結果分析:

由以上結果可知:就算fanoutSender傳送訊息的時候,指定了routing_key為"abcd.ee",但是所有接收者都接受到了訊息


7、帶callback的訊息傳送

增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。該示例中沒有新建佇列和exchange,用的是第5節中的topic.messages佇列和exchange轉發器。消費者也是第5節中的topicMessagesReceiver

7.1 在application.properties中新增一些資訊
新增;
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
//現在變為:
spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=101.200.55.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=你的密碼
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
7.2 新增config檔案
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

public class RabbitConfig2 {

    @Value("${spring.rabbitmq.host}")
    private String addresses;

    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;

    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses+":"+port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        /** 如果要進行訊息回撥,則這裡必須要設定為true */
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    @Bean
    /** 因為要設定回撥類,所以應是prototype型別,如果是singleton型別,則回撥類為最後一次設定 */
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplatenew() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

}

7.3 新增生產者類:

import java.util.Date;
import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CallBackSender implements  RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplatenew;
    public void send() {

        rabbitTemplatenew.setConfirmCallback(this);
        String msg="callbackSender : i am callback sender";
        System.out.println(msg );
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("callbackSender UUID: " + correlationData.getId());
        this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // TODO Auto-generated method stub
        System.out.println("callbakck confirm: " + correlationData.getId());
    }
}

7.4 用原來topic的消費者類,這裡再貼一次

@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver1  : " +msg);
    }

}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topic.messageReceiver2  : " +msg);
    }

}

7.5 controller測試

@PostMapping("/callback")
    @ResponseBody
    public void callbak() {
        callBackSender.send();
    }
http://localhost:8088/callback
//測試結果
callbackSender : i am callback sender
callbackSender UUID: 48be7d7e-69f8-4d9c-b264-191402dec3de
 
callbakck confirm: 48be7d7e-69f8-4d9c-b264-191402dec3de
topic.messageReceiver2  : callbackSender : i am callback sender

7.6 結果分析

從上面可以看出callbackSender發出的UUID,收到了迴應,又傳回來了。


到此,rabbitMQ先分析到這裡,接下來我們會用它做些高階的功能

有問題的可以聯絡

[email protected]