1. 程式人生 > >rabbitMq與spring boot搭配實現監聽

rabbitMq與spring boot搭配實現監聽

address app caching prefix 前段時間 ever 不用 理解 its

  在我前面有一篇博客說到了rabbitMq實現與zk類似的watch功能,但是那一篇博客沒有代碼實例,後面自己補了一個demo,便於理解。demo中主要利用spring boot的配置方式,

一、消費者(也就是watcher)配置

配置都采用spring的註解進行配置

1、創建連接

[email protected]
    public ConnectionFactory createConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
     //設置rabbitMq的ip和端口 connectionFactory.setAddresses(
"127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }

2、創建交換機

    @Bean
    public Exchange fanoutExchange() {
        return new FanoutExchange("ex_rabbit_test");
    }

創建一個名為ex_rabbit_test的交換機,交換機的類型為廣播類型(為了實現消息的廣播)

3、創建隊列,並綁定到交換機上

    @Bean
    public Queue queueOne() {
        return new Queue("queue_one", false, false, true);
    }

    @Bean
    public Binding bindingOne(Queue queueOne, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueOne)
                .to(fanoutExchange);
    }

每一個消費者有自己的隊列,只消費自己隊列的消息;將隊列和交換機綁定之後,交換機會將生產者發出的消息放到所有綁定的隊列中,但是僅限廣播模式,其它模式會按照一定的路由規則進行消息路由,比如topic類型的交換機會按照routingKey路由消息。

註意:在廣播模式中,為了實現消息監聽,每個消費者需要各自起一個隊列,而且隊列名不相同,比如現在有另外一個消費者:

    @Bean
    public Queue queueTwo() {
        return new Queue("queue_two", false, false, true);
    }

    @Bean
    public Binding BingdingTwo(Queue queueTwo, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueTwo)
                .to(fanoutExchange);
    }

如此一來,當生產者將消息發到交換機ex_rabbit_test中時,交換機就將消息發給queue_one和queue_two兩個隊列中,兩個消費者分別取兩個隊列的消息進行消費。

4、消費消息

    @Bean
    public SimpleMessageListenerContainer execMessageContainerOne() {
     //設置監聽者“容器” SimpleMessageListenerContainer container
= new SimpleMessageListenerContainer(createConnectionFactory());
     //設置隊列名 container.setQueueNames(
"queue_one");
     //設置監聽者數量,即消費線程數 container.setConcurrentConsumers(
1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer one"; consumerService.doProcess(usr, msg);//消費消息 } catch(Exception e) { e.printStackTrace(); } } }); return container; } @Bean public SimpleMessageListenerContainer execMessageContainerTwo() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(createConnectionFactory()); container.setQueueNames("queue_two"); container.setConcurrentConsumers(1); container.setMessageListener((ChannelAwareMessageListener) (message, channel) ->{ byte[] body = message.getBody(); if(null != body) { try { String msg = new String(body); String usr = "Consumer two"; consumerService.doProcess(usr, msg);//消費消息 } catch (Exception e) { e.printStackTrace(); } } }); return container; }

consumerService提供消費消息的服務,執行如下方法

    public void doProcess(String usr, String msg) {
        System.out.println(usr + " receive message from producer:" + msg);
    }

二、生產者配置

1、與消費者相同的方式建立rabbitMq的連接

2、與消費者相同的方式設置交換機,交換機名稱也為ex_rabbit_test(如果rabbitmq中已經存在這個交換機,可以不用創建)

3、關於是否建立隊列以及將隊列與交換機綁定,我的理解是這樣的:

  如果在生產者的代碼裏面建立隊列並將其與交換機綁定,那麽就必須建立所有的消費者的隊列,並將所有隊列與交換機綁定,如果這樣做,消費者中就可以省掉這個配置。事實上,這樣做是有點得不償失的,我不贊同這樣做,這裏只是說明這樣做也可以達到目的。

4、創建rabbit模板(org.springframework.amqp.rabbit.core.RabbitTemplate)

    @Bean
    public RabbitTemplate rabbitTemplateProducer() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(this.createConnectionFactory());
        rabbitTemplate.setExchange("ex_rabbit_test");
        return rabbitTemplate;
    }

5、實現消息發送

  demo中使用spring web的方式啟動消息發送,下面是controller和service的代碼

@Controller
@RequestMapping(value="/index")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    @RequestMapping(value = "/send")
    @ResponseBody
    public String sendMsg(@RequestParam String msg) {
        producerService.send(msg);
        return "Success";
    }
}
@Service
public class ProducerService {

    @Resource(name = "rabbitTemplateProducer")
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        String message = "Hello, consumer.Sending:" + msg;
        rabbitTemplate.convertAndSend(message);
    }
}

三、pom文件

在consumer中只需要引入spring ampq的依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>
    </dependencies>

在prudocer中需要引入spring ampq的依賴,另外由於是啟動了web 項目,所以需要spring web的依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>
    </dependencies>

四、啟動項目和測試結果

使用spring boot可以快速啟動項目,首先,在8882端口上啟動producer,然後啟動consumer。通過在controller中定義的訪問地址http://localhost:8882/index/send?msg=hello everybody(此處的msg必須有,[email protected]),可以看到兩個消費者都消費了這條消息

Consumer one receive message from producer:Hello, consumer.Sending:hello everybody
Consumer two receive message from producer:Hello, consumer.Sending:hello everybody

從rabbitMq的後臺(http://localhost:15672 usrname:guest pasword:guest)可以看到剛才創建的交換機和隊列。

當消費者變多,或者為了代碼的統一管理,每個消費者的代碼需要相同,為了實現廣播需求,需要為每個消費者設置不同的隊列名。這種情況下,可以采用UUID的方式,每個消費者可以創建一個唯一的隨機隊列名。UUID方式創建隊列名的代碼可以在ampq的jar包中找到org.springframework.amqp.core.AnonymousQueue

     public String generateName() {
            UUID uuid = UUID.randomUUID();
            ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
            bb.putLong(uuid.getMostSignificantBits())
              .putLong(uuid.getLeastSignificantBits());
            // TODO: when Spring 4.2.4 is the minimum Spring Framework version, use encodeToUrlSafeString() SPR-13784.
            return this.prefix + Base64Utils.encodeToString(bb.array())
                                    .replaceAll("\\+", "-")
                                    .replaceAll("/", "_")
                                    // but this will remain
                                    .replaceAll("=", "");
        }

可以將UUID方法的返回值加在固定隊列名的後面,這樣就生成了一個唯一的隨機隊列名。關於UUID的描述可以自行百度。

ps:前段時間看了spring cloud,看到其中的一個工具,spring cloud bus也可以用作消息監聽,細察之後發現,spring cloud bus也是封裝了rabbitMq,實現了消息隊列。

rabbitMq與spring boot搭配實現監聽