RabbitMQ 配置和基本原理
1. 配置: (1)最簡單的java配置法:
public class Test { public static void main(String[] args) { ConnectionFactory connectionFactory = new CachingConnectionFactory(); ((CachingConnectionFactory)connectionFactory).setUsername("guest"); ((CachingConnectionFactory)connectionFactory).setPassword("guest"); ((CachingConnectionFactory)connectionFactory).setHost("127.0.0.1"); AmqpAdmin admin = new RabbitAdmin(connectionFactory); admin.declareQueue(new Queue("myqueue")); AmqpTemplate template = new RabbitTemplate(connectionFactory); template.convertAndSend("myqueue", "foo"); String foo = (String) template.receiveAndConvert("myqueue"); System.out.println(foo); } }
(2)使用springxml的配置方法:
private static void xmlConfig() { ApplicationContext context = new GenericXmlApplicationContext("classpath:/rabbit-context.xml"); AmqpTemplate template = context.getBean(AmqpTemplate.class); template.convertAndSend("myqueue", "foo");//routingKey, message String foo = (String) template.receiveAndConvert("myqueue"); System.out.println(foo); }
xml配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <rabbit:connection-factory id="connectionFactory"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="myqueue"/> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="127.0.0.1"/> <property name="username" value="guest"/> <property name="password" value="guest"/> </bean> </beans>
(3)使用Java Configuration配置方法:
private static void configurationJava() {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
System.out.println(foo);
}
configuration配置類:
package com.cisco.estore.configuration;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("127.0.0.1");
cachingConnectionFactory.setUsername("guest");
cachingConnectionFactory.setPassword("guest");
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
2. 基礎概念: (1)目的: 元件解耦,Queue可以均攤分給不同消費者 (2)概念 A. ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的物件。 • Connection是RabbitMQ的socket連結,它封裝了socket協議相關部分邏輯。 • ConnectionFactory為Connection的製造工廠。 • Channel是我們與RabbitMQ打交道的最重要的一個介面,我們大部分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、繫結Queue與Exchange、釋出訊息等。 B. Exchange C. Exchange Type 有fanout、direct、topic、headers這四種類型 Fanout: 所有發到Exchange的訊息路由到所有與它繫結的Queue Direct: 把訊息路由到binding key和routing key完全匹配的Queue中 如果routingkey=error傳送到exchange,訊息會發到queue1和queue2。如果routingkey=info則只會傳送到Queue2 Topic: • routing key為一個句點號“. ”分隔的字串(我們將被句點號“. ”分隔開的每一段獨立的字串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” • binding key與routing key一樣也是句點號“. ”分隔的字串 • binding key中可以存在兩種特殊字元“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的訊息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的訊息會路由到Q1,routingKey=”lazy.brown.fox”的訊息會路由到Q2,routingKey=”lazy.pink.rabbit”的訊息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的訊息將會被丟棄,因為它們沒有匹配任何bindingKey。 回執機制: ○ 客戶端傳送請求(訊息)時,在訊息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨著訊息一起傳送)中設定兩個值replyTo(一個Queue名稱,用於告訴伺服器處理完成後將通知我的訊息傳送到這個Queue中)和correlationId(此次請求的標識號,伺服器處理完成後需要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗) ○ 伺服器端收到訊息並處理 ○ 伺服器端處理完訊息後,將生成一條應答訊息到replyTo指定的Queue,同時帶上correlationId屬性 ○ 客戶端之前已訂閱replyTo指定的Queue,從中收到伺服器的應答訊息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理。