基於dubbo的分散式專案框架搭建 開發工具idea (springboot+dubbo+zookeeper+redis+rabbitmq+基於Swagger2的restful api) --(四)
阿新 • • 發佈:2018-10-31
1.rabbitmq的整合
首先在配置檔案裡增加
#rabbitMQ
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
1.1這裡的username和password是需要接下來我們自己在rabbitmq裡新增的使用者和密碼,也可以使用預設的使用者和密碼guest
1.2 安裝rabbitmq之前需要先安裝OTP 點選進入以下網址,自己選擇合適版本下載
http://www.erlang.org/downloads
安裝完後繼續安裝rabbitmq
http://www.rabbitmq.com/download.html
rabbitmq 和 otp 全都預設下一步安裝即可
詳細的步驟請看
https://www.cnblogs.com/ericli-ericli/p/5902270.html
安裝配置完成後,進入管控臺介面,可以在queues裡看到宣告的佇列的資訊
配置佇列等資訊
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author hhp * @Title: RabbitConfiguration * @Description: mq 佇列,配置 * @return: * @throws */ @Configuration public class RabbitConfiguration { // 宣告佇列 @Bean public Queue queue1() { return new Queue("hello.queue1", true); } @Bean public Queue queue2() { return new Queue("hello.queue2", true); } // 宣告互動器 @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } // 繫結 @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1"); } @Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#"); } }
配置訊息的傳送者
import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.UUID; /** * @author hhp * @Title: Sender * @Description: 向mq傳送訊息 * @param: * @return: * @throws */ @Component public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback { private static Logger logger = Logger.getLogger(App.class); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("訊息傳送成功:" + correlationData); } else { logger.info("訊息傳送失敗:" + cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.error(message.getMessageProperties().getCorrelationIdString() + " 傳送失敗"); } // 傳送訊息, public void send(String msg) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); logger.info("開始傳送訊息 : " + msg.toLowerCase()); String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString(); logger.info("結束髮送訊息 : " + msg.toLowerCase()); logger.info("消費者響應 : " + response + " 訊息處理完成"); } }
配置訊息的接收者
/** * @author hhp * @Title: Receiver * @Description: 接收訊息 * @param: * @return: * @throws */ @Component public class Receiver { private static Logger logger = Logger.getLogger(App.class); @RabbitListener(queues = "hello.queue1") public String processMessage1(String msg) { logger.info(Thread.currentThread().getName() + " 接收到來自hello.queue1佇列的訊息:" + msg); return msg.toUpperCase(); } @RabbitListener(queues = "hello.queue2") public void processMessage2(String msg) { logger.info(Thread.currentThread().getName() + " 接收到來自hello.queue2佇列的訊息:" + msg); } }
執行test類
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.ysk.component.Sender; @RunWith(value = SpringJUnit4ClassRunner.class) @SpringBootTest(classes = App.class) public class RabbitTest { @Autowired private Sender sender; @Test public void sendTest() throws Exception { for (int i = 0; i < 3; i++) { sender.send("訊息傳送測試"); } } }