
Failing to produce to kafka brokers java.nio.BufferUnderflowException

The Storm topology ran without problems in the virtual-machine test environment, but after it was deployed to production the following errors appeared:

2017-04-17 09:43:14 k.p.a.DefaultEventHandler [INFO] Back off for 100 ms before retrying send. Remaining retries = 1
2017-04-17 09:43:14 k.c.ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(3,node12,6667) with correlation id 312494 for 1 topic(s) Set(zyq)
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Connected to node12:6667 for producing
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node12:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node11:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node10:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node9:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Connected to node11:6667 for producing
2017-04-17 09:43:14 k.p.a.DefaultEventHandler [WARN] Failed to send producer request with correlation id 312695 to broker 1 with data for partitions [zyq,2]
java.nio.BufferUnderflowException: null
	at java.nio.Buffer.nextGetIndex(Buffer.java:498) ~[na:1.7.0_79]
	at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406) ~[na:1.7.0_79]
	at kafka.api.ProducerResponse$$anonfun$1$$anonfun$apply$1.apply(ProducerResponse.scala:40) ~[stormjar.jar:na]
	at kafka.api.ProducerResponse$$anonfun$1$$anonfun$apply$1.apply(ProducerResponse.scala:36) ~[stormjar.jar:na]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[stormjar.jar:na]
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[stormjar.jar:na]
	at scala.collection.immutable.Range.foreach(Range.scala:141) ~[stormjar.jar:na]
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) ~[stormjar.jar:na]
	at scala.collection.AbstractTraversable.map(Traversable.scala:105) ~[stormjar.jar:na]
	at kafka.api.ProducerResponse$$anonfun$1.apply(ProducerResponse.scala:36) ~[stormjar.jar:na]
	at kafka.api.ProducerResponse$$anonfun$1.apply(ProducerResponse.scala:33) ~[stormjar.jar:na]
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[stormjar.jar:na]
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[stormjar.jar:na]
	at scala.collection.immutable.Range.foreach(Range.scala:141) ~[stormjar.jar:na]
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[stormjar.jar:na]
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[stormjar.jar:na]
	at kafka.api.ProducerResponse$.readFrom(ProducerResponse.scala:33) ~[stormjar.jar:na]
	at kafka.producer.SyncProducer.send(SyncProducer.scala:114) ~[stormjar.jar:na]
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275) [stormjar.jar:na]
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113) [stormjar.jar:na]
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105) [stormjar.jar:na]
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) [stormjar.jar:na]
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) [stormjar.jar:na]
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) [stormjar.jar:na]
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) [stormjar.jar:na]
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) [stormjar.jar:na]
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) [stormjar.jar:na]
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) [stormjar.jar:na]
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105) [stormjar.jar:na]
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:106) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:89) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:69) [stormjar.jar:na]
	at scala.collection.Iterator$class.foreach(Iterator.scala:727) [stormjar.jar:na]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:68) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46) [stormjar.jar:na]
2017-04-17 09:43:14 k.p.a.DefaultEventHandler [INFO] Back off for 100 ms before retrying send. Remaining retries = 0
2017-04-17 09:43:14 k.c.ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(0,node9,6667) with correlation id 312696 for 1 topic(s) Set(zyq)
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Connected to node9:6667 for producing
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node9:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node11:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node10:6667
2017-04-17 09:43:14 k.p.SyncProducer [INFO] Disconnecting from node9:6667
2017-04-17 09:43:14 k.p.a.DefaultEventHandler [ERROR] Failed to send requests for topics zyq with correlation ids in [311889,312696]
2017-04-17 09:43:14 k.p.a.ProducerSendThread [ERROR] Error in handling batch of 200 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96) ~[stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:106) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:89) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:69) [stormjar.jar:na]
	at scala.collection.Iterator$class.foreach(Iterator.scala:727) [stormjar.jar:na]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:68) [stormjar.jar:na]
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46) [stormjar.jar:na]

It later turned out that the Kafka jar versions in the production and test environments did not match: the production brokers run Kafka 0.8.1, while the project in Eclipse was built against 0.10.0. A likely explanation for the BufferUnderflowException is that the newer client expects additional fields in the produce response (the wire format changed between 0.8 and 0.10) that a 0.8.1 broker never sends, so ProducerResponse.readFrom reads past the end of the buffer. Switching the Kafka jar in the topology back to 0.8.1 resolved the problem.
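For reference, below is a minimal sketch of a producer built against the old 0.8.1 Scala client, i.e. the kafka.javaapi.producer.Producer API that sits on top of the SyncProducer/DefaultEventHandler classes seen in the stack trace. The broker hostnames, topic name "zyq", retry count, back-off, and batch size are taken from the log output above; the key/value payload and class name are placeholders, not the original topology code.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ZyqProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list for the 0.8.1 cluster (hostnames taken from the log above).
        props.put("metadata.broker.list", "node9:6667,node10:6667,node11:6667,node12:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // These values mirror what the log shows: async send thread, 3 tries,
        // 100 ms back-off, batches of 200 events (the 0.8.1 defaults).
        props.put("producer.type", "async");
        props.put("message.send.max.retries", "3");
        props.put("retry.backoff.ms", "100");
        props.put("batch.num.messages", "200");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        // Placeholder message to the topic from the error messages.
        producer.send(new KeyedMessage<>("zyq", "key", "value"));
        producer.close();
    }
}

In a Maven build the corresponding fix is to pin the client dependency to the broker's version (e.g. org.apache.kafka:kafka_2.10:0.8.1) and make sure no transitive dependency drags a 0.10.x Kafka jar into the topology fat jar.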