1. 程式人生 > >cloud stream 官方文件閱讀筆記4

cloud stream 官方文件閱讀筆記4

5、程式設計模型
要了解程式設計模型,您應該熟悉以下核心概念:
Destination Binders 負責提供與外部訊息系統整合的元件
Destination Bindings 外部訊息傳遞系統和應用程式之間的橋樑,提供了訊息的生產者和消費者
Message:生產者和消費者用於與目標繫結器(以及通過外部訊息傳遞系統的其他應用程式)通訊的規範資料結構。

5.1、Destination Binders
目標繫結器是Spring Cloud Stream的擴充套件元件,負責提供必要的配置和實現,以促進與外部郵件系統的整合,
此整合負責與生產者和使用者之間的訊息的連線,委派和路由,資料型別轉換,使用者程式碼呼叫等
它處理許多鍋爐板的責任,否則它們會落在你的肩膀上,然而,為了實現這一點,繫結器仍然需
要來自使用者的簡約但需要的指令集的形式的一些幫助,其通常以某種型別的配置的形式出現。

5.2、Destination Bindings
如前所述,Destination Bindings在外部訊息傳遞系統和應用程式提供的生產者和消費者之間提供了一個橋樑。
將@EnableBinding 註解應用於其中一個應用程式的配置類可定義目標繫結。
@EnableBinding註釋本身使用@Configuration進行元註釋,並觸發Spring Cloud Stream基礎結構的配置。
以下示例顯示了一個完全配置且正常執行的Spring Cloud Stream應用程式,該應用程式將作為String型別
從INPUT目標接收訊息的有效負載(請參閱內容型別協商部分),將其記錄到控制檯並將其轉換為大寫後將其
傳送到OUTPUT目標。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication{
  public static void main(String[] args){
    SpringApplication.run(MyApplication.class, args);
  }
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String handle(String value){
    System.out.
println("Received: " + value); return value.toUpperCase(); } }

如您所見,@ EnableBinding批註可以將一個或多個介面類作為引數.這些引數稱為繫結,它們包含表示可繫結元件的方法.
這些元件通常是基於通道的繫結器(例如Rabbit,Kafka等)的訊息通道(請參閱Spring Messaging)。
然而,其他型別的繫結可以為相應技術的本機特徵提供支援。例如,Kafka Streams binder(以前稱為KStream)允許直
接繫結到Kafka Streams的本機繫結(有關詳細資訊,請參閱Kafka Streams)。
Spring Cloud Stream已經為典型的訊息交換合同提供了繫結介面,其中包括:
Sink: 通過提供訊息使用的目標來標識訊息使用者的介面
Source: 通過提供生成的訊息傳送到的目標來標識訊息生產者的介面
Processor: 通過暴露允許消費和生成訊息的兩個目標來封裝接收器和源合同
上面三個介面的實現程式碼:

public interface Sink{

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

public interface Source{

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}

public interface Processor extends Source, Sink{}

雖然前面的示例滿足大多數情況,但您也可以通過定義自己的繫結介面來定義自己的合同,
並使用@Input和@Output註釋來標識實際的可繫結元件。
例如:

public interface Barista {

  @Input
  SubscribableChannel orders();

  @Output
  MessageChannel hotDrinks();

  @Output
  MessageChannel coldDrinks();
}

使用前面示例中顯示的介面作為@EnableBinding的引數,分別觸發建立名為orders,hotDrinks和
coldDrinks的三個繫結通道。
您可以根據需要提供儘可能多的繫結介面,作為@EnableBinding批註的引數,如以下示例所示:

@EnableBinding(value = {Order.class, Paymet.class})

在Spring Cloud Stream中,可繫結的MessageChannel元件是Spring Messaging MessageChannel(用於出站)
及其擴充套件,SubscribableChannel(用於入站)

可輪詢目的地繫結

雖然之前描述的繫結支援基於事件的訊息消耗,但有時您需要更多控制,例如消耗率,從2.0版開始,
您現在可以繫結可輪詢的使用者,以下示例顯示如何繫結可輪詢使用者:

public interface PolledBar{

  @Input
  PollableMessageSource orders();

