Kafka-API中介軟體MQ訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)
阿新 • • 發佈:2018-11-01
一、
Maven依賴
六、 Consumer注意事項
<dependency>
<groupId>com.foriseland.fjf.mq</groupId>
<artifactId>fjf-mq-kafka</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
二、配置說明
Product 配置:<-- 以下資訊依賴配置中心 --> <bean id="producerServer" class="com.foriseland.fjf.mq.producer.KafkaProducerGeneric" init-method="init"destroy-method="close"> <propertyname="properties"> <props> <prop key="bootstrap.servers">192.168.2.5:9092,192.168.2.5:9093,192.168.2.5:9094</prop> <propkey="acks">all</prop> <propkey="retries">1</prop> <propkey="batch.size">16384</prop> <propkey="linger.ms">1</prop> <propkey="buffer.memory">33554432</prop> <propkey="serializer.class">kafka.serializer.StringEncoder</prop> <propkey="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop> <propkey="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop> </props> </property> </bean>
Consumer
配置:
<bean id="consumerConfig" class="com.foriseland.fjf.mq.motion.KafkaConsumerManager" init-method="init"> <property name="properties"> <props> <prop key="bootstrap.servers">192.168.2.5:9092</prop> <prop key="group.id">1</prop> <prop key="enable.auto.commit">true</prop> <prop key="session.timeout.ms">30000</prop> <prop key="auto.commit.interval.ms">1000</prop> <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop> <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop> </props> </property> </bean>
三、topic命名約定
Kafka的 topic有著嚴格的命名約束,命名格式為: Object-model-key 。l 第一級為業務,第二級為模組,第三級為key。
l 資訊連線符為 - ,禁止採用其它字元進行連線。
長度可大於三級,但是不能小於三級。
qumao-order-orderList這樣的命名結果,所有業務系統必須嚴格按照此命名結構去做,約束至少要到三級,如qumao-order-orderid-ordertyle,也是可以通過的(大於三級不限制),如果是qumao-ordertype這種型別(小於三級長度),系統會報錯,
KafkaTopic is not effective,key be similar to [object-model-key]四、Product API例子
引入屬性
@Autowired
private KafkaProducerGeneric producerServer;
publicFuture<RecordMetadata> sendMessage(String topic, String key, Stringvalue);
五、
Consumer API 例子
六、 Consumer注意事項
l Kafka或者rocketMq的消費端需要單獨建立model,因為後期需要單獨啟動。
l 介面統一繼承IKafkaConsumerServer介面。
l Model型別為java工程就可以,通過main函式啟動。
l 啟動main函式方式,main函式類為:
.public class StandardConsumerMqLauncher {
private static final Logger logger = LoggerFactory.getLogger(StandardConsumerMqLauncher.class);
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
logger.info("啟動");
ApplicationContext context = new ClassPathXmlApplicationContext("classpath*:spring/spring-*.xml");
ConsumerManagerPool bean = context.getBean(ConsumerManagerPool.class);
bean.start();
logger.info("啟動完成");
synchronized (StandardConsumerMqLauncher.class) {
while (true) {
try {
StandardConsumerMqLauncher.class.wait();
} catch (InterruptedException e) {
logger.error("後臺服務異常終止:" + e.getMessage(), e);
}
}
}
}
}