1. 程式人生 > >SpringCloudStream最全教程,包括配置檔案描述

SpringCloudStream最全教程,包括配置檔案描述

Spring Cloud Stream 知識整理

概念 使用方法


概念

1. 釋出/訂閱

釋出/訂閱模型

簡單的講就是一種生產者,消費者模式。釋出者是生產,將輸出釋出到資料中心,訂閱者是消費者,訂閱自己感興趣的資料。當有資料到達資料中心時,就把資料傳送給對應的訂閱者。

2. 消費組

直觀的理解就是一群消費者一起處理訊息。需要注意的是:每個傳送到消費組的資料,僅由消費組中的一個消費者處理。

3. 分割槽

類比於消費組,分割槽是將資料分割槽。舉例:某應用有多個例項,都繫結到同一個資料中心,也就是不同例項都將資料釋出到同一個資料中心。分割槽就是將資料中心的資料再細分成不同的區。為什麼需要分割槽?因為即使是同一個應用,不同例項釋出的資料型別可能不同,也希望這些資料由不同的消費者處理。這就需要,消費者可以僅訂閱一個數據中心的部分資料。這就需要分割槽這個東西了。

Spring Cloud Stream簡介

1. 應用模型

Spring Cloud Stream應用由第三方的中介軟體組成。應用間的通訊通過輸入通道(input channel)和輸出通道(output channel)完成。這些通道是有Spring Cloud Stream 注入的。而通道與外部的代理(可以理解為上文所說的資料中心)的連線又是通過Binder實現的。

Spring Cloud Stream 應用模型

上圖就是Spring Cloud Stream的應用模型。

1.1 可獨立執行的jar

Spring Cloud Stream應用可以直接在IDE執行。這樣會很方便測試。但在生產環境下,這是不適合的。Spring Boot為maven和Gradle提供了打包成可執行jar的工具,你可以使用這個工具將Spring Cloud Stream應用打包。

2. 抽象的Binder

Binder可以理解為提供了Middleware操作方法的類。Spring Cloud 提供了Binder抽象介面以及KafKaRabbit MQ的Binder的實現。

使用Spring Cloud Stream

1. 快速開始

這裡先放出前面的應用模型圖

應用模型圖

下面例子使用的Middleware是Kafka,版本是kafka_2.11-1.0.0。Kafka使用的是預設配置,也就是從Kafka官網下載好後直接開啟,不更改任何配置。

關於pom.xml中依賴的專案的版本問題,最好不該成別的版本,因為很大可能導致版本衝突。

1.1 pom.xml

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.8.RELEASE</version>
	</parent>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-stream-dependencies</artifactId>
				<version>Ditmars.RELEASE</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>


	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<exclusions>
				<exclusion>
					<groupId>ch.qos.logback</groupId>
					<artifactId>logback-classic</artifactId>
				</exclusion>
				<exclusion>
					<groupId>ch.qos.logback</groupId>
					<artifactId>logback-core</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
		</dependency>
	</dependencies>

需要注意的是:官網上的例子是沒有下面配置的

			<exclusions>
				<exclusion>
					<groupId>ch.qos.logback</groupId>
					<artifactId>logback-classic</artifactId>
				</exclusion>
				<exclusion>
					<groupId>ch.qos.logback</groupId>
					<artifactId>logback-core</artifactId>
				</exclusion>
			</exclusions>

但是在本人電腦上如果不加上上面那段配置就是報錯,讀者可以按照個人情況選擇加不加。

簡單說明一下以上配置

  1. <parent>...</parent>:這段代表繼承spring-boot-starter-parent的配置。因為Spring Cloud Stream 依賴Spring Boot的自動配置,所以需要加上這段。
  2. <dependencyManagement>...</dependencyManagement>:這段是引入spring-cloud-stream-dependencies.pom.xml,該配置檔案裡含有Spring Cloud Stream 專案需要使用的jar包的資訊(包名加版本號)
  3. <dependencies>...</dependencies>依賴兩個starter

1.2 App.java

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(App.class);
		// 註冊處理函式
		System.out.println("註冊結果:" + setHander(context));
		// 傳送訊息
		System.out.println("傳送結果:" + write(context));
	}

	// 傳送訊息
	public static boolean write(ConfigurableApplicationContext context) {
		Service service = context.getBean(Service.class);
		return service.write("狗子在嗎?");
	}

	// 註冊接收到訊息時的處理函式
	public static boolean setHander(ConfigurableApplicationContext context) {
		Service service = context.getBean(Service.class);
		return service.subscribe(result -> {
			System.out.print("狗子收到訊息:" + result.getPayload());
		});
	}
}

