1. 程式人生 > >Flume + kafka + HDFS構建日誌採集系統

Flume + kafka + HDFS構建日誌採集系統

本文轉載自:http://shift-alt-ctrl.iteye.com/blog/2339981,點選下面閱讀原文即可進入。

Flume是一個非常優秀日誌採集元件,類似於logstash,我們通常將Flume作為agent部署在application server上,用於收集本地的日誌檔案,並將日誌轉存到HDFS、kafka等資料平臺中;關於Flume的原理和特性,我們稍後詳解,本文只簡述如何構建使用Flume + kafka + HDFS構建一套日誌採集系統。

  • Flume:作為agent部署在每個application server,指定需要收集的日誌檔案列表,日誌檔案通常為application通過logback等生成。(本文基於Flume 1.7.0)

  • kafka:基於Flume,將“準實時”資料傳送給kafka;比如“tail”某個檔案的實時資料。對於實時資料分析元件或者同類型的資料消費者,可以通過kafka獲取實時資料。(kafka 0.9.0)

  • HDFS:基於Flume,將“歷史資料”儲存在HDFS,“歷史資料”比如“每天rotate生成的日誌檔案”,我們熟悉的catalina.out檔案,每天都rotate生成一個新的。當然對於“準實時”資料也可以儲存在HDFS中,Flume支援將“tail”的資料每隔?小時生成一個HDFS檔案等。通常情況下,我們將“歷史資料”儲存在HDFS,而不是“實時資料”。(hadoop 2.6.5)

  • 對於歷史資料,我們基於Flume的Spooling方式將資料轉存在HDFS中;對於“準實時”資料,我們基於Flume的Tail方式將資料轉存在kafka中。

HDFS準備

首先,我們需要一個hadoop平臺,用於儲存歷史資料,我們所採集的資料通常為“日誌資料”,搭建hadoop平臺過程此處不再贅言。

我們規劃的5臺hadoop,2個namenode基於HA方式部署,3個datanode;其中namenode為4Core、8G、200G配置,datanode為8Core、16G、2T配置,blockSize為128M(日誌檔案大小普遍為2G左右,每個小時,大概在100M左右),replication個數為2。

Kafka準備

kafka的目的就是接收“準實時”資料,受限於kafka的本身特性,我們儘量不要讓kafka儲存太多的資料,即訊息消費端儘可能的快(儘可能短的中斷時間)。我們的叢集為4個kafka例項,8Core、16G、2T配置,replication個數為2,資料持久時間為7天。kafka和hadoop都依賴於zookeeper叢集,zk的叢集是額外搭建的。

比較考驗設計的事情,是如何設計Topic;當kafka叢集上topic數量過多時,比如一個“tail”的檔案分配一個topic,將會對kafka的效能帶來巨大挑戰,同時Topic太多會導致訊息消費端編碼複雜度較高;另一個方面,如果Topic過少,比如一個project中所有的“tail”的檔案歸屬一個Topic,那麼次topic中的資料來自多個檔案,那麼資料分揀的難度就會變大。

我個人的設計理念為:一個project中,每個“tail”的檔案一個topic,無論這個project部署了多少實力,同一個“tail”檔案歸為一個topic;比如order-center專案中有一個業務日誌pay.log,此project有20臺例項,我們的topic名字為order-center-pay,那麼這20個例項中的order.log會被收集到此topic中,不過為了便於資料分揀,order.log中每條日誌都會攜帶各自的“local IP”。

kafka的配置樣例(server.properties):

