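Flume fails when writing logs to Kafka: WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(Net

1. Problem description

Flume receives data on port 44444 and writes it to Kafka, but then fails with the error below, which I could not figure out at first: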

2018-11-20 07:41:59,917 (New I/O worker #1) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 218772532 items! Connection closed.
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2018-11-20 07:45:53,085 (kafka-producer-network-thread | producer-1) [INFO - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:497)] API versions request failed via disconnect. Defaulting legacy API versions

2. Environment

(1) Flume: flume-1.6.0-cdh5.15.0-bin

(2) Kafka: kafka_2.10-0.8.2.1

(3) Flume configuration file avro-memory-kafka.conf

avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = hadoop
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = hadoop:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.kafka.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

(4) Start the Kafka broker and a consumer

# Start the broker
bin/kafka-server-start.sh -daemon config/server.properties
# Create the topic
bin/kafka-topics.sh --create --zookeeper hadoop:2181/kafka08 --replication-factor 1 --partitions 2 --topic hello_topic
# Start a console consumer
bin/kafka-console-consumer.sh --zookeeper hadoop:2181/kafka08 --topic hello_topic --from-beginning

(5) Test

-> Start Flume

bin/flume-ng agent \
--name avro-memory-kafka  \
--conf conf  \
--conf-file conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console

-> Start the Avro client (telnet is used here)

telnet hadoop 44444

The error shown above then appears.
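
One detail of this step is worth flagging: telnet sends raw text, whereas an Avro source expects Avro RPC frames. Flume's launcher has an avro-client mode for sending a file to an Avro source, so a test along the following lines may be closer to what the source expects. This is only a sketch: the function name, the FLUME_NG variable, and the sample file path are assumptions made for illustration and are not part of the original setup.

```shell
# Hypothetical wrapper around Flume's avro-client mode.
# FLUME_NG (an assumed variable) defaults to the launcher path used elsewhere in this post.
send_file_to_avro_source() {
  # $1 = host, $2 = port, $3 = file whose lines are sent as Flume events
  "${FLUME_NG:-bin/flume-ng}" avro-client --conf conf -H "$1" -p "$2" -F "$3"
}

# Example, run from the Flume home directory (the file path is a placeholder):
# send_file_to_avro_source hadoop 44444 /tmp/sample.log
```

Keeping the launcher path in a variable makes it possible to dry-run the wrapper (for example with FLUME_NG=echo) and see the exact command before sending anything.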

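3. Cause

Checking the official documentation turned up, rather embarrassingly, a single sentence that explains it: this version of Flume does not support Kafka 0.8.x.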
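
As a side check on the Avro WARN specifically, the item count in the exception (218772532) can be read as the four bytes of a big-endian 32-bit integer, which is how the frame header appears to be parsed in decodePackHeader. The sketch below does that conversion; the helper name be32_bytes is made up for illustration.

```shell
# Print a decimal value as the four bytes (hex pairs) of a big-endian 32-bit integer.
be32_bytes() {
  printf '%08x' "$1" | sed 's/../& /g; s/ $//'
}

be32_bytes 218772532
# prints: 0d 0a 34 34
```

The bytes 0d 0a 34 34 are CR, LF and the ASCII characters "44", which looks like text typed into telnet rather than an Avro frame header. That suggests the WARN on the Avro source may come from the telnet test itself, while the "API versions request failed" line from the Kafka producer is the part that matches the Kafka version mismatch described above.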