Kafka: Java producer fails to send data to a Kafka topic

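While using a Kafka producer written in Java to send data to a Kafka topic, the send was retried and finally failed with "Failed to send messages after 3 tries". The full log: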

2016-09-08 10:32:49,695- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 0 for 1 topic(s) Set(test09026)
2016-09-08 10:32:49,936- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:49,962- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:51,056- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:51,057- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:51,060- [WARN] Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:51,084- [INFO] Back off for 100 ms before retrying send. Remaining retries = 3
2016-09-08 10:32:51,186- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 3 for 1 topic(s) Set(test09026)
2016-09-08 10:32:51,188- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:51,194- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:51,195- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:52,207- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:52,207- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:52,207- [WARN] Failed to send producer request with correlation id 5 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:52,210- [INFO] Back off for 100 ms before retrying send. Remaining retries = 2
2016-09-08 10:32:52,311- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 6 for 1 topic(s) Set(test09026)
2016-09-08 10:32:52,313- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:52,321- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:52,339- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:53,346- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:53,346- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:53,347- [WARN] Failed to send producer request with correlation id 8 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:53,348- [INFO] Back off for 100 ms before retrying send. Remaining retries = 1
2016-09-08 10:32:53,450- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 9 for 1 topic(s) Set(test09026)
2016-09-08 10:32:53,453- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:53,456- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:53,457- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,462- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:54,462- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,466- [WARN] Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:54,468- [INFO] Back off for 100 ms before retrying send. Remaining retries = 0
2016-09-08 10:32:54,582- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 12 for 1 topic(s) Set(test09026)
2016-09-08 10:32:54,585- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:54,588- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:54,588- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,591- [ERROR] Failed to send requests for topics test09026 with correlation ids in [0,12]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)

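I was sending from a Windows server to a Linux server. Note that in the log the producer fetches metadata from 172.17.17.98:9092 but then tries to produce to localhost:9092, which on the Windows machine is not the Kafka broker.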
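To see which addresses the producer actually used, the "Connected to ... for producing" lines can be pulled out of the log. The following is a minimal sketch, assuming the log format shown above; the class name `ProducerLogEndpoints` and the method `connectedEndpoints` are hypothetical helpers, not part of Kafka.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ProducerLogEndpoints {

    // Matches the "Connected to host:port for producing" message seen in the log above.
    private static final Pattern CONNECTED =
            Pattern.compile("Connected to (\\S+) for producing");

    /** Returns the distinct endpoints the producer connected to, in order of first appearance. */
    static Set<String> connectedEndpoints(String log) {
        Set<String> endpoints = new LinkedHashSet<>();
        Matcher m = CONNECTED.matcher(log);
        while (m.find()) {
            endpoints.add(m.group(1));
        }
        return endpoints;
    }

    public static void main(String[] args) {
        // Three lines copied from the log above.
        String log = String.join("\n",
                "2016-09-08 10:32:49,936- [INFO] Connected to 172.17.17.98:9092 for producing",
                "2016-09-08 10:32:49,962- [INFO] Disconnecting from 172.17.17.98:9092",
                "2016-09-08 10:32:51,056- [INFO] Connected to localhost:9092 for producing");
        System.out.println(connectedEndpoints(log));
    }
}
```

Any endpoint in the result that differs from the address the producer was configured with comes from the metadata returned by the broker, so it is worth checking whether the client can reach it.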

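Solution:
1. In the Windows hosts file, add a mapping from the Linux server's IP address to its hostname.
2. In server.properties on the broker, change the following setting so that the broker advertises an address the client can reach, then restart the broker. Change

#advertised.listeners=PLAINTEXT://your.host.name:9092

to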

advertised.listeners=PLAINTEXT://172.16.128.208:9092
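
After both changes, it can help to confirm from the Windows client that the advertised address resolves and accepts TCP connections. The sketch below is one way to do that using only the JDK; the class name `AdvertisedListenerCheck`, its methods, and the 3000 ms timeout are assumptions made for illustration. It only tests a plain TCP connect, not the Kafka protocol itself.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class AdvertisedListenerCheck {

    /** Parses a listener such as "PLAINTEXT://host:9092" into an unresolved host/port pair. */
    static InetSocketAddress parseListener(String listener) {
        String value = listener.trim();
        int sep = value.indexOf("://");
        int colon = value.lastIndexOf(':');
        // Reject values without a protocol prefix, without a port, or with an empty host.
        if (sep < 0 || colon <= sep + 3) {
            throw new IllegalArgumentException("Expected PROTOCOL://host:port but got: " + listener);
        }
        String host = value.substring(sep + 3, colon);
        int port = Integer.parseInt(value.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if the host resolves and a TCP connection succeeds within timeoutMs. */
    static boolean isReachable(InetSocketAddress target, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Building a new InetSocketAddress triggers name resolution (hosts file or DNS).
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved hosts, refused connections, and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the value from server.properties above; pass another listener as an argument.
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://172.16.128.208:9092";
        InetSocketAddress target = parseListener(listener);
        System.out.println(target.getHostString() + ":" + target.getPort()
                + (isReachable(target, 3000) ? " is reachable" : " is NOT reachable"));
    }
}
```

Running it with the hostname form of the listener (for example the name added to the hosts file in step 1) also exercises the hosts mapping, since an unresolvable name is reported as not reachable.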