broker.id=1  
listeners=PLAINTEXT://10.0.1.100:9092  
port=9092  
#host.name=10.0.1.100  
num.network.threads=3  
num.io.threads=8  
num.io.threads=8  
num.network.threads=8  
num.partitions=1  
socket.send.buffer.bytes=102400  
socket.receive.buffer.bytes=102400  
socket.request.max.bytes=104857600  
log.dirs=/data/kafka  
num.partitions=1  
num.recovery.threads.per.data.dir=1  
default.replication.factor=2  
log.flush.interval.messages=10000  
log.flush.interval.ms=1000  
log.retention.hours=168  
log.segment.bytes=1073741824  
log.retention.check.interval.ms=300000  
zookeeper.connect=10.0.1.10:2181,10.0.1.11:2181,10.0.2.10:2181/kafka  
zookeeper.connection.timeout.ms=6000  
delete.topic.enable=true  
min.insync.replicas=1  
zookeeper.session.timeout.ms=6000

上述配置中,有2個地方需要特別注意:listeners和host.name,我們在listeners中指定kafka繫結的地址和埠,通常為本機的內網IP,將host.name設定為空,此處如果設定不當,會導致Flume無法找到kafka地址(address resolve失敗);第二點就是zookeeper.connect地址,我們在地址後面增加了root path,此後Flume作為producer端傳送訊息時,指定的zookeeper地址也要帶上此root path。此外,還有一些重要的引數,比如replicas、partitions等。

kafka不是本文的介紹重點,所以請你參考本人的其他博文獲取更多的資訊。

後來的我們

主演:井柏然 / 周冬雨 / 田壯壯

貓眼電影演出 廣告 購買

Flume配置

根據我們的架構設計要求,實時資料發給kafka,歷史資料發給HDFS;Flume完全可以滿足我們這些要求,在Flume中,Spooling模式可以掃描一個檔案目錄下所有的檔案,並將新增的檔案傳送給HDFS;同時其TAILDIR模式中,可以掃描一個(或者多個)檔案,不斷tail其最新追加的資訊,然後傳送給kafka。基本概念:

  • source:原始檔、源資料端,指定Flume從何處採集資料(流)。Flume支援多種source,比如“Avro source”(類似RPC模式,接收遠端Avro客戶端傳送的資料Entity)、“Thrift Source”(Thrift客戶端傳送的資料)、“Exec Source”(linux指令返回的資料條目)、“Kafka Source”、“Syslog Source”、“Http Source”等等。

    我們本文主要涉及到Spooling和Taildir兩種,Taildir是1.7新增的特性,在此之前,如果想實現tail特性,需要使用“Exec Source”來模擬,或者自己開發程式碼。

  • channel:通道,簡單而言就是資料流的緩衝池,多個source的資料可以傳送給一個channel,在channel內部可以對資料進行cache、溢位暫存、流量整形等。目前Flume支援“Memory Channel”(資料儲存在有限空間的記憶體中)、“JDBC Channel”(資料暫存在資料庫中,保障恢復)、“Kafka Channel”(暫存在kafka中)、“File Channel”(暫存在本地檔案中);除Memory之外,其他的channel都支援持久化,可以在故障恢復、sink離線或者無sink等場景下提供有效的擔保機制,避免訊息丟失和流量抗擊。

  • sink:流輸出端,每個channel都可以對應一個sink,每個sink可以指定一種型別的儲存方式,目前Flume支援的sink型別比較常用的有“HDFS Sink”(將資料儲存在hdfs中)、“Hive Sink”、“Logger Sink”(特殊場景,將資料以INFO級別輸出到控制檯,通常用於測試)、“Avro Sink”、“Thrift Sink”、“File Roll Sink”(轉存到本地檔案系統中)等等。

本文不詳細介紹Flume的特性,我們只需要簡單知道一些概念即可,source、channel、sink這種模型就是pipeline,一個source的資料可以“複製”到多個channels(扇出),當然多個source也可以聚集到一個channel中,每個channel對應一個sink。每種型別的source、channel、sink都有各自的配置屬性,用於更好的控制資料流。

Flume是java語言開發,所以我們在啟動Flume之前,需要設定JVM的堆疊大小等選參,以免Flume對宿主機器上的其他application帶來負面影響。在conf目錄下,修改flume-env.sh:

