1. 程式人生 > >RabbitMQ學習(一)

RabbitMQ學習(一)

前言:

一、    認識RabbitMQ

二、    Spring boot + RabbitMQ(初體驗)

三、    Spring boot 整合RabbitMQ實現三種訊息傳遞方式

四、    RabbitMQ高階

五、    RPC-RabbitMQ的使用

一、 認識RabbitMQ(搬運)

參考:


|簡介

        RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步訊息通訊的世界裡有很多公開標準(如 COBAR的 IIOP ,或者是SOAP 等),但是在非同步訊息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了AMQP 的公開標準。

   RabbitMQ是由RabbitMQ Technologies Ltd開發並且提供商業支援的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被併入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在並沒有上市。

|應用場景

RabbitMQ,或者說AMQP解決了什麼問題,或者說它的應用場景是什麼?

對於一個大型的軟體系統來說,它會有很多的元件或者說模組或者說子系統或者(subsystemor Component or submodule

)。那麼這些模組的如何通訊?這和傳統的IPC有很大的區別。傳統的IPC很多都是在單一系統上的,模組耦合性很大,不適合擴充套件(Scalability);如果使用socket那麼不同的模組的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:

 1)資訊的傳送者和接收者如何維持這個連線,如果一方的連線中斷,這期間的資料如何方式丟失?

 2)如何降低傳送者和接收者的耦合度?

 3)如何讓Priority高的接收者先接到資料?

 4)如何做到load balance?有效均衡接收者的負載?

 5)如何有效的將資料傳送到相關的接收者?也就是說將接收者subscribe不同的資料,如何做有效的filter。

 6)如何做到可擴充套件,甚至將這個通訊模組發到cluster上?

 7)如何保證接收者接收到了完整,正確的資料?

  AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。

|架構

系統架構圖版權屬於sunjun041640

RabbitMQ Server: 也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQisn’t afood truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對於普通的應用來說這已經足夠了。當然對於商業系統來說,可以再做一層資料一致性的guard,就可以徹底保證系統的一致性了。

Client A & B: 也叫Producer,資料的傳送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload顧名思義就是傳輸的資料。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

Client 1,2,3:也叫Consumer,資料的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它傳送給它的某個訂閱者即Consumer。當然可能會把同一個Message傳送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來說,它是不知道誰傳送的這個資訊的。就是協議本身不支援。但是當然瞭如果Producer傳送的payload包含了Producer的資訊就另當別論了。

    對於一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queuesand bindings。

Exchangesare whereproducers publish their messages.

Queues are where the messages end up and are received by consumers

Bindings are how the messages get routed from the exchange toparticular queues.

還有幾個概念是上述圖中沒有標明的,那就是Connection(連線),Channel(通道,頻道)。

Connection: 就是一個TCP的連線。Producer和Consumer都是通過TCP連線到RabbitMQ Server的。以後我們可以看到,程式的起始處就是建立這個TCP連線。

Channels: 虛擬連線。它建立在上述的TCP連線中。資料流動都是在Channel中進行的。也就是說,一般情況是程式起始建立TCP連線,第二步就是建立這個Channel。

那麼,為什麼使用Channel,而不是直接使用TCP連線?

對於OS來說,建立和關閉TCP連線是有代價的,頻繁的建立關閉TCP連線對於系統的效能有很大的影響,而且TCP的連線數也有限制,這也限制了系統處理高併發的能力。但是,在TCP連線中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以併發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的資料可以Publish10K的資料包。當然對於不同的硬體環境,不同的資料包大小這個資料肯定不一樣,但是我只想說明,對於普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。

|進一步

      |使用ack確認Message的正確傳遞

預設情況下,如果Message 已經被某個Consumer正確的接收到了,那麼該Message就會被從queue中移除。當然也可以讓同一個Message傳送到很多的Consumer。

如果一個queue沒被任何的ConsumerSubscribe(訂閱),那麼,如果這個queue有資料到達,那麼這個資料會被cache,不會被丟棄。當有Consumer時,這個資料會被立即傳送到這個Consumer,這個資料被Consumer正確收到時,這個資料就被從queue中刪除。

那麼什麼是正確收到呢?通過ack。每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程式中去ack,也可以自動的ack。如果有資料沒有被ack,那麼:

    RabbitMQ Server會把這個資訊傳送到下一個Consumer

如果這個app有bug,忘記了ack,那麼RabbitMQ Server不會再發送資料給它,因為Server認為這個Consumer處理能力有限。

