【中介軟體】MQ Rabbitmq 和spring整合
阿新 • • 發佈:2019-02-20
例一、Xml配置檔案
定義連線、定義了佇列或交換機、可以設定Key、定義了消費者及路徑
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd" >
<!-- 定義RabbitMQ的連線工廠 -->
<rabbit:connection-factory id="connectionFactory"
host="127.0.0.1" port="5672" username="mjx" password="mjx"
virtual-host="/mjx" />
<!-- 定義Rabbit模板,指定連線工廠以及定義exchange 放送到交換機 -->
<!--模板:通過模板將訊息傳送到佇列::將exchange改成queue service注入-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
<!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="fanoutExchange" routing-key="foo.bar" /> -->
<!-- MQ的管理,包括佇列、交換器等 維護自動聲明後的操作 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 定義佇列,自動宣告 rabbit需要建立佇列 -->
<rabbit:queue name="myQueue" auto-declare="true"/>
<!-- 定義交換器,自動宣告:需要的時沒有建立建立,有忽略 durable="true"持久化,儲存在硬碟 -->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" durable="true">
<rabbit:bindings>
<!--將佇列繫結到交換機-->
<rabbit:binding queue="myQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- <rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="foo.*" />
</rabbit:bindings>
</rabbit:topic-exchange> -->
<!-- 消費者foo:監聽myqueue佇列,監聽-->
<rabbit:listener-container connection-factory="connectionFactory">
<!--有訊息執行listen方法-->
<rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
<bean id="foo" class="cn.itcast.rabbitmq.spring.Foo" />
</beans>
消費者
public class Foo {
//具體執行業務的方法
public void listen(String foo) {
System.out.println("消費者: " + foo);
}
}
生產者:
public class SpringMain {
public static void main(final String... args) throws Exception {
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/rabbitmq-context.xml");
//RabbitMQ模板
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
//傳送訊息
template.convertAndSend("Hello, world!");
Thread.sleep(1000);// 休眠1秒
ctx.destroy(); //容器銷燬
}
}
2、@Autowired
Private RabbitTemplate rabbitTemplate;
Map<String ,Object> msg = new HashMap<String,Object>();
Msg.put("itemId",item.getId());
Msg.put("type","update");
Msg.put("date",system.currentTimeMillis());//當前時間戳
//傳送訊息
This.rabbitTemplate.convertAndSend("item.update",MAPPER.writeValueAsString(msg));//mapper序列化 抓異常,裝飾
例子二:
Spring-rabbit:資料同步:萬用字元模式
service加入:使用的最低層匯入;都用則公共;執行時在web層匯入
servoce.xml
<!-- 定義RabbitMQ的連線工廠 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.ip}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost}"/>
<!--管理-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--交換機-->
<rabbit:topic-exchange name="TAOTAO_ITEM_EXCHANAGE" auto-declare="true" durable="true"/>
<!--定義模板-->
<rabbit:template id="taotaoItemTemplate" connection-factory="connectionFactory" exchange="TAOTAO_ITEM_EXCHANAGE"/>
consumer.xml
<!-- 定義RabbitMQ的連線工廠 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.ip}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.vhost}"/>
<!--定義管理-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義佇列 沒有交換機-->
<rabbit:topic-exchange name="TAOTAO_WEB_ITEM_EXCHANAGE" auto-declare="true" durable="false"/>
<!--定義消費者 沒有模板 -->
<bean id="itemMQHandler" class="itemMQHandler"/>
<!--定義消費者監聽佇列-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="itemMQHandler" method="excute" queue-names="TAOTAO_WEB_ITEM_EXCHANAGE"/>
</rabbit:listener-container>
佇列與互動機:
佇列繫結交換機
rabbitmq管理介面,低耦合、需要發郵件、申請才能繫結
MQ網頁:
User
tags:角色;virtual host:資料庫 斜槓開頭 設定許可權;node:叢集中的節點
埠:5672 java客戶端,協議埠 ; 15672 管理工具埠 ; 25672 叢集埠
Exchanges:交換機 D: 1、持久化(將資料儲存在硬碟) 2、非持久化:儲存在記憶體:讀寫快、資料丟失 durable=”true”