export JAVA_OPTS="-Dcom.sun.management.jmxremote -verbose:gc -server 
-Xms1g -Xmx1g -XX:NewRatio=3 -XX:SurvivorRatio=8 -XX:MaxMetaspaceSize=128M
-XX:+UseConcMarkSweepGC -XX:CompressedClassSpaceSize=128M
-XX:MaxTenuringThreshold=5 -XX:CMSInitiatingOccupancyFraction=70
-XX:+PrintGCDetails -XX:+PrintGCDateStamps
-Xloggc:/opt/flume/logs/server-gc.log.$(date +%F)
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=64M"  

本人限定Flume的JVM堆大小為1G,如果你的機器記憶體空閒較多或者收集的資料檔案較多,可以考慮適度增大此值。

除此之外,就是flume的啟動配置檔案了(flume-conf.properties),如下配置我們模擬一個收集nginx日誌的場景:

##main  
nginx.channels=ch-spooling ch-tail  
nginx.sources=spooling-source tail-source  
nginx.sinks=hdfs-spooling kafka-tail  

#
#channel  
nginx.channels.ch-spooling.type=file  
nginx.channels.ch-spooling.checkpointDir=/data/flume/.flume/file-channel/ch-spooling/checkpoint  
nginx.channels.ch-spooling.dataDirs=/data/flume/.flume/file-channel/ch-spooling/data  
nginx.channels.ch-spooling.capacity=1000  
nginx.channels.ch-spooling.transactionCapacity=100  
nginx.channels.ch-spooling.capacity=100000  
nginx.channels.ch-tail.type=file  
nginx.channels.ch-tail.checkpointDir=/data/flume/.flume/file-channel/ch-tail/checkpoint  
nginx.channels.ch-tail.dataDirs=/data/flume/.flume/file-channel/ch-tail/data  
nginx.channels.ch-tail.capacity=1000  
nginx.channels.ch-tail.transactionCapacity=100  
nginx.channels.ch-tail.capacity=100000  

#
#source,歷史資料  
nginx.sources.spooling-source.type=spooldir  
nginx.sources.spooling-source.channels=ch-spooling  
##指定logs目錄  
nginx.sources.spooling-source.spoolDir=/data/logs/nginx  
##開啟header,此後event將攜帶此header  
nginx.sources.spooling-source.fileHeader=true  
nginx.sources.spooling-source.fileHeaderKey=file  
##header中增加檔名  
nginx.sources.spooling-source.basenameHeader=true  
nginx.sources.spooling-source.basenameHeaderKey=basename  
##日誌傳送完畢後,是否刪除此原始檔,  
#“immediate”表示傳送完畢後立即刪除,可以節約磁碟空間  
nginx.sources.spooling-source.deletePolicy=never  
##包含的檔案的列表,我們約定所有的日誌每天rotate,  
##格式為“<filename>.log-<yyyyMMdd>”  
##當前的日誌,不會被包含進來。  
nginx.sources.spooling-source.includePattern=^.*\.log-.+$  
nginx.sources.spooling-source.consumeOrder=oldest  
nginx.sources.spooling-source.recursiveDirectorySearch=false  
nginx.sources.spooling-source.batchSize=100  
nginx.sources.spooling-source.inputCharset=UTF-8  
##如果編解碼失敗,忽略相應的字元。  
nginx.sources.spooling-source.decodeErrorPolicy=IGNORE  
nginx.sources.spooling-source.selector.type=replicating  
nginx.sources.spooling-source.interceptors=i1 i2  
##使用timestamp攔截器,將會在event header中增加時間戳欄位  
nginx.sources.spooling-source.interceptors.i1.type=timestamp  
##使用host攔截器,將會在event header中增加"host"欄位,值為ip  
nginx.sources.spooling-source.interceptors.i2.type=host  
nginx.sources.spooling-source.interceptors.i2.useIP=true  
nginx.sources.spooling-source.interceptors.i2.hostHeader=host  
nginx.sources.tail-source.type=TAILDIR  
nginx.sources.tail-source.channels=ch-tail  
##本人不想寫flume的擴充套件程式碼,所以就為每個tail的檔案指定一個group  
nginx.sources.tail-source.filegroups=www error  
nginx.sources.tail-source.filegroups.www=/data/logs/nginx/www.log  
nginx.sources.tail-source.filegroups.error=/data/logs/nginx/error.log  
##對於taildir,需要間歇性的儲存tail檔案的位置,以便中斷後可以繼續  
##json格式檔案  
nginx.sources.tail-source.positionFile=/data/flume/.flume/ch-tail/taildir_position.json  
##每個tail的檔案,建立一個kafka topic  
nginx.sources.tail-source.headers.www.topic=nginx-www  
nginx.sources.tail-source.headers.error.topic=nginx-error  
nginx.sources.tail-source.skipToEnd=true  
nginx.sources.tail-source.interceptors=i1 i2  
nginx.sources.tail-source.interceptors.i1.type=timestamp  
nginx.sources.tail-source.interceptors.i2.type=host  
nginx.sources.tail-source.interceptors.i2.useIP=true  
nginx.sources.tail-source.interceptors.i2.hostHeader=host  

