flume整合kafka和hdfs
阿新 • • 發佈:2019-01-28
flume版本:1.7.0 kafka版本:2.11-0.10.1.0 hadoop 版本:2.6.0
最近在玩這個flume和kafka這兩個東西,網上有很多這方面的簡介,我就不多說了,我的理解為啥要整合這兩個在一起的,我的理解就是,flume作為訊息的持久化,然後就是kafka來用於訊息的傳輸,但我們蒐集這個網站的日誌的時候,我們就可以使用flume監控log的一個檔案或者是一個目錄,每當有新的log,flume就可以將其持久化到hdfs,然後將這個訊息發給kafka,kafka在對訊息進行分發,處理,實時計算等等。
既然要做,那麼第一步就是搭建好flume的環境,以及整合整個框架。
我自己畫了一個圖,不是很好看,但是意思懂就行。
但我在網上找資料的時候,試過很多種方式來配置flume,但是都不怎麼有效果。在這裡建議使用官方網站上的文件來進行配置。
遇見過的問題:
解決方式:在配置的時候新增上bootstrap的指向,不是使用broke.list
問題2:
解決方式:由於flume的改版,原來的配置不是很合理,建議不要照抄網上的版本的配置,這裡參考官網修改為:org.apache.flume.sink.kafka.KafkaSink
最近在玩這個flume和kafka這兩個東西,網上有很多這方面的簡介,我就不多說了,我的理解為啥要整合這兩個在一起的,我的理解就是,flume作為訊息的持久化,然後就是kafka來用於訊息的傳輸,但我們蒐集這個網站的日誌的時候,我們就可以使用flume監控log的一個檔案或者是一個目錄,每當有新的log,flume就可以將其持久化到hdfs,然後將這個訊息發給kafka,kafka在對訊息進行分發,處理,實時計算等等。
既然要做,那麼第一步就是搭建好flume的環境,以及整合整個框架。
我自己畫了一個圖,不是很好看,但是意思懂就行。
但我在網上找資料的時候,試過很多種方式來配置flume,但是都不怎麼有效果。在這裡建議使用官方網站上的文件來進行配置。
遇見過的問題:
org.apache.flume.conf.ConfigurationException: Bootstrap Servers must be specified at org.apache.flume.sink.kafka.KafkaSink.translateOldProps(KafkaSink.java:353) at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:295) at org.apache.flume.conf.Configurables.configure(Configurables.java:41) at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:411) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
解決方式:在配置的時候新增上bootstrap的指向,不是使用broke.list
問題2:
org.apache.flume.FlumeException: Unable to load sink type: org.apache.flume.plugins.KafkaSink, class: org.apache.flume.plugins.KafkaSink at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:70) at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43) at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSink at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:68) ... 11 more
解決方式:由於flume的改版,原來的配置不是很合理,建議不要照抄網上的版本的配置,這裡參考官網修改為:org.apache.flume.sink.kafka.KafkaSink
最後附加上我的flume的配置:
#設定source的名稱 agent.sources = s #設定channels的名稱 agent.channels = c c1 #設定sink的名稱 agent.sinks = r r1 # For each one of the sources, the type is defined #exec 檔案 spoolddir 資料夾 agent.sources.s.type = exec agent.sources.s.command = tail -n +0 -F /opt/testLog/test.log #agent.source.s.type = spooldir #agent.source.s.spoolDir = /usr/log #agent.source.s.fileHeader = true #agent.source.s.bathSize =100 # The channel can be defined as follows. agent.sources.s.channels = c c1 # Each sink's type must be defined #agent.sinks.r.type = org.apache.flume.plugins.KafkaSink agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink #Specify the channel the sink should use #agent.sinks.r.metadata.broker.list = localhost:9092 agent.sinks.r.kafka.bootstrap.servers = localhost:9092 agent.sinks.r.partition.key=0 agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition agent.sinks.r.serializer.class=kafka.serializer.StringEncoder agent.sinks.r.request.required.acks=0 agent.sinks.r.max.message.size=1000000 agent.sinks.r.producer.type=sync agent.sinks.r.custom.encoding=UTF-8 agent.sinks.r.kafka.topic = testFlume # Each channel's type is defined. # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel #channel儲存容量 #agent.channels.memoryChannel.capacity = 1000 #事務容量 #agent.channels.memoryChannel.transactionCapacity = 100 #hdfsSink配置 agent.sinks.r1.type = hdfs agent.sinks.r1.channel = c1 agent.sinks.r1.hdfs.path = hdfs://HDFSIP:8020/zy/flume/%y-%m-%d-%H agent.sinks.r1.hdfs.filePrefix=events- #設定檔案字尾名 #agent.sinks.r1.hdfs.fileSuffix = .log agent.sinks.r1.hdfs.round = true agent.sinks.r1.hdfs.roundValue = 10 agent.sinks.r1.hdfs.roundUnit = minute #檔案格式 預設 seq檔案, agent.sinks.r1.hdfs.fileType = DataStream agent.sinks.r1.hdfs.writeFormat=Text agent.sinks.r1.hdfs.rollInterval=0 #--觸發roll操作的檔案大小in bytes (0: never roll based on file size) agent.sinks.r1.hdfs.rollSize=128000000 #--在roll操作之前寫入檔案的事件數量(0 = never roll based on number of events) agent.sinks.r1.hdfs.rollCount=0 agent.sinks.r1.hdfs.idleTimeout=60 #--使用local time來替換轉移字元 (而不是使用event header的timestamp) agent.sinks.r1.hdfs.useLocalTimeStamp = true agent.channels.c1.type = memory agent.channels.c1.capacity = 1000 agent.channels.c1.transactionCapacity=1000 agent.channels.c1.keep-alive=30 agent.sinks.r.channel = c agent.channels.c.type = memory agent.channels.c.capacity = 1000