上面使用了兩個註解:@EnableBinding 和 @SpringBootApplication。@SpringBootApplication 就不說了。@EnableBinding 註解接收一個引數,引數型別是class。上面程式碼中,傳入的引數是“Processor.class”,這是一個介面,定義了兩個channel,分別是input和output。看名稱就知道,一個是輸出通道(input channel),一個是輸出通道(output channel)。“@EnableBinding(value = { Processor.class })”這整段代表建立Processor定義的通道,並將通道和Binder繫結。

Porcessor是Spring Cloud Stream為方便使用而預先定義好的,除了Processor還有Sink和Source,這些介面定義了一些通道(channel),需要時,直接使用就好。我們也能自己定義通道(channel),如何定義下文會講。

App類中的main方法呼叫了SpringApplication.run,接著呼叫了write和setHandler方法。方法很簡單,上文有註釋,不再贅述。

1.3 Service.java

@Component
public class Service {

	@Autowired
	private Processor processor;
	
	public boolean write(String data) {
		return processor.output().send(MessageBuilder.withPayload(data).build());
	}
	
	public boolean subscribe(MessageHandler handler) {
		return processor.input().subscribe(handler);
	}
}

這是一個service類,封裝了一些對通道的操作。

需要注意的是這段程式碼:

	@Autowired
	private Processor processor;

前面說過,Processor是一個定義了輸入輸出通道的介面,並沒有具體實現。Spring Cloud Stream會幫我們自動實現它。我們只需要獲取它,並使用它。

接著看

processor.output().send(MessageBuilder.withPayload(data).build());

先是呼叫output()方法獲取輸出通道物件,接著呼叫send方法傳送資料。send方法接收一個Message物件,這個物件不能直接new,需要使用MessageBuilder獲取。

1.4 application.properties

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.output.destination=test

上面配置了目的地,類比於Kafka的Topic和RabbitMQ的佇列的概念。

配置格式如下:

spring.cloud.stream.bindings.<channelName>.<key>=value

channelName就是管道名,key就是對應屬性,這裡是destination,代表目的地。

管道名,key的其他可選值下文會講,這裡不要強求全部弄懂,接著看就好。

1.4 總結

上面就是完整的例子了。對比前面給出的應用模型圖,上面的程式碼和配置檔案定義了Application Core(程式碼中的處理函式,傳送訊息的函式等等),建立了通道並和Binder繫結(@EnableBinding(value = { Processor.class }))。Middleware就是本節一開始說的Kafka。整個流程大概如下:

  1. 開啟Middleware(Kafka)
  2. 建立通道並與Binder繫結(@EnableBinding)
  3. 編寫操作通道的程式碼
  4. 在配置檔案上配置目的地,組,Middleware的地址,埠等等

使用Spring Cloud Stream

1 宣告和繫結通道(channel)

1.1 宣告通道

Spring Cloud Stream 可以有任意數量的通道。宣告通道的方式很簡單。下面先給出之前說過的Sink,Source,Processor介面的原始碼:

public interface Sink {

	String INPUT = "input";

	@Input(Sink.INPUT)
	SubscribableChannel input();

}

public interface Source {

	String OUTPUT = "output";

	@Output(Source.OUTPUT)
	MessageChannel output();

}

public interface Processor extends Source, Sink {

}

簡單吧,就是使用了@Input和@Output註解了方法。其中@Input註解的方法返回的是SubscribableChannel,@Output註解的方法返回的是MessageChannel。

宣告通道(channel)的方法就是使用@Input和@Output註解方法。你想要多少通道就註解多少方法。

給通道命名

預設情況下,通道的名稱就是註解的方法的名稱,例如:

@Input
public SubscribableChannel yyy();

那麼該通道的名稱就是yyy。也能夠自己定義通道名稱。只需要給@Input和@Output註解傳入String型別引數就可以了,傳入的引數就是該通道了名稱。例如:

@Input("zzz")
public SubscribableChannel yyy();

通道的名稱就變成了zzz。

1.2 建立和繫結通道

只需要使用@EnableBinding就能建立和繫結通道(channel)。

@EnableBinding(value={Sink.class,Source.class})

@EnableBinding註解接收的引數就是使用@Input或者@Output註解聲明瞭通道(channel)的介面。Spring Cloud Stream會自動實現這些介面。