#
#spooling歷史資料  
nginx.sinks.hdfs-spooling.channel=ch-spooling  
nginx.sinks.hdfs-spooling.type=hdfs  
nginx.sinks.hdfs-spooling.hdfs.fileType=DataStream  
nginx.sinks.hdfs-spooling.hdfs.writeFormat=Text  
##儲存在hdfs中,路徑表達了日誌分類資訊,第一級為<project>  
##第二級為<date>  
##即同一個專案的日子,按照日期彙總。  
nginx.sinks.hdfs-spooling.hdfs.path=hdfs://hadoop-ha/logs/nginx/%Y-%m-%d  
##hdfs的檔名中包括此原始檔所在的host地址,便於資料分揀  
nginx.sinks.hdfs-spooling.hdfs.filePrefix=%{basename}.[%{host}]  
##對於spooling的檔案,檔名儘可能接近原始名稱,所以suffix值為空  
nginx.sinks.hdfs-spooling.hdfs.fileSuffix=  
##檔案在同步過程中,字尾為.tmp  
nginx.sinks.hdfs-spooling.hdfs.inUseSuffix=.tmp  
##不按照時間間隔滾動生成新檔案  
nginx.sinks.hdfs-spooling.hdfs.rollInterval=0  
##1G,當檔案大小達到1G後,滾動生成新檔案  
nginx.sinks.hdfs-spooling.hdfs.rollSize=1073741824  
##不按照event條數滾動生成新檔案  
nginx.sinks.hdfs-spooling.hdfs.rollCount=0  
##IO通道空閒60S秒後,關閉  
nginx.sinks.hdfs-spooling.hdfs.idleTimeout=60  

#
#tail實時資料  
nginx.sinks.kafka-tail.channel=ch-tail  
nginx.sinks.kafka-tail.type=org.apache.flume.sink.kafka.KafkaSink  
##kafka叢集地址,可以為其子集  
nginx.sinks.kafka-tail.kafka.bootstrap.servers=10.0.3.78:9092,10.0.4.78:9092,10.0.4.79:9092,10.0.3.77:9092  
##注意,topic中不支援引數化  
##但是為了提高擴充套件性,我們把topic資訊通過header方式控制  
#nginx.sinks.kafka-tail.kafka.topic=nginx-%{filename}  
##default 100,值越大,網路效率越高,但是延遲越高,準實時  
nginx.sinks.kafka-tail.flumeBatchSize=32  
nginx.sinks.kafka-tail.kafka.producer.acks=1  
##use Avro-event format,will contain flume-headers  
##default : false  
nginx.sinks.kafka-tail.useFlumeEventFormat=false  

