1. 程式人生 > >二十六、Springboot整合kafka

二十六、Springboot整合kafka

(一)新增依賴

	    <dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

(二)配置檔案

spring:
  kafka:
    bootstrap-servers: 192.168.10.130:9092,192.168.10.130:9093,192.168.10.130:9094
    #生產者 “值” 序列化(自定義ObjectSerializer類)
    producer:
      value-serializer: org.pc.serializer.ObjectSerializer
    consumer:
          group-id: cluster-group

          #消費者 “值” 反序列化(自定義ObjectDeserializer類)
          value-deserializer: org.pc.deserializer.ObjectDeserializer

(三)註冊KafkaTemplate   Springboot自動裝載了KafkaTemplate,無需手動註冊

(四)使用Kafka生產者客戶端生產訊息

@RestController
public class KafkaController {
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    public KafkaController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @GetMapping("/message/send")
    public String sendMessage(@RequestParam String message){
        //向kafka叢集中的"cluster-topic"傳送訊息
        kafkaTemplate.send("cluster-topic", 0, "message", message);
        return message;
    }
}

(五)使用Kafka消費者客戶端消費訊息

/**
 * 消費者監聽器
 * 作用:只要加上@KafkaListener,設定好topics,就可以訂閱該主題產生的訊息(相當於消費者客戶端)
 */
@Component
public class ConsumerListener {
    @KafkaListener(topics = "cluster-topic")
    public void consume(String message){
        System.out.println(message);
    }
}

(六)出現問題及其解決 1、當傳送Object型別訊息時,報錯"Can’t convert value of class org.pc.entity.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer"?

原因解析:在生產訊息時,Kafka的Java客戶端預設使用的是StringSerializer序列化類,但是他只能序列化String型別物件,要是其他型別物件,就會無法序列化。同理,在消費訊息時,需要反序列化,也會遇到這個問題。

解決辦法:自定義Kafka“生產者客戶端的序列化類”和“消費者客戶端的反序列化類”,目的實現對Object物件的序列化與反序列化。

(1)生產者客戶端 “值” 的序列化:

/**
 * Object 序列化
 * @author 鹹魚
 * @date 2018/10/14 8:43
 */
public class ObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, Object data) {
        byte[] dataArray = null;
        try {
            //1、建立OutputStream物件
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            //2、建立OutputStream的包裝物件ObjectOutputStream,PS:物件將寫到OutputStream流中
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
            //3、將物件寫到OutputStream流中
            objectOutputStream.writeObject(data);
            //4、將OutputStream流轉換成位元組陣列
            dataArray = outputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return dataArray;
    }

    @Override
    public void close() {

    }
}

(2)消費者客戶端 “值” 的反序列化:

/**
 * Object 反序列化
 * @author 鹹魚
 * @date 2018/10/14 9:13
 */
public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        Object object = null;

        try {
            ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            object = objectInputStream.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return object;
    }

    @Override
    public void close() {

    }
}

(3)修改配置檔案

spring:
  kafka:
    bootstrap-servers: 192.168.10.130:9092,192.168.10.130:9093,192.168.10.130:9094
    #生產者 “值” 序列化
    producer:
      value-serializer: org.pc.serializer.ObjectSerializer
    consumer:
          group-id: cluster-group

          #消費者 “值” 反序列化
          value-deserializer: org.pc.deserializer.ObjectDeserializer

(4)測試 生產訊息:

@PostMapping("/user")
    public User saveUser(@RequestBody User user){
        kafkaTemplate.send("object-topic", 0, "message", user);
        return user;
    }

消費訊息:

@KafkaListener(topics = "object-topic")
    public void consume(Object object){
        System.out.println(object);
    }

2、報錯如下:javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629?

原因解析:如果使用了ConcurrentMessageListenerContainer 的實現,並且配置了併發度大於1,同時配置了kafka的 client.id屬性則會出現上述問題,而當你配置為1的時候不會出現上述log。

解決辦法:不配置client.id這一項,kakfa中會預設為多個執行緒生成id