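Flume fails when writing logs to Kafka: WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(Net
阿新 • Published: 2018-11-25
1. Problem description
Flume receives data on port 44444 and writes it to Kafka, but then reports the error below, which I could not make sense of at first: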
2018-11-20 07:41:59,917 (New I/O worker #1) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 218772532 items! Connection closed.
    at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
    at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-11-20 07:45:53,085 (kafka-producer-network-thread | producer-1) [INFO - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:497)] API versions request failed via disconnect. Defaulting legacy API versions
2. Environment
(1) Flume: flume-1.6.0-cdh5.15.0-bin
(2) Kafka: kafka_2.10-0.8.2.1
(3) Flume configuration file avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = hadoop
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = hadoop:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.kafka.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
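Before looking elsewhere, it can help to rule out a wiring mistake in the configuration above. The following is a minimal sketch, not part of Flume: `parse_properties` and `undeclared_channels` are hypothetical helper names, and the check only looks at the source-to-channel and sink-to-channel lines copied from the file above.

```python
# Hypothetical helpers (not a Flume API): check that every channel referenced
# by a source or sink is also declared in "<agent>.channels".

def parse_properties(text):
    """Parse simple 'key = value' lines into a dict, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def undeclared_channels(props, agent):
    """Return channel names used by sources/sinks but missing from the declaration."""
    declared = set(props.get(f"{agent}.channels", "").split())
    used = set()
    for name in props.get(f"{agent}.sources", "").split():
        used.update(props.get(f"{agent}.sources.{name}.channels", "").split())
    for name in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{name}.channel")
        if channel:
            used.add(channel)
    return used - declared


# Wiring lines copied from avro-memory-kafka.conf above.
CONF = """
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
"""

props = parse_properties(CONF)
missing = undeclared_channels(props, "avro-memory-kafka")
print(missing)  # set()
```

An empty set means the source and the sink both point at a declared channel, so for this file the wiring itself does not look like the problem.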
(4) Start the Kafka broker and a consumer
Start the broker:
bin/kafka-server-start.sh -daemon config/server.properties
Create the topic:
bin/kafka-topics.sh --create --zookeeper hadoop:2181/kafka08 --replication-factor 1 --partitions 2 --topic hello_topic
Start the consumer:
bin/kafka-console-consumer.sh --zookeeper hadoop:2181/kafka08 --topic hello_topic --from-beginning
(5) Test
-> Start Flume
bin/flume-ng agent \
--name avro-memory-kafka \
--conf conf \
--conf-file conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
-> Connect to the Avro source port
telnet hadoop 44444
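The error then appears.
3. Cause
I checked the official documentation and, rather embarrassingly, found a statement there that left me speechless: this Flume version does not support Kafka 0.8.x.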
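As a side check on the WARN in section 1, the item count in the message can be decoded by hand. The sketch below assumes the count was read from the incoming bytes as a 4-byte big-endian integer (an assumption about the Avro Netty frame header, not something stated in this post) and shows which bytes would produce 218772532.

```python
import struct

# Item count copied from the WARN message in section 1.
item_count = 218772532

# Assumption: the count is read as a 4-byte big-endian signed integer.
raw = struct.pack(">i", item_count)

print(raw.hex())  # 0d0a3434
print(raw)        # b'\r\n44'
```

The four bytes are a carriage return, a line feed and the ASCII text "44", which looks like line-oriented text such as telnet input rather than an Avro RPC frame. If that reading is right, the WARN may come from typing plain text into the Avro source port in step (5), and it would be worth checking separately from the Kafka version issue.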