
Integrating Flume with Kafka and HDFS

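Flume version: 1.7.0; Kafka version: 2.11-0.10.1.0; Hadoop version: 2.6.0
I have been playing with Flume and Kafka lately. There are plenty of introductions to both online, so I will not repeat them here. Why integrate the two? As I understand it, Flume takes care of persisting the messages and Kafka takes care of transporting them. When we collect a website's logs, Flume can monitor a single log file or a whole directory; whenever new log entries appear, Flume persists them to HDFS and also sends them to Kafka, which then handles distribution, processing, real-time computation and so on.
Since that is the plan, the first step is to set up the Flume environment and wire the whole framework together.
I drew a diagram of this myself. It is not pretty, but it gets the idea across.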
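
As a rough sketch of that flow in Flume's own configuration terms, the wiring boils down to one source feeding two channels, each drained by its own sink. The component names (s, c, c1, r, r1) are the ones used in the full configuration at the end of this post; this fragment is only the skeleton, not a complete agent definition.

```properties
# Skeleton only: component types and sink settings are omitted here.
agent.sources = s
agent.channels = c c1
agent.sinks = r r1

# The source writes every event to both channels.
agent.sources.s.channels = c c1

# The Kafka sink drains channel c, the HDFS sink drains channel c1.
agent.sinks.r.channel = c
agent.sinks.r1.channel = c1
```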


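While looking for material online I tried many ways of configuring Flume, but none of them worked very well. I recommend following the documentation on the official website instead.
Problems I ran into:
Problem 1: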
org.apache.flume.conf.ConfigurationException: Bootstrap Servers must be specified
	at org.apache.flume.sink.kafka.KafkaSink.translateOldProps(KafkaSink.java:353)
	at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:295)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:411)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)


Fix: specify the bootstrap servers in the sink configuration (kafka.bootstrap.servers) instead of the old broker.list setting.
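
A minimal sketch of the Kafka sink addressing, using the key names from the Flume 1.7 documentation. The broker address and topic are the values from the configuration at the end of this post; the acks line is an assumption added only to show where producer options go, it is not part of the original setup.

```properties
# Old-style key that led to "Bootstrap Servers must be specified":
#agent.sinks.r.metadata.broker.list = localhost:9092

# Keys the built-in Kafka sink reads:
agent.sinks.r.kafka.bootstrap.servers = localhost:9092
agent.sinks.r.kafka.topic = testFlume

# Assumption for illustration: Kafka producer options are passed through
# with the kafka.producer. prefix, e.g. the acknowledgement level.
agent.sinks.r.kafka.producer.acks = 1
```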


Problem 2:
org.apache.flume.FlumeException: Unable to load sink type: org.apache.flume.plugins.KafkaSink, class: org.apache.flume.plugins.KafkaSink
	at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:70)
	at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSink
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:191)
	at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:68)
	... 11 more


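Fix: the Kafka sink changed between Flume versions, so the old configuration no longer applies. Do not copy configurations written for other versions from the web; following the official documentation, change the sink type to org.apache.flume.sink.kafka.KafkaSink.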
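
Side by side, the change looks roughly like this. The sink name r matches the configuration at the end of this post; the comment about the plugin jar is an assumption based on the ClassNotFoundException above.

```properties
# Class name from older tutorials; it fails to load unless the
# third-party plugin jar has been added to Flume's classpath:
#agent.sinks.r.type = org.apache.flume.plugins.KafkaSink

# Class shipped with Flume itself:
agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
```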


Finally, here is my Flume configuration:

#name of the source
agent.sources = s
#names of the channels
agent.channels = c c1
#names of the sinks
agent.sinks = r r1
# For each one of the sources, the type is defined
#exec follows a single file; spooldir watches a directory
agent.sources.s.type = exec
agent.sources.s.command = tail -n +0 -F /opt/testLog/test.log
#agent.sources.s.type = spooldir
#agent.sources.s.spoolDir = /usr/log
#agent.sources.s.fileHeader = true
#agent.sources.s.batchSize = 100
# The channel can be defined as follows.
agent.sources.s.channels = c c1


# Each sink's type must be defined
#agent.sinks.r.type = org.apache.flume.plugins.KafkaSink
agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
#Specify the channel the sink should use
#agent.sinks.r.metadata.broker.list = localhost:9092
agent.sinks.r.kafka.bootstrap.servers = localhost:9092
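#The next seven settings are leftovers from the third-party org.apache.flume.plugins.KafkaSink.
#The built-in sink most likely ignores them; it only forwards kafka.producer.* settings to the producer.
agent.sinks.r.partition.key=0
agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
agent.sinks.r.serializer.class=kafka.serializer.StringEncoder
agent.sinks.r.request.required.acks=0
agent.sinks.r.max.message.size=1000000
agent.sinks.r.producer.type=sync
agent.sinks.r.custom.encoding=UTF-8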
agent.sinks.r.kafka.topic = testFlume
# Each channel's type is defined.
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
#channel capacity (maximum number of events held)
#agent.channels.memoryChannel.capacity = 1000
#transaction capacity (maximum number of events per transaction)
#agent.channels.memoryChannel.transactionCapacity = 100


#HDFS sink configuration
agent.sinks.r1.type = hdfs
agent.sinks.r1.channel = c1
agent.sinks.r1.hdfs.path = hdfs://HDFSIP:8020/zy/flume/%y-%m-%d-%H
agent.sinks.r1.hdfs.filePrefix=events-
#file name suffix
#agent.sinks.r1.hdfs.fileSuffix = .log
agent.sinks.r1.hdfs.round = true
agent.sinks.r1.hdfs.roundValue = 10
agent.sinks.r1.hdfs.roundUnit = minute
#file format; the default is SequenceFile
agent.sinks.r1.hdfs.fileType = DataStream
agent.sinks.r1.hdfs.writeFormat=Text
agent.sinks.r1.hdfs.rollInterval=0
#--file size in bytes that triggers a roll (0: never roll based on file size)
agent.sinks.r1.hdfs.rollSize=128000000
#--number of events written to a file before it is rolled (0 = never roll based on number of events)
agent.sinks.r1.hdfs.rollCount=0
agent.sinks.r1.hdfs.idleTimeout=60
#--use local time to resolve the escape sequences in hdfs.path (instead of the timestamp in the event header)
agent.sinks.r1.hdfs.useLocalTimeStamp = true
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity=1000
agent.channels.c1.keep-alive=30
agent.sinks.r.channel = c
agent.channels.c.type = memory
agent.channels.c.capacity = 1000
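
One detail the configuration leaves implicit: source s lists two channels and sets no channel selector, so Flume falls back to its default replicating selector and every event goes to both c and c1. The sketch below spells those defaults out; the two lines are assumptions added for clarity, not part of the original configuration, and should not change its behaviour.

```properties
# Default selector written out explicitly: copy each event to every channel.
agent.sources.s.selector.type = replicating

# Memory channel default used when transactionCapacity is not set (channel c).
agent.channels.c.transactionCapacity = 100
```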