1. 程式人生 > >Kafka-API中介軟體MQ訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)

Kafka-API中介軟體MQ訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)

一、 Maven依賴
<dependency>
    <groupId>com.foriseland.fjf.mq</groupId>
    <artifactId>fjf-mq-kafka</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

二、配置說明

Product 配置:
<-- 以下資訊依賴配置中心 -->
<bean id="producerServer" class="com.foriseland.fjf.mq.producer.KafkaProducerGeneric" init-method="init"destroy-method="close">
   <propertyname="properties">
      <props>
         <prop key="bootstrap.servers">192.168.2.5:9092,192.168.2.5:9093,192.168.2.5:9094</prop>
            <propkey="acks">all</prop>
            <propkey="retries">1</prop>
            <propkey="batch.size">16384</prop>
            <propkey="linger.ms">1</prop>
            <propkey="buffer.memory">33554432</prop>
            <propkey="serializer.class">kafka.serializer.StringEncoder</prop>
            <propkey="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
            <propkey="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
        </props>
    </property>
</bean>

 
  Consumer
  配置:
 
 
<bean id="consumerConfig" class="com.foriseland.fjf.mq.motion.KafkaConsumerManager" init-method="init">
    <property name="properties">
	<props>
	    <prop key="bootstrap.servers">192.168.2.5:9092</prop>
	    <prop key="group.id">1</prop>
	    <prop key="enable.auto.commit">true</prop>
	    <prop key="session.timeout.ms">30000</prop>
	    <prop key="auto.commit.interval.ms">1000</prop>
	    <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
	    <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
	</props>
    </property>
</bean>

三、topic命名約定

Kafkatopic有著嚴格的命名約束,命名格式為: Object-model-key

        l  第一級為業務第二級為模組第三級為key

        l  資訊連線符為 - ,禁止採用其它字元進行連線。

        長度可大於三級,但是不能小於三級。

qumao-order-orderList這樣的命名結果,所有業務系統必須嚴格按照此命名結構去做,約束至少要到三級,如qumao-order-orderid-ordertyle,也是可以通過的(大於三級不限制),如果是qumao-ordertype這種型別(小於三級長度),系統會報錯,

KafkaTopic is not effective,key be similar to [object-model-key]

四、Product API例子


 
  引入屬性
 
 
  @Autowired
 
 
  private KafkaProducerGeneric producerServer;
 
 
  publicFuture<RecordMetadata> sendMessage(String topic, String key, Stringvalue);
 
 
  五、
  Consumer API 例子
 
 
  
 
 
 


六、 Consumer注意事項

l  Kafka或者rocketMq的消費端需要單獨建立model,因為後期需要單獨啟動。

l  介面統一繼承IKafkaConsumerServer介面。

l  Model型別為java工程就可以,通過main函式啟動。

l  啟動main函式方式,main函式類為:

.
public class StandardConsumerMqLauncher {
    private static final Logger logger = LoggerFactory.getLogger(StandardConsumerMqLauncher.class);
        @SuppressWarnings("resource")
	public static void main(String[] args) throws Exception {
    	    logger.info("啟動");   	
    	ApplicationContext context = new ClassPathXmlApplicationContext("classpath*:spring/spring-*.xml");
    	ConsumerManagerPool bean = context.getBean(ConsumerManagerPool.class);
		bean.start();
    	        logger.info("啟動完成");
    	synchronized (StandardConsumerMqLauncher.class) {
		while (true) {
			try {
				StandardConsumerMqLauncher.class.wait();
			} catch (InterruptedException e) {
				logger.error("後臺服務異常終止:" + e.getMessage(), e);
			}
		}
	}
    }
}