這是一個很長的配置檔案,各個配置項的含義大家可以去官網查閱,我們需要注意幾個地方:

  • checkpoint、data目錄,最好指定,這對以後排查問題很有幫助

  • channel,我們需要顯示宣告其型別,通常我們使用file,對流量抗擊有些幫助,前提是指定的目錄所在磁碟空間應該相對充裕和高速。

  • header並不會真的會寫入sink,header資訊只是在source、channel、sink互動期間有效;我們可以通過header標記一個event流動的特性。

  • 對於spooling source,建議開啟basename,即檔案的實際名稱,我們可以將此header傳遞到sink階段。

  • 所有涉及到batchSize的特性,都是需要權衡的:在傳送效率和延遲中做出合理的決策。

  • interceptor是Flume很重要的特性,可以幫助我們在source生命週期之後做一些自定義的操作,比如增加header、內容修正等;此時我們需要關注一些效能問題。

  • 對於taildir,filegroups中可以指定多個值,我的設計原則是一個tail檔案對應一個group名稱,目前還沒有特別好的辦法來通配tail檔案,只能逐個宣告。

  • 對於kafka sink,topic資訊可以通過“kafka.topic”指定,也可以在通過header指定(headers.www.topic,“www”對應group名稱,“topic”是header的key名稱)。為了靈活性,我更傾向於在headers中指定topic。

  • hdfs sink需要注意其roll的時機,目前影響roll時機的幾個引數“minBlockReplicas”、“rollInterval”(根據時間間隔)、“rollSize”(根據檔案尺寸)、“rollCount”(根據event條數);此外“round”相關的選項也可以干預滾動生成新檔案的時機。

關於hdfs sink折磨了我很久,flume每次flush都將生成一個新的hdfs檔案,最終導致生成很多小檔案,我希望一個tail的檔案最終在hdfs中也是一個檔案;後來經過考慮,使用基於rollSize來滾動生成檔案,通常本人的nginx日誌檔案不超過1G,那麼我就讓rollSize設定為1G,這樣就可以確保不會roll。此外,hdfs每個檔案都會有一個“數字”字尾,這個數字是一個內部的counter,目前沒有辦法通過配置的方式來“消除”,我們先暫且接受吧。

如下為nginx中log_format樣例,我們在每條日誌的首個位置,設定了$hostname用於標記此檔案的來源機器,便於kafka訊息消費者分揀資料。

log_format  main  '$hostname|$remote_addr|$remote_user|$time_local|$request|'  
                 '$status|$body_bytes_sent|$http_referer|$request_id|'  
                   '$http_user_agent|$http_x_forwarded_for|$request_time|$upstream_response_time|$upstream_addr|$upstream_connect_time';  

對於flume的配置,我們可以通過zookeeper來儲存,這是1.7版本新增的特性,配置中心化,這種方式大家可以參考。不過本人考慮到配置的可見性,我並沒有將配置放在zookeeper中,而是放在了一臺配置中控機上,通過jenkins來部署flume,每個project分散式部署,每個節點一個flume例項,它們使用同一個配置檔案,在部署flume時從中控機上scp新配置即可。(這需要先有一個自動化部署平臺)

我們看到配置檔案中的配置項都以“nginx”開頭,這個字首表示agent的名稱,我們可以根據實際業務來命名即可,但是在啟動flume時必須制定,原則上一個flume-conf.properties檔案中可以宣告多個agent的配置項,不過我們通常不建議這麼用。

我們把flume部署在nginx所在機器上,調整好配置檔案,即可啟動,flume啟動指令碼:

nohup bin/flume-ng agent --conf conf --conf-file flume-conf.properties \ 
--name nginx -Dflume.root.logger=INFO,CONSOLE \
-Dorg.apache.flume.log.printconfig=true  \
-Dorg.apache.flume.log.rawdata=true  

