Spring Cloud: Asynchronous Processing and Messaging
阿新 • Published: 2018-12-20
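Asynchronous processing
The client's request does not block the calling process, and the server's response does not have to be immediate.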
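To make the definition above concrete, here is a minimal sketch in plain Java, with no Spring or RabbitMQ involved. The class `AsyncSketch` and the methods `slowService` and `requestAsync` are hypothetical names introduced only for illustration: the caller hands off a request, carries on, and collects the response later.

```java
import java.util.concurrent.CompletableFuture;

class AsyncSketch {

    // Hypothetical slow server-side operation; the sleep stands in for real work.
    static String slowService(String request) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response to " + request;
    }

    // Submits the request on another thread and returns immediately.
    static CompletableFuture<String> requestAsync(String request) {
        return CompletableFuture.supplyAsync(() -> slowService(request));
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = requestAsync("order-1");
        // The caller is not blocked here and can do other work.
        System.out.println("request sent, caller continues");
        // join() waits only at the point where the response is actually needed.
        System.out.println(future.join());
    }
}
```

A message queue takes the same idea further: the sender and the receiver do not even need to be running at the same time.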
Basic RabbitMQ usage
1. Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
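2. Create the receiver
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MqReceiver {

    // 1. No automatic declaration: the queue must be created by hand in the RabbitMQ management console
    // @RabbitListener(queues = "myQueue")
    // 2. Declare the queue automatically
    // @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    // 3. Declare automatically and bind the Exchange to the Queue
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myExchange"),
            value = @Queue("myQueue")
    ))
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }

    // Receives messages published to "myOrder" with routing key "computer"
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "computer",
            value = @Queue("computerOrder")
    ))
    public void processComputer(String message) {
        log.info("computer MqReceiver: {}", message);
    }

    // Receives messages published to "myOrder" with routing key "fruit"
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "fruit",
            value = @Queue("fruitOrder")
    ))
    public void processFruit(String message) {
        log.info("fruit MqReceiver: {}", message);
    }
}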
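The `computer` and `fruit` listeners above share one exchange and differ only in their binding key. As a rough in-memory model of that routing rule (plain Java, not the RabbitMQ client; `DirectExchangeSketch` and its methods are hypothetical, and the sketch assumes direct-exchange semantics without modeling topic or fanout exchanges), a message is delivered to every queue whose binding key equals the routing key:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String key) {
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    // Deliver to every queue whose binding key matches the routing key exactly.
    void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, Collections.emptyList())) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch myOrder = new DirectExchangeSketch();
        myOrder.bind("computerOrder", "computer");
        myOrder.bind("fruitOrder", "fruit");
        myOrder.publish("computer", "now");
        System.out.println(myOrder.messagesIn("computerOrder")); // [now]
        System.out.println(myOrder.messagesIn("fruitOrder"));    // []
    }
}
```

In this model a message published with the key `computer` never reaches `fruitOrder`, which is why only `processComputer` would log it.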
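3. Test
// OrderApplicationTests.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderApplicationTests {

    @Test
    public void contextLoads() {
    }
}

// MqSenderTest.java
import java.util.Date;
import org.junit.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqSenderTest extends OrderApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        // Routing key only: the message goes to the template's default exchange with key "myQueue"
        amqpTemplate.convertAndSend("myQueue", "now " + new Date());
    }

    @Test
    public void sendOrder() {
        // Exchange "myOrder", routing key "computer"
        amqpTemplate.convertAndSend("myOrder", "computer", "now " + new Date());
    }
}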
Spring Cloud Stream
1. Add the dependency
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
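2. Define the input and output interface
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {

    String INPUT = "input";
    String OUTPUT = "output";
    String INPUT2 = "input2";
    String OUTPUT2 = "output2";

    @Input(StreamClient.INPUT)
    SubscribableChannel input();

    @Output(StreamClient.OUTPUT)
    MessageChannel output();

    @Input(StreamClient.INPUT2)
    SubscribableChannel input2();

    @Output(StreamClient.OUTPUT2)
    MessageChannel output2();
}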
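3. Message receiver class
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {

    @StreamListener(StreamClient.INPUT)
    // Callback notification: the return value is sent to this channel
    @SendTo(StreamClient.INPUT2)
    public String process(OrderDTO message) {
        log.info("StreamReceiver: {}", message);
        // The returned string is sent on as the reply message
        return "received";
    }

    @StreamListener(StreamClient.INPUT2)
    public void process2(String message) {
        log.info("StreamReceiver2: {}", message);
    }
}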
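The receiver above chains two handlers: whatever `process` returns is forwarded to a second channel, where `process2` picks it up. The following is only an in-memory model of that reply-forwarding flow, written in plain Java; it is not how Spring Cloud Stream is implemented, and `SendToSketch`, `listen`, and `send` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SendToSketch {
    // channel name -> handler subscribed to that channel
    private final Map<String, Function<String, String>> handlers = new HashMap<>();
    // channel name -> channel that receives the handler's return value
    private final Map<String, String> replyTo = new HashMap<>();
    // collected log lines, standing in for log.info(...)
    final List<String> log = new ArrayList<>();

    void listen(String channel, String replyChannel, Function<String, String> handler) {
        handlers.put(channel, handler);
        if (replyChannel != null) {
            replyTo.put(channel, replyChannel);
        }
    }

    // Invoke the handler; forward a non-null return value to the reply channel.
    void send(String channel, String payload) {
        Function<String, String> handler = handlers.get(channel);
        if (handler == null) {
            return;
        }
        String reply = handler.apply(payload);
        String target = replyTo.get(channel);
        if (reply != null && target != null) {
            send(target, reply);
        }
    }

    public static void main(String[] args) {
        SendToSketch s = new SendToSketch();
        s.listen("input", "input2", m -> { s.log.add("StreamReceiver: " + m); return "received"; });
        s.listen("input2", null, m -> { s.log.add("StreamReceiver2: " + m); return null; });
        s.send("input", "OrderDTO(orderId=123456)");
        s.log.forEach(System.out::println);
    }
}
```

The second handler returns nothing, so the chain stops there, just as `process2` is declared `void`.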
4. Configuration
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: myMessage
          # Pass data as JSON
          content-type: application/json
          # Consumer group: put all instances of a service in one group so that,
          # however many instances are started, only one of them consumes each message
          group: order
        output:
          destination: myMessage
          content-type: application/json
          group: order
        input2:
          destination: myMessage2
          content-type: application/json
          group: order
        output2:
          destination: myMessage2
          content-type: application/json
          group: order
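The `group` setting above is easiest to understand with a small model. The sketch below is plain Java and does not touch any broker: every group receives its own copy of each message, and inside a group exactly one instance handles it. The round-robin choice of instance is an assumption made for illustration, and `ConsumerGroupSketch` and the `audit` group are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConsumerGroupSketch {
    // group name -> inboxes of the instances in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages dispatched so far (drives round-robin)
    private final Map<String, Integer> dispatched = new LinkedHashMap<>();

    // Registers a new instance in the group and returns its inbox.
    List<String> addInstance(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Each group gets one copy of the message; one instance per group handles it.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> instances = entry.getValue();
            int index = dispatched.merge(entry.getKey(), 1, Integer::sum) - 1;
            instances.get(index % instances.size()).add(message);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch destination = new ConsumerGroupSketch();
        List<String> order1 = destination.addInstance("order");
        List<String> order2 = destination.addInstance("order");
        List<String> audit = destination.addInstance("audit"); // hypothetical second group
        destination.publish("m1");
        destination.publish("m2");
        System.out.println(order1); // [m1]
        System.out.println(order2); // [m2]
        System.out.println(audit);  // [m1, m2]
    }
}
```

Without a shared group, each instance would behave like its own group in this model and every instance would see every message.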
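5. Send a message
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {

    @Autowired
    private StreamClient streamClient;

    @GetMapping("/sendMessage")
    public void process() {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("123456");
        // Publish the DTO on the "output" binding
        streamClient.output().send(MessageBuilder.withPayload(orderDTO).build());
    }
}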
StreamReceiver: OrderDTO(orderId=123456, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=null, orderStatus=null, payStatus=null, orderDetailList=null)
StreamReceiver2: received