
Common Flume Errors (continuously updated...)

1. This one is a nasty trap

 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - kafka.utils.Logging$class.error(Logging.scala:97)] Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test
 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - kafka.utils.Logging$class.error(Logging.scala:103)] Producer connection to DEV33:9092 unsuccessful java.nio.channels.UnresolvedAddressException
 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:89)] Failed to send producer request with correlation id 17 to broker 2 with data for partitions [test,0]

After a lot of searching and trial and error, I finally found the cause. The configuration file was as follows:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/xfs/logs/tomcat/xfs-cs/logs/xfs_cs_41

# Describe the sink
#a1.sinks.k2.type=logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 192.168.0.72:9092,192.168.0.73:9092,192.168.0.83:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
#a1.sinks.k1.custom.partition.key=kafkaPartition
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#a1.sinks.k1.max.message.size=1000000
#a1.sinks.k1.custom.encoding=UTF-8

# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

This line was the problem:

a1.sinks.k1.brokerList = 192.168.0.72:9092,192.168.0.73:9092,192.168.0.83:9092

Map the hostnames to the IPs in /etc/hosts:

[root@DEMO41 conf]# vim /etc/hosts
192.168.0.72    dev22
192.168.0.73    dev23
192.168.0.83    dev33

Then I changed that line in the configuration to use the hostnames and everything worked. The brokers report themselves by hostname (e.g. DEV33) in the topic metadata, so those names must resolve on the Flume host; otherwise the producer fails with UnresolvedAddressException.

a1.sinks.k1.brokerList = dev22:9092,dev23:9092,dev33:9092
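
A quick way to confirm the mapping works is to resolve the broker hostnames from the Flume host; a minimal sanity check, assuming the /etc/hosts entries above are in place (if the names still cannot resolve, the broker's advertised.host.name in server.properties is the other place to look):

# resolve the broker hostnames exactly as the Kafka producer will
getent hosts dev22 dev23 dev33
# confirm the host is actually reachable
ping -c 1 dev33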

2. brokerList must contain at least one Kafka broker

A search suggested this happens when the broker list property is named agent.sinks.kafkaSink.metadata.broker.list instead of agent.sinks.kafkaSink.brokerList. In my case, though, the cause was a typo in the sink name: the line should have been agent.sinks.k2.brokerList = dev22:9092,dev23:9092,dev33:9092, but I had written k1.
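
Either way, the prefix on the brokerList property has to match a sink declared under agent.sinks; a minimal sketch of the two lines that must agree:

# the sink name used in the property prefix must appear in this list
agent.sinks = k1 k2
agent.sinks.k2.brokerList = dev22:9092,dev23:9092,dev33:9092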

3. Flume hangs here and does nothing

2016-08-12 16:15:05,869 (conf-file-poller-0) [WARN - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:133)] No configuration found for this host:agent
2016-08-12 16:15:05,881 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }

It turned out I had written the wrong agent name! The name passed with -n on the command line must match the property prefix in the configuration file; when they don't match, Flume finds no configuration for that agent and starts with empty sourceRunners, sinkRunners and channels.
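
For reference, this is how the two have to line up; a minimal sketch, assuming the configuration (prefix "agent", as in the file shown in item 4 below) is saved as conf/flume-conf.properties:

# -n must match the prefix used in the properties file ("agent" here)
bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console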

4. What on earth is this?

[WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:590)] Could not configure source  r1 due to: Failed to configure component!
org.apache.flume.conf.ConfigurationException: Failed to configure component!
    at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:111)
    at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:567)
    at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:346)
    at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:213)
    at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127)
    at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109)
    at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:189)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:89)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.conf.ConfigurationException: No channels set for r1
    at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:69)
    ... 15 more

Adding the line below fixed it:

agent.sources.r1.channels = c1 c2

Here is the configuration file that runs without errors:

agent.sources = r1
agent.channels = c1 c2
agent.sinks = k1 k2

############################define source begin
##define source-r1-exec
agent.sources.r1.channels = c1 c2
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /opt/xfs/logs/tomcat/xfs-cs/logs/xfs_cs_1

############################define sink begin
##define sink-k1-hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://192.168.0.71:9000/flumetest/%y-%m-%d/%H
agent.sinks.k1.hdfs.filePrefix = cs-%H
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 1
agent.sinks.k1.hdfs.roundUnit = hour
agent.sinks.k1.hdfs.useLocalTimeStamp = true

agent.sinks.k1.hdfs.minBlockReplicas = 1
agent.sinks.k1.hdfs.fileType = DataStream
#agent.sinks.k1.hdfs.writeFormat=Text

agent.sinks.k1.hdfs.rollInterval = 3600
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.idleTimeout = 0


##define sink-k2-kafka
agent.sinks.k2.channel = c2
agent.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k2.topic = test
agent.sinks.k2.brokerList = dev22:9092,dev23:9092,dev33:9092
agent.sinks.k2.requiredAcks = 1
agent.sinks.k2.batchSize = 20
#agent.sinks.k2.serializer.class = Kafka.serializer.StringEncoder
agent.sinks.k2.producer.type = async
#agent.sinks.k2.batchSize = 100
agent.sources.r1.selector.type = replicating

############################define channel begin
##define c1
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /opt/soft/apache-flume-1.6.0-bin/checkpoint
agent.channels.c1.dataDirs = /opt/soft/apache-flume-1.6.0-bin/dataDir
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 100
##define c2
agent.channels.c2.type = memory
agent.channels.c2.capacity = 1000000
agent.channels.c2.transactionCapacity = 100
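
Once the agent is running, both branches can be spot-checked independently; a minimal sketch, where the HDFS date bucket and the ZooKeeper address (dev22:2181) are assumptions to be replaced with your own values:

# check that the HDFS sink is writing into the time-bucketed path (date bucket is an example)
hdfs dfs -ls hdfs://192.168.0.71:9000/flumetest/16-08-12/16
# check that the Kafka sink is publishing to the test topic (ZooKeeper address is an assumption)
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper dev22:2181 --topic test --from-beginning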