上述啟動指令中,--config-file就是指定配置檔案的路徑和名稱,--name指定agent名稱(與配置檔案中的配置項字首保持一致),logger資訊我們在線上為INFO,在測試期間可以指定為“DEBUG,LOGFILE”便於我們排查問題。

tomcat業務日誌收集

關於Flume收集tomcat業務日誌,需要調整的點比較多;本人的設計初衷是:

  • HDFS中收集所有的歷史日誌,包括catalina、access_log、業務日誌等。

  • kafka只實時收集access_log和指定的業務日誌;我們可以用這些資料做業務監控等。

tomcat日誌格式

我們首先調整tomcat中的logging.properties:

1catalina.org.apache.juli.AsyncFileHandler.level = FINE  
1catalina.org.apache.juli.AsyncFileHandler.directory = ${catalina.base}/logs  
##here  
1catalina.org.apache.juli.AsyncFileHandler.prefix = catalina.log.  
1catalina.org.apache.juli.AsyncFileHandler.suffix =  
2localhost.org.apache.juli.AsyncFileHandler.level = FINE  
2localhost.org.apache.juli.AsyncFileHandler.directory = ${catalina.base}/logs  
2localhost.org.apache.juli.AsyncFileHandler.prefix = localhost.log.  
2localhost.org.apache.juli.AsyncFileHandler.suffix =  
3manager.org.apache.juli.AsyncFileHandler.level = FINE  
3manager.org.apache.juli.AsyncFileHandler.directory = ${catalina.base}/logs  
3manager.org.apache.juli.AsyncFileHandler.prefix = manager.log.  
3manager.org.apache.juli.AsyncFileHandler.suffix =  
4host-manager.org.apache.juli.AsyncFileHandler.level = FINE  
4host-manager.org.apache.juli.AsyncFileHandler.directory = ${catalina.base}/logs  
4host-manager.org.apache.juli.AsyncFileHandler.prefix = host-manager.log.  
4host-manager.org.apache.juli.AsyncFileHandler.suffix =  

因為tomcat日誌檔案滾動格式預設為“catalina..log”,我們應該把它調整為“catalina.log.”,我們可以通過上述配置方式來達成,最終我們希望無論是tomcat自己的日誌、application的業務日誌,滾動生成的檔名格式都統一為“.log.”,這樣便於我們在flume中配置正則表示式來spooling這些歷史檔案。

Flume的配置檔案與nginx基本類似,此處不再贅言。

業務日誌

我們約定application的業務日誌也列印在${tomcat_home}/logs目錄下,即與catalina.out在一個目錄,每個業務日誌每天滾動生成新的歷史檔案,檔案字尾以“.yyyy-MM-dd”結尾,這類檔案稱為歷史檔案,被同步到HDFS中。對於實時的日誌資訊,我們仍然傳送給kafka,kafka topic的設計思路跟nginx一樣,每個project一種檔案對應一個topic,每種檔案的日誌來自多個application例項,它們混淆在kafka topic中,為了便於日誌分揀,我們需要在每條日誌中增加一個IP標誌項。本人整理髮現,在logback中列印local ip預設是不支援的,所以我們需要變通一下,我們在tomcat的啟動指令碼中定義一個LOCAL_IP這個環境變數,然後再logback.xml中引入即可解決。

##catalina.sh  
##add  
export LOCAL_IP=`hostname -I`

在專案中的logback.xml中即可通過${LOCAL_IP}變數宣告即可

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">  
   <file>${LOG_HOME}/order_center.log</file>  
   <Append>true</Append>  
   <prudent>false</prudent>  
   <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">  
       <pattern>${LOCAL_IP} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>  
   </encoder>  
   <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">  
       <FileNamePattern>${LOG_HOME}/order_center.log.%d{yyyy-MM-dd}</FileNamePattern>  
       <maxHistory>72</maxHistory>  
   </rollingPolicy>  
</appender>

access_log日誌