上文中說過,@Input和@Output註解的方法有相應的返回值,這些返回值就是對應的通道(channel)物件。要使用通道(channel)時,就只要獲取到Spring Cloud Stream對這些介面的實現,再呼叫註解的方法獲取到通道(channel)物件進行操作就可以了。如何獲取介面的實現下文會講。

繫結通道(channel)是指將通道(channel)和Binder進行繫結。因為Middleware不只一種,例如有Kafka,RabbitMQ。不同的Middleware有不同的Binder實現,通道(channel)與Middleware連線需要經過Binder,所以通道(channel)要與明確的Binder繫結。

如果類路徑下只有一種Binder,Spring Cloud Stream會找到並繫結它,不需要我們進行配置。如果有多個就需要我們明確配置了,配置方式下文會講。這裡只需要知道@EnableBinding能幫我們自動實現介面,建立通道和實現通道與Binder的繫結就可以了。

獲取綁定了的通道

使用了@EnableBinding註解後,Spring Cloud Stream 就會自動幫我們實現介面。那麼,可以通過Spring支援的任何一種方式獲取介面的實現,例如自動注入,getBean等方式,下面給出官方例子:

@Component
public class SendingBean {

    private Source source;

    @Autowired
    public SendingBean(Source source) {
        this.source = source;
    }

    public void sayHello(String name) {
         source.output().send(MessageBuilder.withPayload(name).build());
    }
}

也能夠直接注入通道(channel)

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         output.send(MessageBuilder.withPayload(name).build());
    }
}

如果你給通道命名了,需要使用@Qualifier註解指明通道名稱

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(@Qualifier("customOutput") MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         this.output.send(MessageBuilder.withPayload(name).build());
    }
}

2 生產和消費訊息

2.1 生產訊息

一種方式是呼叫通道(channel)的sned方法釋出訊息。還有就是使用Spring Intergration的方式生產資料

@EnableBinding(Source.class)
public class TimerSource {

  @Value("${format}")
  private String format;

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
  }
}

Spring Cloud Stream是繼承Spring Intergration的,所有Spring Cloud Stream 天然支援Spring Intergration的東西。

2.2 消費訊息

一種方式是前面快速開始中的那樣註冊處理函式,這裡不再贅述,下面將是使用@StreamListener註解對訊息進行處理

使用@StreamListener的例子

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(App.class);
		// 傳送訊息
		System.out.println("傳送結果:" + write(context));
	}

	@StreamListener(Sink.INPUT)
	public void handler(String message) {
		System.out.print("狗子收到訊息:" + message);
	}

	// 傳送訊息
	public static boolean write(ConfigurableApplicationContext context) {
		Service service = context.getBean(Service.class);
		return service.write("狗子在嗎?");
	}
}

這是快速開始的例子,在這將下面的程式碼去掉,換成@StreamListener

	public static boolean setHander(ConfigurableApplicationContext context) {
		Service service = context.getBean(Service.class);
		return service.subscribe(result -> {
			System.out.print("狗子收到訊息:" + result.getPayload());
		});
	}

@StreamListener接收的引數是要處理的通道(channel)的名,所註解的方法就是處理從通道獲取到的資料的方法。方法的引數就是獲取到的資料。

訊息是帶有Header的,類似Http的headler,上面有contentType屬性指明訊息型別。如果contentType是application/json,那麼@Streamlistener會自動將資料轉化成@StreamListener註解的方法的引數的型別。

可以是@Header,@Headers註解獲取訊息的Header

@StreamListener(target=Sink.INPUT)
	public void handler1(Message message,@Header(name="contentType") Object header) {
		System.out.print("狗子收到message訊息:" + message.getMessage());
		System.out.print("訊息header:" + header);
	}

用法如上,使用@Header或者@Headers註解方法的引數,指明讓Spring Cloud Stream將訊息的Header傳入對應的引數。

@Header和@Headers的區別就是一個是獲取單個屬性,需要指明哪個屬性,一個是獲取全部屬性。

@StreamListener(target=Sink.INPUT)
	public void handler1(Message message,@Headers Map<String,Object> header) {
		System.out.print("狗子收到message訊息:" + message.getMessage());
		System.out.print("訊息header:" + header);
	}

實際上還有一些註解是@PayLoad和@PayLoads,看名字就知道是獲取訊息內容的,具體用法和注意事項Spring Cloud Stream 官方文件上沒講,這部分內容以後補充。

