Kafka學習(二)消費者
阿新 • • 發佈:2018-11-08
消費者:
(1)配置:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String brokers; @Value("${spring.kafka.consumer.group-id}") private String group; @Value("${spring.kafka.consumer.key-deserializer}") private String keyType; @Value("${spring.kafka.consumer.value-deserializer}") private String valueType; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.getContainerProperties().setPollTimeout(4000); return factory; } @Bean public KafkaListeners kafkaListeners() { return new KafkaListeners() { @Override public boolean equals(Object obj) { return false; } @Override public int hashCode() { return 0; } @Override public String toString() { return null; } @Override public Class<? extends Annotation> annotationType() { return null; } @Override public KafkaListener[] value() { return new KafkaListener[0]; } }; } public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyType); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueType); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<String, String>(properties); } }
(2)消費:
@Component public class KafkaConsumer { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); @Autowired private ObjectMapper objectMapper; @Autowired private RestTemplate restTemplate; @Autowired private KafkaTemplate kafkaTemplate; @KafkaListener(topics = {KafkaMessageConfig.BUSINESS_SERVICE_TOPIC}) public void processMessage(ConsumerRecord<String, String> consumer) { try { String topic = ""; String key = ""; String message = ""; if (consumer.topic() != null) { topic = consumer.topic(); } if (consumer.key() != null) { key = consumer.key(); } if (consumer.value() != null) { message = consumer.value(); } if (topic == null || "".equals(topic.trim()) || " ".equals(topic)) { log.warn("[processMessage] invalid topic {}", topic); return; } if (key == null || "".equals(key.trim()) || " ".equals(key)) { log.warn("[processMessage] invalid key {}", key); return; } if (!new JsonValidator().validate(message)) { log.warn("[processMessage] invalid message string {}", message); return; } Map<String, Object> body = JsonUtil.json2Bean(message, Map.class); Map<String, Object> params = new HashMap<>(); if (body == null || body.isEmpty() || !body.containsKey("OperationType")) { log.warn("[processMessage] invalid message string {}", message); return; } }