tomcat的access_log非常重要,可以列印很多資訊來幫助我們分析業務問題,所以我們需要將acess_log日誌整理規範;我們在server.xml中通過修改如下內容即可:

<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"  
          prefix="localhost_access_log" suffix=".log" renameOnRotate="true"  
          pattern="%A|%h|%m|%t|%D|&quot;%r&quot;|&quot;%{Referer}i&quot;|&quot;%{User-Agent}i&quot;|%s|%S|%b|%{X-Request-ID}i|%{begin:msec}t|%{end:msec}t" />

“renameOnRotate”表示是否在rotate時機重新命名access_log,我們設定為true,這樣access_log檔名預設不帶日期格式,時間格式在rotate期間才新增進去。“%A”表示本機的local ip地址,也是用於kakfa分揀日誌的標記,X-Request-ID是nginx層自定義的一個trace-ID用於跟蹤請求的,如果你沒有設定,則可以去掉。

到此為止,我們基本上可以完成這一套日誌採集系統了,也為kafka分揀日誌資訊做好了鋪墊,後續接入ELK、storm實時資料分析等也將相對比較容易。

問題總結:

  • flume + hdfs:

    • 我們首先將hdfs-site.xml,core-site.xml複製到${flume_home}/conf目錄下。且flume機器能夠與hdfs所有節點通訊(網路隔離、防火牆都可能導致它們無法正常通訊)。

    • 在Flume根目錄下,建立一個plugins.d/hadoop目錄,建立lib、libext、native子目錄;並將hadoop的相關依賴包複製到libext目錄中:

commons-configuration-1.6.jar  
hadoop-annotations-2.6.5.jar  
hadoop-auth-2.6.5.jar  

相關推薦

Flume + kafka + HDFS構建日誌採集系統

本文轉載自:http://shift-alt-ctrl.iteye.com/blo

flume-kafka整合--實時日誌採集

flume採用架構 exec-source + memory-channel + avro-sink avro-source + memory-channel + kafka-sink

日誌採集系統flumekafka有什麼區別及聯絡?

日誌採集系統flume和kafka有什麼區別及聯絡,它們分別在什麼時候使用,什麼時候又可以結合? 觀點一: 簡言之:這兩個差別很大,使用場景區別也很大。 先說flume: 日誌採集。線上資料一般主要是落地檔案或者通過socket傳輸給另外一個系統。這種情況下,你很難推動線上應用或服務去修改介

開源日誌採集系統比較:scribe、chukwa、kafkaflume

1. 背景介紹 許多公司的平臺每天會產生大量的日誌(一般為流式資料,如,搜尋引擎的pv,查詢等),處理這些日誌需要特定的日誌系統,一般而言,這些系統需要具有以下特徵: (1)構建應用系統和分析系統的橋樑,並將它們之間的關聯解耦; (2)支援近實時的線上分析系統和類似於

大資料技術學習筆記之網站流量日誌分析專案:Flume日誌採集系統1

一、網站日誌流量專案     -》專案開發階段:         -》可行性分析         -》需求分析  

ELK+kafka構建日誌收集系統

背景: 最近線上上了ELK,但是隻用了一臺Redis在中間作為訊息佇列,以減輕前端es叢集的壓力,Redis的叢集解決方案暫時沒有接觸過,並且Redis作為訊息佇列並不是它的強項;所以最近將Redis換成了專業的訊息資訊釋出訂閱系統Kafka, Kafka的更多介紹大家可以看這裡: 傳

Flume日誌採集系統與Logstash對比

本文就從如下的幾個方面講述下我的使用心得: 初體驗——與Logstash的對比 安裝部署 啟動教程 引數與例項分析 Flume初體驗 Flume的配置是真繁瑣,source,channel,sink的關係在配置檔案裡面交織在一起,沒有Logstash那麼簡單明瞭。

基於Flume+kafka打造實時日誌收集分析系統