注意:如果@StreamListener註解的方法有返回值,那麼必須使用@SendTo註解指明返回的值寫入哪個通道

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

使用@StreamListener將訊息分發給多個方法

若想使用訊息分發的功能,方法必須先滿足一下條件:

  • 沒有返回值
  • 方法是單獨的訊息處理方法(原文:it must be an individual message handling method (reactive API methods are not supported))

分發的條件在註解的“condition”屬性中指明,而且條件是由SpEL表示式編寫的。所有匹配條件的處理函式將會在相同的執行緒中無固定順序的呼叫。

下面給出一個例子(由快速開始中例子修改而來):

下面這個例子中,pom.xml,和application.properties與快速開始的一樣。

//先定義兩個DTO
public class Message {
	private String message;
	private Integer all;

	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}

	public Integer getAll() {
		return all;
	}

	public void setAll(Integer all) {
		this.all = all;
	}

}

public class Error {

	private String error;

	public String getError() {
		return error;
	}

	public void setError(String error) {
		this.error = error;
	}
	
}

接著是封裝了的通道(channel)操作的Service。與快速開始的例子不同的是,這個裡建立訊息時設定的Header的“contentType”屬性,值為訊息攜帶的資料的Class的SimpleName。

@Component
public class Service {

	@Autowired
	private Processor processor;

	public boolean write(Object data) {
		return processor.output().send(
				MessageBuilder.withPayload(data).setHeader("contentType", data.getClass().getSimpleName()).build());
	}
}

最後是App類。這類將釋出了兩次訊息,分別是Message型別的和Error型別的。並且使用@StreamListener註解了三個方法,都設定了condition屬性指明分發條件。

@EnableBinding(value = { Processor.class })
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(App.class);
		// 傳送訊息
		Message message = new Message();
		message.setAll(200);
		message.setMessage("狗子在嗎?");

		Error error = new Error();
		error.setError("錯誤呼喚!");
		write(context, message);
		write(context, error);
	}

	@StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Message'")
	public void handler1(@Payload Message message, @Header("contentType") String header) {
		System.out.println("狗子收到message訊息1:" + message.getMessage());
	}

	@StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Error'")
	public void handler2(Error message) {
		System.out.print("狗子收到error訊息2:" + message.getError());
	}

	@StreamListener(target = Sink.INPUT, condition = "headers['contentType']=='Message'")
	public void handler3(@Payload Message message, @Header("contentType") String header) {
		System.out.println("狗子收到message訊息3:" + message.getMessage());
	}

	// 傳送訊息
	public static boolean write(ConfigurableApplicationContext context, Object data) {
		Service service = context.getBean(Service.class);
		return service.write(data);
	}
}

輸出結果:

狗子收到message訊息1:狗子在嗎?
狗子收到message訊息3:狗子在嗎?
狗子收到error訊息2:錯誤呼喚!

可以看到匹配了“contentType=Message”的兩個方法都執行了,匹配了“contentType=error”的方法也執行了。

這裡我再補充一點我使用時遇到的問題

如果我把Header設定一個屬性“type=XXX”,但獲取到訊息的時候,Header上並沒有這個屬性。簡單嘗試了一些,發現只能修改現有屬性(例如contentType),不能新增新屬性。

2.3 聚合

2.3.1 使用限制

Spring Cloud Stream 支援聚合多個應用的功能。這個功能可以直接連線多個應用的輸入,輸出通道,避免通過代理(指Kafka,RabbitMQ這些Middleware)交換訊息時帶來的額外耗費。到1.0版的Spring Cloud Stream為止,聚合功能僅支援下列應用:

  1. 只有單個輸出通道,並且命名為output的應用(就是Source)
  2. 只有單個輸入通道,並且命名為input的應用(就是Sink)
  3. 只有一個輸出通道和一個輸入通道並且命名為output和input的應用(就是Processor)

以上是官方文件原話,個人覺得很雞肋的功能,也許我用得少吧。

具備以上特徵的應用就可以使用Spring Cloud Stream的聚合功能將多個應該連線成一串互相連線的應用。

這裡還有幾個限制,起始的應用必須是Source或者Processor,結束的應用必須是Sink或者Processor。中間的應用必須是Processor,不過可以有任意數量的Processor。(Soruce,Sink,Processor就是指具備上面所說特徵的應用)

2.3.2 例子

