1. 程式人生 > >Stream進階篇-動態繫結訊息通道

Stream進階篇-動態繫結訊息通道

前言 在之前的章節中,所有消費者和生產者均通過@EnableBinding定義,此方式能夠快速的構建生產消費關係,但仔細想想,如果我們需要根據一定的條件決策訊息生產者將訊息發往哪個通道,貌似當前簡單粗暴的方式無法滿足。如此常見的場景,springcloud必然會幫我們想到,通過BinderAwareChannelResolver的bean例項即可實現動態通道的選擇,其會伴隨@EnableBinding註解自動完成註冊。 本章概要 1、BinderAwareChannelResolver的應用; 2、ExpressionEvaluatingRouter的應用; BinderAwareChannelResolver的應用
首先來看BinderAwareChannelResolver的直接應用,為了方便場景模擬,採用一個rest api方式觸發訊息的生產傳送。 消費者Receiver工程改造 1、在MySink中新增如下兩個動態接收通道,dynamic1-channeldynamic1-channel
package com.cloud.shf.stream.sink;
public interface MySink {
    /*********************************動態通道選擇示例******************************/
    String DYNAMIC1_CHANNEL = "dynamic1-channel";
    String DYNAMIC2_CHANNEL = "dynamic2-channel";

    @Input(DYNAMIC1_CHANNEL)
    SubscribableChannel dynamic1Input();

    @Input(DYNAMIC2_CHANNEL)
    SubscribableChannel dynamic2Input();
}
2、在SinkReceiver.class中新增對上述兩個通道的監聽,並列印接收內容:
/*********************************動態通道選擇示例******************************/
@StreamListener(value = MySink.DYNAMIC1_CHANNEL)
public void dynamic1Receiver(@Payload User user) {
    LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC1_CHANNEL, user.getAge());
}

@StreamListener(value = MySink.DYNAMIC2_CHANNEL)
public void dynamic2Receiver(@Payload User user) {
    LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC2_CHANNEL, user.getAge());
}
生產者Sender工程改造 新增一個DynamicDestinationController類,提供一個rest-api協助進行場景模擬:
package com.cloud.shf.stream.controller;
@EnableBinding
@Controller
public class DynamicDestinationController {

    @Autowired
    private BinderAwareChannelResolver resolver;

    /************************************方式一************************************/
    @RequestMapping(path = "/{dest}", method = RequestMethod.POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@PathVariable("dest") String dest,
                              @RequestBody String body,
                              @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, dest, contentType);
    }

    private void sendMessage(String body, String dest, Object contentType) {
        resolver.resolveDestination(dest).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}
Note:
  • 直接注入BinderAwareChannelResolver的bean例項即可;
  • 通過PathVariable屬性dest值模擬通道名稱;
  • boby作為訊息體;
  • contentType作為訊息的頭資訊;
服務驗證 1、啟動receiver、sender兩個工程; 2、多次通過curl請求api如下(curl -H "Content-Type: application/json" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/dynamic*-channel)
此時可以看到receiver工程的控制檯列印如下
其列印的來源通道與請求中的佔位符完全匹配,繼續觀察sender控制檯日誌,由於原來並沒有在sernder中定義相關通道描述,故首次觸發指定通道即可看到如下日誌記錄: 小節,由此可以看到,根據佔位符dest動態路由成功,準確的被髮送至預期的訊息通道。實際應用中,如果我們預先知道可能的動態路由通道名稱,則可以通過spring.cloud.stream.dynamicDestinations配置白名單,只有預設定的通道名稱方會被動態繫結,避免建立大量無效的通道資訊,浪費資源。 ExpressionEvaluatingRouter的應用 通過下圖可以BinderAwareChannelResolver類的定義, 其實現了Spring Integration的DestinationResolver介面,並且BinderAwareChannelResolver例項可以被注入在其他的components例項中,本小節將實現將BinderAwareChannelResolver例項注入在ExpressionEvaluatingRouter中實現訊息通道的動態繫結。 1、在DynamicDestinationController類中新增如下實現:
/************************************方式二************************************/
@RequestMapping(path = "/", method = RequestMethod.POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody User body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType, @RequestHeader(name = "dest", required = false) String dest) {
    Map<String, Object> headers = new HashMap<>(2);
    headers.put(MessageHeaders.CONTENT_TYPE, contentType);
    if (!StringUtils.isEmpty(dest)) {
        headers.put("dest", dest);
    }
    sendMessage(body, headers);
}

private void sendMessage(User body, Map<String, Object> headers) {
    routerChannel().send(MessageBuilder.createMessage(body,
            new MessageHeaders(headers)));
}

@Bean(name = "router-channel")
public MessageChannel routerChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "router-channel")
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("headers[dest]"));
    //作用於通過spel表示式沒有獲取到對應的通道資訊
    router.setDefaultOutputChannelName("dynamic1-channel");
    router.setChannelResolver(resolver);
    return router;
}
Note:
  • 通過頭資訊中的dest屬性作為動態繫結的依據;如果未設定dest則採用預設dynamic1-channel作為訊息通道;
  • 通過Spel表示式獲取頭資訊中的dest屬性值(headers[dest]);
  • BinderAwareChannelResolver注入至ExpressionEvaluatingRouter例項中;
  • 其中org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor實現了對Message的處理,故可以通過此處看到我們訊息包含的訊息體、訊息體具體資訊,從而更好的編寫Spel表示式,主要程式碼如下:
2、再次通過curl多次請求如下(curl -H "Content-Type: application/json" -H "dest:dynamic1-channel" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/):
receiver工程控制檯如下: 此時可以看到,接收的訊息來源通道與請求頭呼應,特別關注第5個case,其並未設定dest屬性,故採用了預設的dynamic1-channel通道。