Kafka broker修改conf/server.properties檔案,修改內容如下:           broker.id=1           host.name=172.16.137.196 port=10985           log.dirs=/data/kafka

elk+kafka 分散式日誌採集系統設計

Filebeat (每個微服務啟動一個)--->Kafka叢集--->Logstash(one)-->Elasticsearch叢集 一、資料流從檔案端到Kafka 叢集端,通過Filebeat 1.下載 Filebeat #cd  /opt/filebeat

flume分散式日誌採集系統實戰-陳耀武-專題視訊課程

flume分散式日誌採集系統實戰—4303人已學習 課程介紹        隨著公司業務的不斷增長,劃分了許多應用,不同應用的日誌在不同伺服器上面,很難進行統一管理,通過學習該課程,你可以自己搭建日誌採集系統,可以進行資料分析,挖掘等工作課程收益    通過學習該課程,可以快

基於Heka+Flume+Kafka+ELK的日誌系統

前期準備 系統是centos6.6,64位機器。 所用軟體版本: Logstash:2.3.3 JDK:1.8.0_25 Elasticsearch:2.3.4 Kibana:4.5.2 Heka:0.10.0 Flume:1.7.0 Zookeeper:3.4

flume+kafka+hdfs詳解

utf-8 conf prop nts command format ext sink 1.4 flume架構圖 單節點flume配置 flume-1.4.0 啟動flume bin/flume-ng agent --conf ./conf -f conf/flume

Apache flume+Kafka獲取實時日誌資訊

Flume簡介以及安裝 Flume是一個分散式的對海量日誌進行採集,聚合和傳輸的系統。Flume系統分為三個元件,分別是source,sink,channel:source表明資料的來源,可能來自檔案,Avro等,channel作為source和sink的橋樑,作為資料的臨時儲存地,channal是

在滴滴雲 DC2 雲伺服器上搭建 ELK 日誌採集系統

前段時間大學同學聚會,一個在讀的博士同學談到他們實驗室做實驗時,物理伺服器需要向老師申請且組內同學都共用那些機器。由於執行一些大資料和人工智慧的專案比較耗費資源,且對資源環境的隔離要求比較高,因而很多時候用機器還得排隊(畢竟學校經費和底層基礎設施沒有公司充裕),不是很方便。於是我就對他說到,

Flume+Kafka+Storm+Redis實時分析系統基本架構

今天作者要在這裡通過一個簡單的電商網站訂單實時分析系統和大家一起梳理一下大資料環境下的實時分析系統的架構模型。當然這個架構模型只是實時分析技術的一 個簡單的入門級架構,實際生產環境中的大資料實時分析技術還涉及到很多細節的處理, 比如使用Storm的ACK機制保證資料都能被正確處理, 叢集的高可用架構

Flume-NG + HDFS + HIVE 日誌收集分析

[[email protected] apache-flume-1.3.0-bin]# cat /data/apache-flume-1.3.0-bin/conf/flume.conf# Define a memory channel called c1 on a1a1.channels.c1.ty

flume+kafka收集業務日誌

介紹 我們的使用者是經常在登陸,由於是涉及到裝置,產品希望每個使用者一登陸,後臺系統就能感知到其變化,即實時更新, 登陸資料量是很大的,大約一天有1500W左右的資料,且比較集中在晚上.高峰時1秒鐘要處理200多個登陸請求, 負責登陸的系統是業務的核心,架

Flume+Kafka環境構建和實戰

1. 準備工作 apache上下載 apache-flume-1.7.0, apache-kafka_2.12-0.11, apache-zookeeper-3.4.9 下載後分別解壓至/home/hadoop/bigdata並重命名目錄為flume, kafka, zk

基於Flume的美團日誌收集系統(二)改進和優化

問題導讀: 1.Flume的存在些什麼問題? 2.基於開源的Flume美團增加了哪些功能? 3.Flume系統如何調優? 在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節

基於Flume的美團日誌收集系統(一)架構和設計

美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部