下面給出官方例子,先說明幾個注意點:

  1. 下面例子中有三個應用分別是Source,Sink,Processor,這三個應用可以分佈在不同專案中,也能在相同專案中。需要注意的是,如果在相同專案中,應該要處於不同的包中,如果同個包,多個@SpringBootApplication註解會導致報錯
  2. 使用@Transformer註解需要指明inputChannel和outputChannel屬性。官方文件的例子上是沒有指明的,但我執行的時候如果不指明就不能將多個應用連在一起。
  3. 不要使用eclipse中的Spring Boot應用的外掛執行,使用外掛執行會報注意點1的錯誤,原因是什麼不清楚。
//Source
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {
	
	@InboundChannelAdapter(value = Source.OUTPUT)
	public String timerMessageSource() {
		return new SimpleDateFormat().format(new Date());
	}
}
//Processor
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {
	
	@Transformer(inputChannel=Sink.INPUT,outputChannel=Source.OUTPUT)
	public String loggerSink(String payload) {
		return payload.toUpperCase();
	}
}
//Sink
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

	@StreamListener(Sink.INPUT)
	public void loggerSink(Object payload) {
		System.out.println("Received: " + payload);
	}
}

上面是三個應用,下面是將三個應用連線起來的程式碼。

@SpringBootApplication
public class App {

	public static void main(String[] args) {
		new AggregateApplicationBuilder().from(SourceApplication.class).args("--fixedDelay=5000")
				.via(ProcessorApplication.class).to(SinkApplication.class).args("--debug=true").run(args);
	}
}

程式碼很簡單,就是使用AggregateApplicationBuilder將三個應用連線起來。.args("XXX")這段程式碼的作用就是為對應的應用傳遞執行時引數。

2.3.3 不同連線情況下的Binder繫結

由於限制多多,可以窮舉出所有的可能連線,下面給出不同連線與Binder的繫結情況:

  1. 如果以Source應用開始並且以Sink應用結束,那麼應用間的連線是直接進行的,不會經過代理(指Kafka,RabbitMQ這些Middleware),也就不會與Binder繫結。例如上面的例子,你把使用的Middleware關閉,例如我使用的是Kafka,我把Kafka關了,應用也能跑起來。
  2. 如果以Processor應用開始,那麼這個應用的input通道就是這一串一樣的input通道,這種情況下,會觸發input通道與Binder的繫結。
  3. 如果以Processor應用結束,那麼這個應用的output通道就是這串硬硬的output通道,會觸發output通道與Binder的繫結。

2.3.4 配置聚合的應用

Spring Cloud Stream 支援為聚合在一起的多個應用中的一個應用傳遞引數。

為應用命名namespace後,就可以通過命令列,環境變數等方式給應用傳遞引數。

public static void main(String[] args) {
		new AggregateApplicationBuilder()
		.from(SourceApplication.class).namespace("from").args("--fixedDelay=20000")
		.via(ProcessorApplication.class).namespace("via")
		.to(SinkApplication.class).namespace("to").args("--debug=true").run(args);
	}

這端程式碼和前面的例子沒太大差別,只是多了.namespace(),這段程式碼就是為應用設定namesapce。

接著是聚合在一起的應用的程式碼:

	//獲取傳入的引數
	@Value("${fixedDelay:null}")
	private String args;
	
	@InboundChannelAdapter(value = Source.OUTPUT)
	public String timerMessageSource() {
      	//輸出引數
		System.out.println("Source get args:"+args);
		return new SimpleDateFormat().format(new Date());
	}

這裡只給出一個,其他類似,都是加了獲取引數和輸出引數的程式碼。

接著打包後以下列命令執行:

java -jar stream-aggregation.jar

輸出:

Source get args:20000
Processor get args:null
Sink get args:null
Received: 17-12-14 下午5:43

可以看到,因為Processor是沒有fixedDelay引數的,所有輸出null

以下列命令執行:

java -jar stream.jar via --fixedDelay=200

輸出:

Source get args:20000
Processor get args:200
Sink get args:null
Received: 17-12-14 下午5:46

可以看到,輸出為200,就是我們傳入的引數,而Sink和Source的輸出沒變,也就是沒改變它們的引數

總結一下:

  1. 在聚合時候設定namespace
  2. 在命令列或者環境變數等方式使用namespace為指定應用傳遞引數

Binder以及配置

應用模型圖

這裡再放出應用模型圖。Binder簡單的理解就是封裝了對訊息系統(kafka,rabbitMQ)的操作。可以使用開發者簡單的配置就能使用訊息系統的釋出/訂閱,點對點傳輸,分組,分割槽等等功能。是開發者開放時能忽略對訊息系統操作的細節。當然,這些元件的設計一般是抽象出一個介面,然後對不同的訊息系統有不同的實現,這些東西這裡不講,只講怎麼用。

