1. 程式人生 > >[SpringCloud-SpringCloudStream] 簡單通過Spring-cloud-stream元件使用kafka

[SpringCloud-SpringCloudStream] 簡單通過Spring-cloud-stream元件使用kafka

1.消費者

一.pom依賴

        <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
			<version>1.2.1.RELEASE</version>
		</dependency>
		也可以根據springcloud適配版本,省略版本號

二.Kafka消費者配置

#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers = 192.168.xx.xxx:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes = 192.168.xx.xxx:2181
#監聽kafka的topic
spring.cloud.stream.bindings.xxxxxx.destination = topic-test
#消費者分組
spring.cloud.stream.bindings.xxxxxx.group = test-group
#接收原始訊息
spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw

其中“xxxxxx”是自定義欄位,需要和第三步中消費者程式碼的String INPUT = "xxxxxx";保持一致;

三.消費者程式碼

public interface MqSinkI {

    String INPUT = "xxxxxx";

    /**
     * 消費者介面
     *
     * @return org.springframework.messaging.SubscribableChannel 介面物件
     */
    @Input(MqSinkI.INPUT)
    SubscribableChannel input();

}


@EnableBinding(value = {MqSinkI.class})
public class MqSinkReceiver {

    @Autowired
    MqListener mqListener;

    @StreamListener(MqSinkI.INPUT)
    public void messageListen(JSONObject jsonParam) {
        System.out.println("收到資訊:" + jsonParam.toString());
        //處理請求的類,對訊息進行處理
        mqListener.listen(jsonParam);
    }
}

@Component
public class MqListener {

    public void listen(JSONObject jsonParam) {
        System.out.println("收到:" + jsonParam);
    }
}

四.小結

  • 注意配置項裡面的xxx欄位要和程式碼定義的字串常量保持一致
  • MqListener 類可以可以省去,處理邏輯直接可以寫在MqSinkReceiver 類的messageListen裡面;
  • 配置中的地址和主題都可以配置多個
  • spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw配置項
    可能會影響訊息的接收格式,如果不新增這條配置,接收引擎的訊息可能會有問題,如果其他生產者按照第2點的方式生產訊息,則可以不使用這條配置。

2.生產者

一.pom依賴

與消費者一樣

二.Kafka生產者配置

#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers=192.168.11.199:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes=192.168.11.199:2181
spring.cloud.stream.bindings.oooooooo.destination=topic-test
spring.cloud.stream.bindings.oooooooo.content-type=application/json

三.生產者程式碼

public interface MySource {

    String OUTPUT = "oooooooo";

    String OUTPUT1 = "myOutputTest1";


    @Output(MySource.OUTPUT)
    MessageChannel output();
}


@EnableBinding(MySource.class)
public class SendService {

    @Autowired
    private MySource mySource;


    public void sendMessage(String msg) {
        try {
            mySource.output().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@RestController
public class ProducerController {

    @Autowired
    private SendService service;

    @RequestMapping(value = "/kafka")
    public void send() {
        while (true) {
            //傳送訊息到指定topic
            JSONObject obj = new JSONObject();
            obj.put("time", (new Date()).toString());
            System.out.println("生產者傳送:" + obj.toString());
            service.sendMessage(obj.toString());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
        }
    }
}

3.原始碼