1. 程式人生 > >spring整合rabbitMq(基於topic和fanout模式)

spring整合rabbitMq(基於topic和fanout模式)

本文程式碼樣例都是在spring整合環境下寫的,都測試通過。
pom檔案需要加入spring整合rabbitMq的依賴:

   <!-- Spring AMQP RabbitMQ module used by the configurations and code in this article. -->
   <dependency>
       <groupId>org.springframework.amqp</groupId>
       <artifactId>spring-rabbit</artifactId>
       <version>1.6.3.RELEASE</version>
   </dependency>

一、 rabbitMq的topic模式:

圖解:
(原文此處為topic模式的示意圖,圖片未能保留。)

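使用場景:傳送端不是把訊息投遞到某個固定routing key對應的佇列,而是由topic型別的exchange把訊息的routing key與各佇列的繫結模式(binding pattern)做匹配,匹配成功的佇列都會收到訊息,接收端的佇列同樣是按模式繫結的。模式以“.”分隔單詞,“*”匹配恰好一個單詞,“#”匹配零個或多個單詞;例如routing key為test321.hello.test123時,可同時匹配*.*.test123和test321.#。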

傳送端spring的xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Sender-side Spring context for the topic example: declares the connection factory, two durable
    queues, the topic exchange with its bindings, and the RabbitTemplate used to publish messages.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- RabbitMQ connection settings. -->
    <!-- NOTE(review): host and credentials are hard-coded in this demo; consider externalizing them. -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.11"/>
        <property name="username" value="root"/>
        <property name="password" value="lee13233"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- durable="true" declares the queue as persistent; an auto-delete attribute (not set here)
         would control whether the queue is deleted automatically. -->
    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!-- Topic exchange: each queue is bound with a routing-key pattern. -->
    <rabbit:topic-exchange name="leo.pay.topic.exchange" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="test123queue" pattern="*.*.test123"/>
            <rabbit:binding queue="test321queue" pattern="test321.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- RabbitTemplate: the message template bean used by the sending code. -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"/>
    </bean>

</beans>

傳送端的java程式碼:

/**
 * Publishes ten messages ("hello1" .. "hello10") to the exchange "leo.pay.topic.exchange"
 * with the routing key "test321.hello.test123".
 *
 * @throws Exception declared so that any failure surfaces as a test error
 */
@Test
public void testRabbitMq() throws Exception {
    RabbitTemplate rabbitTemplate = (RabbitTemplate) LeoContext.getContext().getApplication().getBean("rabbitTemplate");
    // The second argument of send() is the routing key:
    //   test321.hello.test123 -> matches both binding patterns, so both consumer queues receive the message;
    //   test321.hello.aaa     -> only the queue bound with test321.# receives it;
    //   ta1.hello.test123     -> only the queue bound with *.*.test123 receives it.
    for (int i = 1; i <= 10; i++) {
        String str = "hello" + i;
        // Encode with an explicit charset instead of the JVM default, because the receiver runs in
        // a different project/JVM. NOTE(review): the receiver must decode with the same charset.
        byte[] body = str.getBytes(java.nio.charset.Charset.forName("UTF-8"));
        rabbitTemplate.send("leo.pay.topic.exchange", "test321.hello.test123", new Message(body, new MessageProperties()));
    }
}

接收端(我配置的接收端與傳送端不在同一個專案)spring的xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Receiver-side Spring context for the topic example: declares the connection factory, the two
    durable queues, one consumer bean per queue, and the listener container that wires them together.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
    default-autowire="byName">

    <!-- RabbitMQ connection settings. -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.11"/>
        <property name="username" value="leo"/>
        <property name="password" value="lee31211"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!--
        Optional: binds the exchange leo.pay.topic.exchange to the two queues. The same bindings can
        be created by hand in the RabbitMQ management console, and the sender-side configuration
        already declares them, so the declaration is left commented out here. The patterns shown
        are the ones used by the sender-side configuration.

        <rabbit:topic-exchange name="leo.pay.topic.exchange" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="test123queue" pattern="*.*.test123"/>
                <rabbit:binding queue="test321queue" pattern="test321.#"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    -->

    <!-- Consumer beans, one per queue. -->
    <bean id="detailQueueConsumer" class="com.leo.website.cousumer.DetailQueueConsumer"/>
    <bean id="testQueueConsumer" class="com.leo.website.cousumer.TestQueueConsumer"/>

    <!-- Register both queues with the listener container; each queue gets its own listener. -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" concurrency="8">
        <rabbit:listener queues="test123queue" ref="detailQueueConsumer" method="onMessage"/>
        <rabbit:listener queues="test321queue" ref="testQueueConsumer" method="onMessage"/>
    </rabbit:listener-container>

</beans>

接收端java程式碼(只列出一個監聽,另外一個類似):

/**
 * Message listener that prints the body of every received message to standard output,
 * prefixed with "DetailQueueConsumer: ".
 */
public class DetailQueueConsumer implements MessageListener {

    // Explicit charset for decoding message bodies instead of the JVM default.
    // NOTE(review): the publisher must encode with the same charset; confirm against the sender.
    private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.Charset.forName("UTF-8");

    /**
     * Decodes the message body as text and prints it.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        String text = new String(message.getBody(), BODY_CHARSET);
        System.out.println("DetailQueueConsumer: " + text);
    }
}

二、 rabbitMq的fanout模式(釋出者訂閱者模式):

圖解:
(原文此處為fanout模式的示意圖,圖片未能保留。)

使用場景:釋出/訂閱模式。傳送端把訊息傳送到fanout型別的exchange,exchange會忽略routing key,把訊息廣播給所有與它繫結的佇列,因此多個接收端都能收到同一條訊息。

傳送端spring的xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Sender-side Spring context for the fanout example: declares the connection factory, two durable
    queues, the fanout exchange (without bindings) and the RabbitTemplate used to publish messages.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- RabbitMQ connection settings. -->
    <!-- NOTE(review): this host (192.168.1.12) differs from the one in the receiver configuration
         of the same example (192.168.1.13); both sides must reach the same broker or cluster.
         Confirm. -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.12"/>
        <property name="username" value="root"/>
        <property name="password" value="lee323"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!-- Fanout exchange declared without bindings on the sender side. -->
    <rabbit:fanout-exchange name="leo.pay.fanout.exchange" durable="true"/>

    <!-- RabbitTemplate: the message template bean used by the sending code. -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"/>
    </bean>

</beans>

傳送端java程式碼:

@Test
    public void testRabbitMq() throws Exception {
        RabbitTemplate rabbitTemplate = (RabbitTemplate) LeoContext.getContext().getApplication().getBean("rabbitTemplate");
        //往名字為leo.pay.fanout.exchange的路由裡面傳送資料,客戶端中只要是與該路由繫結在一起的佇列都會收到相關訊息,這類似全頻廣播,傳送端不管佇列是誰,都由客戶端自己去繫結,誰需要資料誰去繫結自己的處理佇列。
        for(int i = 1; i <= 10; i++) {
            String str = "hello" + i;
rabbitTemplate.send("leo.pay.fanout.exchange", "", new Message(str.getBytes(), new MessageProperties()))
        }
    }

接收端(下文也稱客戶端)spring的xml配置(本例中接收端與傳送端不在同一個專案下):

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Receiver-side Spring context for the fanout example: declares the connection factory, the two
    durable queues, their bindings to the fanout exchange, one consumer bean per queue, and the
    listener container that wires them together.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
    http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
    default-autowire="byName">

    <!-- RabbitMQ connection settings. -->
    <!-- NOTE(review): this host (192.168.1.13) differs from the one in the sender configuration
         of the same example (192.168.1.12); both sides must reach the same broker or cluster.
         Confirm. -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.13"/>
        <property name="username" value="root"/>
        <property name="password" value="lee2342"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!-- Bind the queues that need the data to the exchange. This declaration is unnecessary if
         the bindings are created by hand in the RabbitMQ management console. -->
    <rabbit:fanout-exchange name="leo.pay.fanout.exchange" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="test123queue"/>
            <rabbit:binding queue="test321queue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Consumer beans, one per queue. -->
    <bean id="detailQueueConsumer" class="com.leo.website.cousumer.DetailQueueConsumer"/>
    <bean id="testQueueConsumer" class="com.leo.website.cousumer.TestQueueConsumer"/>

    <!-- Register both queues with the listener container; each queue gets its own listener. -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" concurrency="8">
        <rabbit:listener queues="test123queue" ref="detailQueueConsumer" method="onMessage"/>
        <rabbit:listener queues="test321queue" ref="testQueueConsumer" method="onMessage"/>
    </rabbit:listener-container>

</beans>

相關理論說明文章:
1、http://www.2cto.com/kf/201612/575219.html
2、http://www.cnblogs.com/luxiaoxun/p/3918054.html
3、http://hwcrazy.com/34195c9068c811e38a44000d601c5586/be62fc2668c811e3adba000d601c5586/
4、http://blog.csdn.net/lmj623565791/article/details/37706355