Spring Cloud Stream是Spring Cloud的元件之一,是一個為微服務應用構建訊息驅動能力的框架。

1、匯入引用

        <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  

2、定義介面

public interface StreamClient {

    String INPUT = "input";
String OUTPUT = "output"; @Input(INPUT)
SubscribableChannel input(); @Output(OUTPUT)
MessageChannel output();
}

  

  

3、定義訊息的接收

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component; @Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver { @StreamListener(StreamClient.INPUT)
@SendTo(StreamClient.OUTPUT)
public Object processInput(String message){
log.info("Input StreamReceiver:{}", message );
return message;
} @StreamListener(StreamClient.OUTPUT)
public void processOutPut(String message){
log.info("Output StreamReceiver:{}", message );
} }

  

4、定義訊息的傳送

@RestController
public class SendMessageController { @Autowired
private StreamClient streamClient; @GetMapping("/sendMessage")
public void process(){
String msg = "hello world";
streamClient.output().send(MessageBuilder.withPayload(msg).build());
} }

  

5. 結果

 6、傳送物件

1) 傳送者

   /**
* 傳送物件
*/
@GetMapping("/sendMessage2")
public void process2(){
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderId("123");
streamClient.output().send(MessageBuilder.withPayload(orderDTO).build());
}

  

2)接收者

  @StreamListener(StreamClient.OUTPUT)
public void processOutPut(OrderDTO message){
log.info("Output StreamReceiver:{}", message.toString() );
}

  

7、訊息接收到後,再回復訊息。使用SendTo

    @StreamListener(StreamClient.INPUT)
@SendTo(StreamClient.OUTPUT) //當Input接收到訊息後,回覆訊息給Output
public Object processInput(String message){
log.info("Input StreamReceiver:{}", message );
return message;
}