1. 程式人生 > >【本人禿頂程式設計師】Spring Cloud Stream如何消費自己生產的訊息

【本人禿頂程式設計師】Spring Cloud Stream如何消費自己生產的訊息

←←←←←←←←←←←← 我都禿頂了,還不點關注!

本文將繼續說說在另外一個被經常問到的問題:如果微服務生產的訊息自己也想要消費一份,應該如何實現呢?

常見錯誤

在放出標準答案前,先放出一個常見的錯誤姿勢和告警資訊(以便您可以通過搜尋引擎找到這裡_)。以下錯誤基於Spring Boot 2.0.5、Spring Cloud Finchley SR1。

首先,根據入門示例,為了生產和消費訊息,需要定義兩個通道:一個輸入、一個輸出。比如下面這樣:

public interface TestTopic {

    String OUTPUT = "example-topic";
    String INPUT = "example-topic";

    @Output(OUTPUT)
    MessageChannel output();

    @Input(INPUT)
    SubscribableChannel input();

}

通過INPUT和OUTPUT使用相同的名稱,讓生產訊息和消費訊息指向相同的Topic,從而實現消費自己發出的訊息。

接下來,建立一個HTTP介面,並通過上面定義的輸出通道觸來生產訊息,比如:

@Slf4j
@RestController
public class TestController {

    @Autowired
    private TestTopic testTopic;

    @GetMapping("/sendMessage")
    public String messageWithMQ(@RequestParam String message) {
        testTopic.output().send(MessageBuilder.withPayload(message).build());
        return "ok";
    }

}

已經有生產訊息的實現,下面來建立對輸入通道的監聽,以實現訊息的消費邏輯。

@Slf4j
@Component
public class TestListener {

    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload);
        throw new RuntimeException("BOOM!");
    }

}

最後,在應用主類中,使用@EnableBinding註解來開啟它,比如:

@EnableBinding(TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

}

看似天衣無縫的操作,然而在啟動的瞬間,你可能收到了下面這樣的錯誤:

org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=null
	at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:54) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:86) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_151]
	at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:271) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:91) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]
	at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at com.didispace.stream.TestApplication.main(TestApplication.java:13) [classes/:na]

正確姿勢

根據錯誤提示:

Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists,

沒有啟動成功的原因是已經存在了一個名為example-topic的Bean,那麼為什麼會重複建立這個Bean呢?

實際上,在F版的Spring Cloud Stream中,當我們使用@Output和@Input註解來定義訊息通道時,都會根據傳入的通道名稱來建立一個Bean。而在上面的例子中,我們定義的@Output和@Input名稱是相同的,因為我們系統輸入和輸出是同一個Topic,這樣才能實現對自己生產訊息的消費。

既然這樣,我們定義相同的通道名是行不通了,那麼我們只能通過定義不同的通道名,併為這兩個通道配置相同的目標Topic來將這一對輸入輸出指向同一個實際的Topic。對於上面的錯誤程式,只需要做如下兩處改動:

第一步:修改通道名,使用不同的名字

public interface TestTopic {

    String OUTPUT = "example-topic-output";
    String INPUT = "example-topic-input";

    @Output(OUTPUT)
    MessageChannel output();

    @Input(INPUT)
    SubscribableChannel input();

}

第二步:在配置檔案中,為這兩個通道設定相同的Topic名稱,比如:

spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic
spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic

這樣,這兩個輸入輸出通道就會都指向名為aaa-topic的Topic了。

最後,再啟動該程式,沒有報錯。然後訪問介面:localhost:8080/sendMessage?message=hello-didi,可以在控制檯中看到如下資訊:

2018-11-17 23:24:10.425  INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-11-17 23:24:10.453  INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#266753da:0/[email protected] [delegate=amqp://[email protected]:5672/, localPort= 60752]
2018-11-17 23:24:10.458  INFO 32039 --- [ctor-http-nio-2] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (aaa-topic.anonymous.fNUxZ8C0QIafxrhkFBFI1A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2018-11-17 23:24:10.483  INFO 32039 --- [IafxrhkFBFI1A-1] com.didispace.stream.TestListener        : Received: hello-didi

消費自己生產的訊息成功了!讀者也還可以訪問一下應用的/actuator/beans端點,看看當前Spring上下文中有哪些Bean,應該可以看到有下面Bean,也就是上面分析的兩個通道的Bean物件

"example-topic-output": {
    "aliases": [],
    "scope": "singleton",
    "type": "org.springframework.integration.channel.DirectChannel",
    "resource": null,
    "dependencies": []
},
"example-topic-input": {
    "aliases": [],
    "scope": "singleton",
    "type": "org.springframework.integration.channel.DirectChannel",
    "resource": null,
    "dependencies": []
},

後記

其實大部分開發者在使用Spring Cloud Stream時候碰到的問題都源於對Spring Cloud Stream的核心概念還是不夠理解。

寫在最後:

禿頂程式設計師的不易,看到這裡,點了關注吧!
點關注,不迷路,持續更新!!!