1. 程式人生 > >基於dubbo的分散式專案框架搭建 開發工具idea (springboot+dubbo+zookeeper+redis+rabbitmq+基於Swagger2的restful api) --(四)

基於dubbo的分散式專案框架搭建 開發工具idea (springboot+dubbo+zookeeper+redis+rabbitmq+基於Swagger2的restful api) --(四)

1.rabbitmq的整合

首先在配置檔案裡增加

#rabbitMQ
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

1.1這裡的username和password是需要接下來我們自己在rabbitmq裡新增的使用者和密碼,也可以使用預設的使用者和密碼guest

1.2 安裝rabbitmq之前需要先安裝OTP 點選進入以下網址,自己選擇合適版本下載

http://www.erlang.org/downloads

安裝完後繼續安裝rabbitmq

http://www.rabbitmq.com/download.html

 rabbitmq 和 otp 全都預設下一步安裝即可

詳細的步驟請看

https://www.cnblogs.com/ericli-ericli/p/5902270.html

安裝配置完成後,進入管控臺介面,可以在queues裡看到宣告的佇列的資訊

配置佇列等資訊

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**   
 * @author hhp
 * @Title: RabbitConfiguration  
 * @Description: mq 佇列,配置 
 * @return:       
 * @throws   
 */
@Configuration
public class RabbitConfiguration {

   // 宣告佇列
   @Bean
   public Queue queue1() {
      return new Queue("hello.queue1", true);
   }

   @Bean
   public Queue queue2() {
      return new Queue("hello.queue2", true);
   }

   // 宣告互動器
   @Bean
    TopicExchange topicExchange() {
      return new TopicExchange("topicExchange");
   }

   // 繫結
   @Bean
   public Binding binding1() {
      return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
   }

   @Bean
   public Binding binding2() {
      return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
   }

}

配置訊息的傳送者

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

/**
 * @author hhp
 * @Title: Sender
 * @Description: 向mq傳送訊息
 * @param:
 * @return:
 * @throws
 */
@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {

   private static Logger logger = Logger.getLogger(App.class);

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @PostConstruct
   public void init() {
      rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.setReturnCallback(this);
   }

   @Override
   public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      if (ack) {
         logger.info("訊息傳送成功:" + correlationData);
      } else {
         logger.info("訊息傳送失敗:" + cause);
      }

   }

   @Override
   public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
      logger.error(message.getMessageProperties().getCorrelationIdString() + " 傳送失敗");

   }

   // 傳送訊息,
   public void send(String msg) {

      CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

      logger.info("開始傳送訊息 : " + msg.toLowerCase());
      String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
      logger.info("結束髮送訊息 : " + msg.toLowerCase());
      logger.info("消費者響應 : " + response + " 訊息處理完成");
   }
}

配置訊息的接收者

/**   
 * @author hhp
 * @Title: Receiver 
 * @Description: 接收訊息
 * @param:       
 * @return:       
 * @throws   
 */
@Component
public class Receiver {
   
   private static Logger logger = Logger.getLogger(App.class);

   @RabbitListener(queues = "hello.queue1")
   public String processMessage1(String msg) {
      logger.info(Thread.currentThread().getName() + " 接收到來自hello.queue1佇列的訊息:" + msg);
      return msg.toUpperCase();
   }

   @RabbitListener(queues = "hello.queue2")
   public void processMessage2(String msg) {
      logger.info(Thread.currentThread().getName() + " 接收到來自hello.queue2佇列的訊息:" + msg);
   }
}

執行test類

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.ysk.component.Sender;

@RunWith(value = SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = App.class)
public class RabbitTest {

   @Autowired
   private Sender sender;

   @Test
   public void sendTest() throws Exception {
      for (int i = 0; i < 3; i++) {
         sender.send("訊息傳送測試");
      }
   }
}