1 Binder實現類的檢測

1.1 單個Binder實現類

如果在類路徑上只有一個Binder的實現類(例如你在maven專案中,只添加了kafka的Binder的實現的依賴),那麼Spring Cloud Stream會預設使用這個實現類,所有的通道(Channel)都會繫結這個Binder。就像前面的例子那樣,你幾乎感覺不到Binder的存在,你只需要配置一下通道(Channel)的目的地(destination),分組(group),分割槽(partition)等資訊就可以使用。例如快速開始的例子中就僅僅配置了輸入,輸出通道的目的地

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.output.destination=test

1.2 多個Binder實現類

如果有多個Binder實現類,那麼就必須指明哪個通道(Channel)繫結哪個Binder。配置的方式就是在application.peoperties或者application.yaml配置檔案上新增一下內容:

spring.cloud.stream.bindings.通道名稱.binder=Binder名稱

這樣就能指明什麼通道繫結哪個Binder了。

當然,你也可以配置預設的Binder

spring.cloud.stream.defaultBinder=Binder名稱

關於Binder的名稱

在每個Binder實現的jar包的META-INF目錄下都會有一個spring.binders檔案。該檔案是一個簡單的單屬性檔案,例如rabbitMQ的Binder的實現的spring.binders檔案的內容如下:

rabbit:\org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

前面的key部分(這裡是rabbit)就是Binder的名稱。也就是rabbitMQ的Binder的名稱就是rabbit。實際上現在也就只有兩種Binder的實現,一個是rabbitMQ的一個是kafka的,kafka的Binder的名稱就是kafka。

2 可選配置

可以通過Spring Boot的任意配置機制來對Spring Cloud Stream應用進行配置,例如應用引數(application argument),環境變數(environment variable)以及YAML 或者 properties檔案。

2.1 對應用的配置

spring.cloud.stream.instanceCount

這個是配置應用例項的數量。如果使用kafka,必須設定分割槽。預設值為1

spring.cloud.stream.instanceIndex

例項的編號,編號從0開始。

spring.cloud.stream.dynamicDesinations

設定一列目的地用以動態繫結。如果設定了,只有列表中的目的地能被繫結。預設值為空。

spring.cloud.stream.defaultBinder

設定的預設的Binder,這個前面說過,不再贅述。預設值為空。

spring.cloud.stream.overrideCloudConnectors

預設值為false。當值為false時,Binder會檢查並選擇合適的bound Service來建立連線。當設為true的時候,Binder會按照Spring Cloud Stream配置檔案來選擇bound Service。這個配置通常是在需要連線多個訊息系統的時候用到。

2.2 連線(Binding)的配置

這類配置的格式如下:

spring.cloud.stream.bindings.<channelName>.<property>=<value>

意思就是配置名為channelName的通道的property屬性的值為value。

為了避免重複配置,Spring Cloud Stream 也支援對全部通道(channel)進行設定。配置預設屬性的格式如下:

spring.cloud.stream.default.<property>=<value>

2.2.1 通用的配置

一下的配置屬性都帶有“spring.cloud.stream.bindings.<channelName>”字首,為方便文字排版,省略字首。

destination

通道(channel)與訊息系統連線的目的地(若訊息系統是RabbitMQ,目的地(destination)就是指exchange,訊息系統是Kafka,那麼就是指topic)。

可以連線多個目的地。要想連線多個目的地,只需要用“,”將多個目的地分開即可。例如:

spring.cloud.stream.channelName.destinaction=destinaction1,destinaction2

group

配置通道的消費者組。僅應用於輸入通道。

預設值為null

補充:一個channel可以連線多個destination,同一個group內的channel連線的destination可以不同。

如果一個group內的channel連線了A,B,C三個destination。那麼A,B,C這個三個destination的訊息都會拷貝一份發給這個group,並且選擇這個group中channel消費這個訊息。例如,這個group中的a,b兩個channel連線並且只連線了destination A,channel c連線且只連線了destination B,那麼會在a,b中選一個來處理來自A的訊息,c不在選擇的範圍內。

如果有兩個group都連線了destination A,那麼A的訊息會拷貝兩份分別發給這兩個group。

contentType

通道(channel)承載的內容的型別。

預設值為null。

binder

這個在前面“多個Binder實現類”部分講了。