1. 程式人生 > >kafka 並發數配置過程中踩到的坑 InstanceAlreadyExistsException

kafka 並發數配置過程中踩到的坑 InstanceAlreadyExistsException

static 靜態初始化塊 obj -c 異常 判斷 con lba comm

2017-07-05 13:09:15.460 [kafka_spout:7-MultipleThreadSpoutExecutors] WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:126)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204)
at com.mapbar.stream.qingqi.core.spout.KafkaSpout.open(KafkaSpout.java:74)
at com.alibaba.jstorm.task.execute.spout.SpoutExecutors.init(SpoutExecutors.java:142)
at com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors.init(MultipleThreadSpoutExecutors.java:64)
at com.alibaba.jstorm.task.execute.BaseExecutors.initWrapper(BaseExecutors.java:154)
at com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors.run(MultipleThreadSpoutExecutors.java:76)
at com.alibaba.jstorm.callback.AsyncLoopRunnable.run(AsyncLoopRunnable.java:95)
at java.lang.Thread.run(Thread.java:745)

上面是本人在使用spring kafka中所遇到的問題,針對此問題做一個記錄,整理到此處

出現上述問題的原因:

ConcurrentMessageListenerContainer factory = new ConcurrentMessageListenerContainer(cf, containerProps);
factory.setConcurrency(kafkaConfig.getConcurrencySize());
如果使用了ConcurrentMessageListenerContainer 的實現,並且配置了並發度大於1,同時配置了kafka的 client.id屬性則會出現上述問題,而當你配置為1的時候不會出現上述log
解決方式:不配置client.id這一項,kakfa中會默認為多個線程生成id

詳細解析:
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
從error log中可以看出

技術分享

在調用AppInfoParser.registerAppInfo方法時出現的異常

  

at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

  從堆棧信息中定位到 Repository.addMBean()

截取其中一部分代碼

技術分享

上圖中可以得知,一個clientId對應一個,不可重復

最後找到KafkaConsumer,發現是因為自己配置了client.id導致的,如果不配置的話

技術分享

會為每一個線程生成一個clientid,"consumer" + 自增id,原子性遞增

看到這裏,發現了一個問題,如果自己不配置client.id的話,那從config裏取出來的數據直接判斷的length,猜測是有默認配置,當自己不配置的時候給賦值為"" 空串,於是又查了一下代碼來驗證

創建consumer對象的時候,會創建ConsumerConfig 配置,new ConsumerConfig()會調用父類構造方法
ConsumerConfig(Map<?, ?> props) {
super(CONFIG, props);
}
AbstractConfig

技術分享

ConsumerConfig中有static靜態初始化塊,來初始化 ConfigDef

技術分享

把client.id賦值為""



kafka 並發數配置過程中踩到的坑 InstanceAlreadyExistsException