  . . .
}

在這種情況下,PollableMessageSource的實現繫結到訂單“channel”。

自定義頻道名稱

通過使用@Input和@Output註釋,您可以為通道指定自定義通道名稱,如以下示例所示

public interface Barista{
  @Input("inboundOrders")
  SubscribableChannel orders();
}

在前面的示例中,建立的繫結通道名為inboundOrders
通常,您不需要直接訪問單個通道或繫結(除了通過@EnableBinding註釋配置它們)
但是,有時可能會出現測試或其他特殊情況

除了為每個繫結生成通道並將它們註冊為Spring bean之外,對於每個繫結的介面,Spring Cloud
Stream生成實現介面的bean,這意味著您可以通過在應用程式中自動連線來訪問表示繫結或單個通
道的介面,如以下兩個示例所示:

A u t o w i r e B i n d i n g i n t e r f a c e Autowire Binding interface

@Autowire
private Source Source;

public void sayHello(String name){
  Source.output().send(MessageBuilder.withPayload(name).build());
}

A u t o w i r e i n d i v i d u a l c h a n n e l Autowire individual channel

@Autowire
private MessageChannel output;

public void sayHello(String name){
  output.send(MessageBuilder.withPayload(name).build());
}

對於自定義通道名稱的情況或需要特定命名通道的多通道方案,您還可以使用標準Spring的@Qualifier註釋。
以下示例顯示如何以這種方式使用@Qualifier註釋:

@Autowire
@Qualifier("myChannel")
private MessageChannel output;

5.3、生產和消費訊息
您可以使用Spring Integration註釋或Spring Cloud Stream本機註釋編寫Spring Cloud Stream應用程式。

5.3.1、Spring整合支援
Spring Cloud Stream建立在Enterprise Integration Patterns定義的概念和模式之上,並依賴於已經建立的內部實現
和Spring專案組合中的企業整合模式的流行實現:Spring Integration框架。
所以它唯一的理由就是支援Spring Integration已經建立的基礎,語義和配置選項

例如,您可以將Source的輸出通道附加到MessageSource並使用熟悉的@InboundChannelAdapter註釋,如下所示:

@EnableBinding(Source.class)
public class TimeSource {
  @Bean 
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", 
  maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource(){
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
  }
}

同樣,您可以使用@Transformer或@ServiceActivator,同時為處理器繫結契約提供訊息處理程式方法的實現,
如以下示例所示:

@EnableBinding(Processor.class)
public class TransformerProcessor{
  @Transformer(inputChannel = Processor.INPUT, outputchannel = Processor.OUTPUT)
  public Object Transform(String message){
    return message.toUpperCase();
  }
}

5.3.2、使用 @StreamListener註解

作為Spring Integration支援的補充,Spring Cloud Stream提供了自己的@StreamListener註解,
以其他Spring Messaging註釋(@ MessessMapping,@ JamsListener,@ RackListener等)為模型,
並提供諸如基於內容的路由等方便的便利。

@EnableBinding(Sink.class)
public class VoteHandler{
  @Autowired
  VotingService votingservice;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote){
    VotingService.record(vote);
  }
}

與其他Spring Messaging方法一樣,方法引數可以使用@Payload,@ Headers和@Header進行註釋。
對於返回資料的方法,必須使用@SendTo批註指定方法返回的資料的輸出繫結目標,如以下示例所示:

@EnableBinding(Processor.class)
public class TransformProcessor{
  @Autowired
  VotingService votingservice;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote){
    return votingservice.record(vote);
  }
}

5.3.3、使用@StreamListener進行基於內容的路由
Spring Cloud Stream支援根據條件將訊息分派給使用@StreamListener註釋的多個處理程式方法。
為了有資格支援條件分派,方法必須滿足以下條件:
它不能有返回值。
它必須是單獨的訊息處理方法(不支援反應API方法)。

條件由註釋的condition引數中的SpEL表示式指定,並針對每條訊息進行評估。條件匹配的所有處理
程式都在同一個執行緒中呼叫,並且不必假設呼叫發生的順序。
在以下帶有排程條件的@StreamListener示例中,帶有值為bogey的頭型別的所有訊息都將被排程到receiveBogey方法
帶有值為bacall的頭型別的所有訊息都將被分派到receiveBacall方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}
條件語境中的內容型別談判

使用@StreamListener的condition引數瞭解基於內容的路由背後的一些機制非常重要,特別是在整個訊息型別的背景下。
如果您在繼續操作之前熟悉內容型別協商,這也可能有所幫助。
請考慮以下情形:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

上述程式碼完全有效。它編譯和部署沒有任何問題,但它永遠不會產生您期望的結果。

那是因為你正在測試你期望的狀態中尚不存在的東西。這是因為訊息的有效載荷尚未從有線格式(byte [])
轉換為所需型別。換一種說法,它還沒有經過內容型別協商中描述的型別轉換過程。
因此,除非使用評估原始資料的SPeL表示式(例如,位元組陣列中第一個位元組的值),使用基於訊息頭的表示式
(例如condition =“headers [‘type’] ==‘dog’”)

5.3.4、使用輪詢的消費者
使用輪詢的使用者時,您可以根據需要輪詢PollableMessageSource。考慮以下輪詢消費者的示例:

public interface PolledConsumer {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

鑑於前面示例中的輪詢消費者,您可以按如下方式使用它

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure (throw an exception to reject the message);
            }
        }
    };
}

PollableMessageSource.poll()方法接受MessageHandler引數(通常是lambda表示式,如此處所示)。
如果收到併成功處理了訊息,則返回true。

與訊息驅動的使用者一樣,如果MessageHandler丟擲異常,則會將訊息釋出到錯誤通道,
如“[binder-error-channels]”中所述。
通常,poll()方法在MessageHandler退出時確認訊息。如果方法異常退出,則拒絕該訊息(不重新排隊)
您可以通過重寫該方法覆蓋該行為,如以下示例所示

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}

還有一個過載的poll方法,其定義如下:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

該型別是轉換提示,允許轉換傳入的訊息有效內容,如以下示例所示:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});