RabbitMq 實戰(一)
阿新 • • 發佈:2017-07-10
rabbitmq spring boot (消費者處理消息)
RabbitMq消息消費者服務
開發工具Idea和Spring boot來開發的。
消息消費目前只是一個簡單的Demo,後續會處理成更智能一些。
首先配置文件類,RabbitMqConfig,裏面配置一些用戶名和密碼嗨喲隊列信息。
package com.basic.rabbitmq.consumer.config; import com.basic.rabbitmq.consumer.listener.HandleMessageListenerAdapter; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.core.env.Environment; import com.rabbitmq.client.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; /** * Rabbitmq配置類 * Created by sdc on 2017/7/4. */ @Configuration @ComponentScan(basePackages = {"com.basic"}) @PropertySource(value = {"classpath:application.properties"}) public class RabbitMqConfig { @Autowired private Environment env; /** * 構建connectionfactory * @return * @throws Exception */ @Bean public ConnectionFactory connectionFactory() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(env.getProperty("spring.rabbitmq.host")); connectionFactory.setPort(Integer.valueOf("5672".trim())); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username")); connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password")); return connectionFactory; } /** * CachingConnectionFactory * @return * @throws Exception */ @Bean public CachingConnectionFactory cachingConnectionFactory() throws Exception { return new CachingConnectionFactory(connectionFactory()); } /** * RabbitTemplate,類似於jdbctemplate一樣的工具類 * @return * @throws Exception */ @Bean public RabbitTemplate rabbitTemplate() throws Exception { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory()); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } @Bean public AmqpAdmin amqpAdmin() throws Exception { return new RabbitAdmin(cachingConnectionFactory()); } @Bean public SimpleMessageListenerContainer listenerContainer( @Qualifier("handleMessageListenerAdapter") HandleMessageListenerAdapter handleMessageListenerAdapter) throws Exception { //隊列名字 String queueName = env.getProperty("emial.server.queue").trim(); //單一的消息監聽容器 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(cachingConnectionFactory()); simpleMessageListenerContainer.setQueueNames(queueName); simpleMessageListenerContainer.setMessageListener(handleMessageListenerAdapter); //手動設置 ACK,就是成功消費信息了,就設置一下這個,rabbitmq就從此隊列裏刪除這條信息了。 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); return simpleMessageListenerContainer; } }
我這裏配置了一個SimpleMessageListenerContainer,這個Bean,用來監聽隊列裏的消息的。
具體的
package com.basic.rabbitmq.consumer.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.ComponentScan; import org.springframework.mail.MailMessage; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 監聽消息的處理適配器 * Created by sdc on 2017/7/10. */ @Component("handleMessageListenerAdapter") public class HandleMessageListenerAdapter extends MessageListenerAdapter { // @Resource // private JavaMailSender mailSender; /** * 這塊和activemq那個監聽器差不多,都是監聽信息,也都是onMessage方法。 * @param message * @param channel * @throws Exception */ @Override public void onMessage(Message message, Channel channel) throws Exception { String messageDetail = new String(message.getBody()); //消息體 System.out.println("消息消費:" + messageDetail); // 手動ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
還有一些配制文件,請看
http://10103778.blog.51cto.com/10093778/1945756
這個博客,就可以看到具體的配制了。
啟動這個項目,就可以從隊列消費消息了。消費者還是比較簡單的,對應到相應的隊列就可以處理了消息了。
本文出自 “10093778” 博客,請務必保留此出處http://10103778.blog.51cto.com/10093778/1945974
RabbitMq 實戰(一)