Spring Cloud Stream Kafka: Messages from a Specific Partition Are Always Consumed by a Specific Consumer Instance

Goal: messages in a specific Kafka partition are always consumed by a specific instance of the consumer application. For example, partition 0 is consumed by the instance with index 0, partition 1 by the instance with index 1, and partition 2 by the instance with index 2.

Project overview: the project consists of 1 producer instance and 3 consumer instances; both the producer and consumer applications are Spring Cloud Eureka clients. The producer sends messages to the 3 partitions of a Kafka topic, and the 3 consumer instances each consume one partition according to their instance index: the instance with index 0 consumes partition 0, the instance with index 1 consumes partition 1, and the instance with index 2 consumes partition 2.

Producer project structure:

(project structure screenshot)

Add the Spring Cloud Stream Kafka binder dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Application class:

SpringCloudStreamKafkaProducerApplication.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@EnableBinding(Source.class)
@EnableScheduling
@SpringBootApplication
public class SpringCloudStreamKafkaProducerApplication {

    @Autowired
    private Source source;

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaProducerApplication.class, args);
    }

    // Send a Person message every 5 seconds; the id is the last decimal digit
    // of the current timestamp (0-9), so messages spread across partitions.
    @Scheduled(fixedRate = 5000)
    public void handle1() {
        Person person = new Person();

        Long currentTimeMillis = System.currentTimeMillis();
        person.setId(Long.parseLong(currentTimeMillis.toString().substring(currentTimeMillis.toString().length() - 1)));
        person.setName("rock ");

        System.out.println("send a person..." + person);

        source.output().send(MessageBuilder.withPayload(person).build());
    }

    public static class Person {
        private Long id;
        private String name;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" + "id=" + id + ", name='" + name + '\'' + '}';
        }
    }
}
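The id derivation above can be sketched in isolation (a hypothetical helper class, not part of the project): taking the last decimal digit of the timestamp is equivalent to taking it modulo 10, yielding a value 0-9.

```java
// Sketch of the producer's id derivation: the Person id is the last decimal
// digit of System.currentTimeMillis(), i.e. a value in 0..9.
public class IdDerivationSketch {
    // Equivalent to Long.parseLong(ts.toString().substring(ts.toString().length() - 1))
    static long lastDigit(long timestampMillis) {
        return timestampMillis % 10;
    }

    public static void main(String[] args) {
        long ts = 1690000012347L;
        System.out.println("id = " + lastDigit(ts)); // prints "id = 7"
    }
}
```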

Channel interface (shown for reference; the producer above binds the default Source):

CustomSource.java

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CustomSource {

    String OUTPUT1 = "output1";

    @Output(CustomSource.OUTPUT1)
    MessageChannel output1();

    String OUTPUT2 = "output2";

    @Output(CustomSource.OUTPUT2)
    MessageChannel output2();
}

Configuration class:

KafkaBindingConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaBindingConfig {

    @Bean
    public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
        return new CustomPartitionKeyExtractorClass();
    }

    @Bean
    public CustomPartitionSelectorClass customPartitionSelector() {
        return new CustomPartitionSelectorClass();
    }
}



CustomPartitionKeyExtractorClass.java

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

/**
 * <p>Description: strategy that extracts the partition key from a Message.</p>
 */
public class CustomPartitionKeyExtractorClass implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        Object obj = message.getPayload();
        System.out.println("message payload: " + obj);

        if (obj instanceof SpringCloudStreamKafkaProducerApplication.Person) {
            SpringCloudStreamKafkaProducerApplication.Person person = (SpringCloudStreamKafkaProducerApplication.Person) obj;
            return person.getId();
        }

        return null;
    }
}



CustomPartitionSelectorClass.java

import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.util.ObjectUtils;

/**
 * <p>Description: strategy that decides which partition a message is sent to.</p>
 */
public class CustomPartitionSelectorClass implements PartitionSelectorStrategy {

    @Override
    public int selectPartition(Object key, int partitionCount) {
        System.out.println("partition key: " + key + " partitionCount: " + partitionCount);

        if (!ObjectUtils.isEmpty(key)) {
            Long id = (Long) key;
            return id.intValue() % partitionCount;
        }

        return 0;
    }
}
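The selector reduces to simple modulo arithmetic. A minimal plain-Java sketch (the class name is illustrative) shows how ids 0-9 land on 3 partitions:

```java
// Minimal sketch of the selector's arithmetic: partition = id % partitionCount.
// With ids 0..9 and 3 partitions: ids 0,3,6,9 -> partition 0; 1,4,7 -> 1; 2,5,8 -> 2.
public class PartitionMathSketch {
    static int selectPartition(long id, int partitionCount) {
        return (int) (id % partitionCount);
    }

    public static void main(String[] args) {
        for (long id = 0; id <= 9; id++) {
            System.out.println("id " + id + " -> partition " + selectPartition(id, 3));
        }
    }
}
```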

Configuration file:

application.properties



server.port=8881
spring.application.name=spring-cloud-stream-kafka-producer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Kafka Binder Properties

# A list of brokers to which the Kafka binder connects.
# Default: localhost.
spring.cloud.stream.kafka.binder.brokers=localhost:9092

# If set to true, the binder creates new topics automatically.
# If set to false, the binder relies on the topics being already configured.
# In the latter case, if the topics do not exist, the binder fails to start.
# This setting is independent of the auto.topic.create.enable setting of the broker and does not influence it.
# If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
# Default: true.
spring.cloud.stream.kafka.binder.autoCreateTopics=true

# If set to true, the binder creates new partitions if required.
# If set to false, the binder relies on the partition size of the topic being already configured.
# If the partition count of the target topic is smaller than the expected value, the binder fails to start.
# Default: false.
spring.cloud.stream.kafka.binder.autoAddPartitions=true

management.endpoints.web.exposure.include=bindings

# Send messages to the same topic through two channels
spring.cloud.stream.bindings.output.destination=topic2
spring.cloud.stream.bindings.output.content-type=application/json

# Configure the partitioned output binding
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
# If this property triggers a "no subscribers" error, enable autoAddPartitions=true
# Distribute output messages across 3 partitions
spring.cloud.stream.bindings.output.producer.partitionCount=3

# Name of the partition key extractor bean, which extracts the partition key from the message
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
# Custom partition selector bean, which computes the target partition from the partition key and partitionCount
spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector

# LOGGING
#logging.level.root=WARN
#logging.level.org.springframework.web=DEBUG
#logging.level.org.springframework=DEBUG
#logging.level.com.spring.cloud.stream.kafka.consumer.producer=DEBUG
logging.pattern.console=${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %4line %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}

Consumer project structure:

(project structure screenshot)

Add the Spring Cloud Stream Kafka binder dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Application class:

KafkaConsumer1Application.java



import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@SpringBootApplication
public class KafkaConsumer1Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer1Application.class, args);
    }

    // Print every Person received on the input channel.
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("handle Received: " + person);
    }

    public static class Person {
        private Long id;
        private String name;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" + "id=" + id + ", name='" + name + '\'' + '}';
        }
    }
}

Configuration files:

application-c1.properties



server.port=8871
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer

# Number of instances of this application, and this instance's index
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0

# Configure the partitioned input binding
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1

# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across instances, and instanceCount, instanceIndex, and partitioned are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex to decide which partition(s) the instance subscribes to.
# The partition count must be at least equal to the instance count.
# The binder, rather than Kafka, assigns the partitions,
# which guarantees that messages from a given partition always go to the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
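With autoRebalanceEnabled=false, the static assignment described in the comments can be sketched as follows (an illustrative plain-Java model of the rule, not the binder's actual code): each instance takes every partition p with p % instanceCount == instanceIndex.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of static partition assignment: instance i consumes
// every partition p where p % instanceCount == i. With 3 partitions and
// 3 instances, instance 0 gets partition 0, instance 1 gets 1, instance 2 gets 2.
public class AssignmentSketch {
    static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % instanceCount == instanceIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("instance " + i + " -> partitions " + partitionsFor(i, 3, 3));
        }
    }
}
```

With more partitions than instances (say 6 partitions, 3 instances), the same rule gives each instance two partitions, which is why the partition count must be at least the instance count.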



application-c2.properties



server.port=8872
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer

# Number of instances of this application, and this instance's index
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=1

# Configure the partitioned input binding
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1

# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across instances, and instanceCount, instanceIndex, and partitioned are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex to decide which partition(s) the instance subscribes to.
# The partition count must be at least equal to the instance count.
# The binder, rather than Kafka, assigns the partitions,
# which guarantees that messages from a given partition always go to the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false



application-c3.properties



server.port=8873
spring.application.name=spring-cloud-stream-kafka-consumer
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

# Settings for the input channel
spring.cloud.stream.bindings.input.destination=topic2
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=spring-cloud-stream-kafka-consumer

# Number of instances of this application, and this instance's index
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=2

# Configure the partitioned input binding
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.input.consumer.concurrency=1

# When autoRebalanceEnabled is true (the default), Kafka distributes partitions across instances, and instanceCount, instanceIndex, and partitioned are not needed.
# When autoRebalanceEnabled is false, the binder uses instanceCount and instanceIndex to decide which partition(s) the instance subscribes to.
# The partition count must be at least equal to the instance count.
# The binder, rather than Kafka, assigns the partitions,
# which guarantees that messages from a given partition always go to the same instance.
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false

Result: start the producer and the 3 consumer instances. The console output shows that the consumer with instance index 0 consumes partition 0, the instance with index 1 consumes partition 1, and the instance with index 2 consumes partition 2.