而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成資料後傳送ack,甚至在額外的延時後傳送ack,將有效的balance Consumer的load。

當然對於實際的例子,比如我們可能會對某些資料進行merge,比如merge 4s內的資料,然後sleep 4s後再獲取資料。特別是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端使用者也不會感覺到。

      |Reject amessage

有兩種方式,第一種的Reject可以讓RabbitMQ Server將該Message 傳送到下一個Consumer。第二種是從queue中立即刪除該Message。

|Creating a queue

Consumer和Procuder都可以通過 queue.declare 建立queue。對於某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以建立私有的queue。這樣只有app本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最後一個Consumer unsubscribe後就會被自動刪除。那麼如果是建立一個已經存在的queue呢?那麼不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次建立如果引數和第一次不一樣,那麼該操作雖然成功,但是queue的屬性並不會被修改。

那麼誰應該負責建立這個queue呢?是Consumer,還是Producer?

如果queue不存在,當然Consumer不會得到任何的Message。但是如果queue不存在,那麼Producer Publish的Message會被丟棄。所以,還是為了資料不丟失,Consumer和Producer都tryto create the queue!反正不管怎麼樣,這個介面都不會出問題。

  queue對load balance的處理是完美的。對於多個Consumer來說,RabbitMQ 使用迴圈的方式(round-robin)的方式均衡的傳送給不同的Consumer。

      |Exchanges

從架構圖可以看出,Procuder Publish的Message進入了Exchange。接著通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue裡。queue也是通過這個routing keys來做的繫結。

有三種類型的Exchanges:direct,fanout,topic。 每個實現了不同的路由演算法(routing algorithm)。

·        Direct exchange:如果 routing key 匹配, 那麼Message就會被傳遞到相應的queue中。其實在queue建立時,它會自動的以queue的名字作為routing key來繫結那個exchange。

·        Fanout exchange:會向響應的queue廣播。

·        Topic exchange:對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。

      |Virtual hosts

每個virtual host本質上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bingsrule等等。這保證了你可以在多個不同的application中使用RabbitMQ

二、Spring boot+ RabbitMQ(初體驗)

idea

maven3.x

1、安裝rabbitMQ

windws下

下載後即可安裝,安裝過程可能需要前置安裝Erlang,根據提示安裝即可。

這裡將一些比較重要的地方進行重點描述,加深印象(以下為搬運)

rabbitmq使用者角色可分為五類:超級管理員, 監控者, 策略制定者, 普通管理者以及其他

 (1) 超級管理員(administrator)

可登陸管理控制檯(啟用management plugin的情況下),可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。

(2) 監控者(monitoring)

可登陸管理控制檯(啟用management plugin的情況下),同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁碟使用情況等) 

(3) 策略制定者(policymaker)

可登陸管理控制檯(啟用management plugin的情況下), 同時可以對policy進行管理。

(4) 普通管理者(management)

僅可登陸管理控制檯(啟用management plugin的情況下),無法看到節點資訊,也無法對策略進行管理。

(5) 其他的:

無法登陸管理控制檯,通常就是普通的生產者和消費者。

相關命令:

檢視已使用者及使用者的角色:rabbitmqctl.bat list_users

新增使用者:rabbitmqctl.bat add_user username password

修改使用者角色:rabbitmqctl.bat set_user_tags username administrator

多角色授權:rabbitmqctl.bat  set_user_tags  username tag1 tag2 ...

修改密碼:rabbitmqctl change_password userName newPassword

刪除使用者:rabbitmqctl.bat delete_user username

許可權設定:

使用者許可權指的是使用者對exchange,queue的操作許可權,包括配置許可權,讀寫許可權。我們配置許可權會影響到exchange、queue的宣告和刪除。讀寫許可權影響到從queue裡取訊息、向exchange傳送訊息以及queue和exchange的繫結(binding)操作。

許可權相關命令為:

(1) 設定使用者許可權

rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP

(2) 檢視(指定hostpath)所有使用者的許可權資訊

rabbitmqctl  list_permissions  [-p  VHostPath]

(3) 檢視指定使用者的許可權資訊

rabbitmqctl  list_user_permissions  User

(4)  清除使用者的許可權資訊

rabbitmqctl  clear_permissions  [-p VHostPath]  User

Linux(centOS7暫未測試,後續補充)

2、spring boot整合rabbitMQ

Maven依賴

配置yml檔案

參考:

測試一:

建立訊息接收者

