1. 程式人生 > >Spring註解方式整合Kafka(spring-kafka的使用)

Spring註解方式整合Kafka(spring-kafka的使用)

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;

/**
 * 
 * Description:Kafka生產者
 * Date:		2017年7月11日
 * 
 * 依賴:
 		<dependency>
		    <groupId>org.springframework.kafka</groupId>
		    <artifactId>spring-kafka</artifactId>
		    <version>1.0.5.RELEASE</version>
		</dependency>
 * 
 * 使用案例:
   	@Resource
	private KafkaTemplate kafkaTemplate;
	
	呼叫方法傳送資料:
	kafkaTemplate.send(topic, msg);
 * 
 */
@Configuration
@EnableKafka
public class KafkaProducer {

	/**
	 * 
	 * Description:獲取配置
	 * Date:		2017年7月11日
	 * @author 		shaqf
	 */
	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<>();
		// kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return props;
	}

	/** 獲取工廠 */
	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<>(producerConfigs());
	}

	/** 註冊例項 */
	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}


import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;


/**
 * 
 * Description:Kafka消費者
 * Date:		2017年7月11日
 * 
 * 依賴:
 		<dependency>
		    <groupId>org.springframework.kafka</groupId>
		    <artifactId>spring-kafka</artifactId>
		    <version>1.0.5.RELEASE</version>
		</dependency>
 * 
 * 使用案例:
    @KafkaListener(topics = { "taskCmd" })
	public void taskCmd(ConsumerRecord<?, ?> record) {
		Object message = record.value();
		logger.info("收到管理平臺命令:" + message);
	}
 * 
 */

@Configuration
@EnableKafka
public class KafkaConsumer {

	/**
	 * 
	 * Description:獲取配置
	 * Date:		2017年7月11日
	 * @author 		shaqf
	 */
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		// kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "Agent-Server-1.0.2");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		return props;
	}

	/** 獲取工廠 */
	public ConsumerFactory<String, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	/** 獲取例項 */
	@Bean
	public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}
}

還有一種方式,直接在配置檔案裡面新增Kafka地址即可,以上程式碼都可以省略