import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
/**
 * author : jimmyLJM on 2017-11-24
 *
建立訊息接收者
 
*/
@Component
public class MyReciver {
   
/*  自動注入CountDownLatch類告訴傳送者訊息已經收到了即用來act時候使用
       
不需要在應用程式中具體實現它,只需要latch.countDown()就行了
     */
   
private CountDownLatch latch = new CountDownLatch(1);

   
public void receiveMessage(String message) {
       
System.out.println("TestRecive <" + message + ">");
       
latch.countDown();
    }
   
public CountDownLatch getLatch() {
       
return latch;
    }
}

建立訊息生產者

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 *
 * author : jimmyLJM on 2017-11-24
 */
@Component
public class MySender {
   
// 定義一個訊息佇列的名字用於測試使用
   
final static String queueName = "spring-boot";

   
// 建立Queue
   
@Bean
   
Queue queue() {
       
return new Queue(queueName, false);
    }

   
// 定義一個topic交換器
   
@Bean
   
TopicExchange exchange() {
       
return new TopicExchange("spring-boot-exchange");
    }

   
// 將訊息佇列queueexchange繫結
   
@Bean
   
Binding binding(Queue queue, TopicExchange exchange) {
       
return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

   
// 建立訊息監聽的容器
   
@Bean
   
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                            
MessageListenerAdapter listenerAdapter) {
       
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
       
container.setConnectionFactory(connectionFactory);
       
container.setQueueNames(queueName);
       
container.setMessageListener(listenerAdapter);
       
return container;
    }

   
@Bean
   
MessageListenerAdapter listenerAdapter(MyReciver receiver) {
        
return new MessageListenerAdapter(receiver, "receiveMessage");
    }

}

測試類:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 *  RabbitMQ
測試類
 
* author : jimmyLJM on 2017-11-24
 *
 *
專案服務啟動的時候就去載入一些資料或做一些事情這樣的需求
 
* 實現介面 CommandLineRunner 來實現重寫run方法
 
*/
@Component
public class TestMQ implements CommandLineRunner{

   
private final RabbitTemplate rabbitTemplate;
   
private final MyReciver receiver;
   
private final ConfigurableApplicationContext context;

   
public TestMQ(MyReciver receiver, RabbitTemplate rabbitTemplate,
                  
ConfigurableApplicationContext context) {
       
this.receiver = receiver;
       
this.rabbitTemplate = rabbitTemplate;
       
this.context = context;
    }
   
@Override
   
public void run(String... args) throws Exception {
       
System.out.println("Sending message...");
       
rabbitTemplate.convertAndSend("spring-boot", "Hello from RabbitMQ!");
       
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
       
context.close();
    }

}

啟動spring boot後報錯,錯誤提示如下:

2017-11-24 10:23:22.802  INFO 8756 --- [  restartedMain] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648

2017-11-24 10:23:22.802  INFO 8756 --- [  restartedMain] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647

2017-11-24 10:23:22.802  INFO 8756 --- [  restartedMain] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed

2017-11-24 10:23:22.819  INFO 8756 --- [  restartedMain] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)

2017-11-24 10:23:22.829  INFO 8756 --- [  restartedMain] s.d.s.w.s.ApiListingReferenceScanner     : Scanning for api listing references

2017-11-24 10:23:22.938  INFO 8756 --- [  restartedMain] .d.s.w.r.o.CachingOperationNameGenerator : Generating unique operation named: indexUsingGET_1

2017-11-24 10:23:23.078  INFO 8756 --- [    container-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#188cffa:0/[email protected] [delegate=amqp://[email protected]:5672/, localPort= 62367]

2017-11-24 10:23:23.132  WARN 8756 --- [    container-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:spring-boot

2017-11-24 10:23:23.138  WARN 8756 --- [    container-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[spring-boot]

         at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:643) ~[spring-rabbit-1.7.3.RELEASE.jar:na]

         at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:542) ~[spring-rabbit-1.7.3.RELEASE.jar:na]

         at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1389) [spring-rabbit-1.7.3.RELEASE.jar:na]

         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Caused by: java.io.IOException: null

         at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50) ~[amqp-client-4.0.2.jar:4.0.2]

         at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:335) ~[spring-rabbit-1.7.3.RELEASE.jar:na]

         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111]

         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111]

         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111]

         at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111]

         at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955) ~[spring-rabbit-1.7.3.RELEASE.jar:na]

         at com.sun.proxy.$Proxy125.queueDeclarePassive(Unknown Source) ~[na:na]

         at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:622) ~[spring-rabbit-1.7.3.RELEASE.jar:na]

         ... 3 common frames omitted

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-boot' in vhost '/', class-id=50, method-id=10)

         at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar:4.0.2]

         ... 13 common frames omitted

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-boot' in vhost '/', class-id=50, method-id=10)

         at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-4.0.2.jar:4.0.2]

         at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572) ~[amqp-client-4.0.2.jar:4.0.2]

         ... 1 common frames omitted

在如圖的地方新增名字(queue)然後點【擊Add queue】按鈕

重新啟動spring boot後可以看到控制檯顯示如下:

測試二

建立訊息接收者

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 *
訊息佇列接收類
 
* author : jimmyLJM on 2017-11-24
 *
使用 RabbitListener 註解使接收者監聽名稱為helloqueue
 *     RabbitHandler
對接收到的訊息進行處理
 
*/
@Component
@RabbitListener
(queues = "hello")
public class DemoRabbitMqReceiver {
   
@RabbitHandler
   
public void process(String content) {
       
System.out.println("DemoRabbitMqReceiver : " + content);
    }

建立訊息傳送者:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * author : jimmyLJM on 2017-11-24
 */
@Component
public class DemoRabbitMqSender {
   
@Autowired
   
private AmqpTemplate rabbitTemplate;

   
public void send(String content) {
       
System.out.println("Sender : " + content);
       
this.rabbitTemplate.convertAndSend("hello", content);
    }
}

測試類:

import com.jimmy.springBootDemo.MQ.DemoRabbitMqSender;
import com.jimmy.springBootDemo.util.ResultUtil;
import com.jimmy.springBootDemo.vo.Result;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpSession;

/**
 * author : jimmyLJM on 2017-11-24
 *
測試Rabbit MQ
 */
@RestController
@RequestMapping
(value="/rabbitmq")
public class DemoRabbitMqController {
   
@Autowired
   
private DemoRabbitMqSender demoRabbitMqSender;
   
/**
     *
傳送測試訊息佇列
    
*/
   
@ApiOperation(value="傳送測試訊息佇列", notes="addEntity")
   
@RequestMapping(value = "/addRabbitMq", method = RequestMethod.GET)
   
public Result addEntity(HttpSession httpSession) {
       
demoRabbitMqSender.send("rabbitMQ demo test...");
       
return ResultUtil.success();
    }
}

使用swagger測試

測試後控制檯列印:

比較測試一(A)和測試二(B):

A :使用RabbitTemplate的convertAndSend()方法來進行傳遞訊息

通過訊息監聽容器SimpleMessageListenerContainer載入監聽介面卡

MessageListenerAdapter(繫結接收者的對應方法),設定監聽的queue

使用BindingBuilder將queue和exchange繫結

B:藉助註解來使訊息接受者主動監聽生產者傳送的訊息

@RabbitListener

@RabbitHandler

訊息生產者借用AmqpTemplate類注入的例項(RabbitTemplate)來發送訊息

測試的時候直接注入生產者呼叫send方法將訊息傳送出去

AmqpTemplate和RabbitTemplate之間存在以下關係

這樣我們就實現了spring boot呼叫 RabbitMQ實現了訊息的傳送和接受。



相關推薦

訊息中介軟體--RabbitMQ學習

Activemq介紹 Activemq是 Apache出品,最流行的能力強勁的開源訊息匯流排,並且它個完全支援MS規範的訊息中介軟體。 其豐富的AP、多種叢集構建模式使得他成為業界老牌訊息中介軟體,在中小型企業中應用廣泛。 MQ衡量指標:服務效能、資料儲存、叢集架構

RabbitMQ學習-RabbitMQ安裝

一、rabbitmq安裝過程: cd ~/Downloads/ mkdir rabbitmq cd rabbitmq/ wget http://www.rabbitmq.com/releases/r

rabbitMQ 學習介紹

什麼是RabbitMQ 它是一個基於AMQP協議的開源的訊息中介軟體,通過普通的協議,在完全不同的應用之間實現資料共享,也就是說,支援跨平臺,跨語言,就可以用於java ,也可以使用者.net,php; 為什麼要使用Rabbit rabbitmq有許多有點,在不同的應用場景之下,額可以展

RabbitMQ學習

前言: 一、    認識RabbitMQ 二、    Spring boot + RabbitMQ(初體驗) 三、    Spring boot 整合RabbitMQ實現三種訊息傳遞方式 四、    RabbitMQ高階 五、    RPC-RabbitMQ的使用 一

rabbitmq系統學習

各種mq activemq,kafka使用zookeeper做管理 rocketmq自己實現nameserver broke管理 AMQP核心概念 高階訊息佇列協議 publisher application->Server->Virtual host->Exc

Node個人學習----模塊

需要 區別 class 當前 個人 一個 min export ava 1、自定義模塊與系統模塊的引入方式區別:----自定義模塊需要加“./”來聲明它不是一個系統模塊 const mod1=require("系統模塊.js"); const mod1=require(

MySQL學習ODBC 安裝

oca local new cmd 密碼 服務 關閉 mysql 系統 寫前說明 初次接觸MySQL,都說MySQL各種好。我也來學習學習。之前只有SQLServer2008的經驗,其實也只是皮毛。因為SQLServer還是比較容易上手的。也不麻煩。但是自己

vue基礎學習

time tle eight pla use logs new dial for 01-01 vue使用雛形      <div id="box"> {{msg}} </div> <sc

java學習 環境搭建、hello world的demo

環境變量 網上 類庫 .com java開發 www cnblogs rgs .class   本程序媛搞前端的,上班偶有空閑,不妨來學習學習,不然怎麽包養小白臉,走上人生巔峰?   說實話,每個語言都相通,有了javascript的基礎,並且有了兩三年跟java打交道的經

【Cloud Foundry】Could Foundry學習——Could Foundry淺談

art lock mod out isp ted 組成 .com pop 在閱讀的過程中有不論什麽問題。歡迎一起交流 郵箱:[email protected]/* */ QQ:1494713801 Cloud Foundry是VMware

Android中關於JNI 的學習對於JNIEnv的一些認識

else size 初步 jint 使用 包括 pri jnienv 就會 一個簡單的樣例讓我們初步地了解JNI的作用,可是關於JNI中的一些概念還是須要了解清楚,才可以更好的去利用它來實現我們想要做的事情。 那麽C++和Java之間的是怎樣通過JNI來進行互相調用的呢

.Net MVC的學習

後綴 fonts mvc 存在 eth 大致 例如 pre gb2   套種間作,也挺有意思的——近來學習感悟。DRP學習的同一時候,折騰了點曾經不曾學習可是卻非常多次耳聞過的東西——Asp.Net中的MVC架構模式。 一、是什麽?      MVC,即(Model-

Python學習

ubun 安裝目錄 tle setup extract reat 常用插件 增加 網址 Python學習(一) 一:開發工具安裝   1,pycharm下載:Linux版本   2,破解:在help-register下-service輸入:      http://elpo

Web Service學習

內存 res 特性 response 屬性表 高性能 fas str ima 1、WebMethod特性包含哪些屬性,都有什麽用? 1、BufferResponse屬性 該屬性表明是否啟用對Web Service方法響應的緩沖。當設置為true時,Web Service

Python基礎學習

func exe function 學習 typeerror exit invalid min eas #Default Argument Values & in keyworddef ask_ok(prompt, retres=4, reminder=‘pleas

RabbitMq 實戰

rabbitmq spring boot (消費者處理消息)RabbitMq消息消費者服務 開發工具Idea和Spring boot來開發的。消息消費目前只是一個簡單的Demo,後續會處理成更智能一些。首先配置文件類,RabbitMqConfig,裏面配置一些用戶名和密碼嗨喲隊列信息。package com.

Django學習---基本配置及創建項目、應用

cut 維護 onf response settings 通過 學習 clu render 安裝:在Django官網下載最新版Django然後通過pip安裝即可 一、創建項目 進入文件夾,打開cmd窗口,輸入django-admin startproject myblog(

RabbitMQ學習工作隊列

lose borde 阻塞 lpad mes getc actor 使用 處理 1.工作隊列(Work Queue)又叫任務隊列(Task Queue)指將任務分發個多個消費者。 2.實際操作: 這裏使用一個生產者產生多條數據提供給3個消費者

RabbitMQ學習訂閱/發布

cto submit actor nal chan true exec oid lsp RabbitMQ學習(三)訂閱/發布 1.RabbitMQ模型 前面所學都只用到了生產者、隊列、消費者。如上圖所示,其實生產者並不直接將信息傳輸到隊列中,在生產者和隊列

機器學習

無限 size mage 計算機 博客 對數 png 表示 獲得 對學習的定義 學習經驗E 完成任務T 績效指標P 對於某類任務T和性能度量P,如果一個計算機程序在完成T上以P來衡量完成的好壞,並隨著經驗E而自我完善,那麽我們稱這個計算機程序在從經驗E中學習