Flume淺度學習指南
flume簡介
cloudera 公司開源的,貢獻給Apache基金會
http://archive.cloudera.com/c...
只能執行在linux系統上
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
flume用來高效的收集、聚合、移動大量的日誌資料
有一個基於流式的簡單的有彈性的傳輸模型
有一個健壯的可容錯的機制
使用簡單,可以擴充套件的資料模型執行使用到線上實時分析應用中
簡單體現在flume-agent的配置及傳輸模型簡單
線上實時分析應用中
flume日誌的實時採集-》sparkStreaming/storm/Flink =》mysql/redis=》實時分析的結果進行報表展示
資料(日誌)的移動傳輸工具:
日誌=》系統執行日誌、web伺服器的訪問日誌、客戶端的使用者行為日誌、軟體的執行操作日誌
可以將資料從資料來源中採集並移動到另外一個目的地:
資料來源=》系統本地日誌檔案中的資料、jms、avro埠、kafka、系統本地目錄下... 目的地=》hdfs、hive、hbase、kafka、系統本地一個檔案中...
如何將linux本地的一個日誌檔案中的日誌資料採集到hdfs上
-
指令碼+hdfs命令 =》【週期性】上傳
#!/bin/sh HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2 $HADOOP_HOME/bin/hdfs -put /.../xx.log/hdfs
針對專案初期資料量較少時可以使用 , 沒有容災性及穩定性
-
採用flume日誌採集框架=》【實時】採集一個日誌檔案中實時追加的日誌資料並寫入到目的地
針對不同的應用場景定義並啟動對應的flume-agent例項/程序
source-- 定義從哪裡採集資料 exec型別的source可以藉助Linux的shell命令實現實時讀取一個日誌檔案中動態追加的日誌資料 avro型別 …… channel-- 定義了source採集的資料臨時儲存地 memory 基於記憶體的管道容器 file 基於磁碟 sink-- 定義將資料最終寫入的-目的地 hdfs型別的sink將資料最終寫入到hdfs上 hive型別將資料最終寫入到hive表 kafka型別將資料最終寫入到kafka分散式訊息佇列中 ……
flume-agent例項的模型
每個flume-agent例項至少由以下三個功能模組組成 source模組 用於監控資料來源並進行資料的實時採集,是實時產生資料流的模組 資料來源=》系統本地的一個日誌檔案中、kafka、jms、系統本地的一個目錄下、avro埠。。。 source將採集到的資料提交到channel中 channel模組 用於連線source和sink的管道容器 類似一個佇列(FIFO) sink模組 從channel中拉取take(剪下)資料並最終將資料寫入到目的地 目的地=》hdfs、hive、hbase、kafka、avro埠... event事件: event事件是flume傳輸日誌資料時基本單元,在flume-agent內部資料都是以事件形式存在 source將採集到的資料封裝成一個個的event事件,將事件提交到channel sink從channel消費事件並將事件中封裝的資料最終寫入到目的地 event事件的資料結構:header + body header 是一個map集合型別 內部的key-value為該事件的元資料資訊,主要用來區分不同的事件 body 是一個位元組陣列型別 body為我們真正要傳輸的資料
flume的安裝使用
flume-ng-1.6.0-cdh5.14.2 安裝 1、上次解壓flume的安裝包 $ tar zxvf/opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/ $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2修改目錄名稱-可選 2、修改flume配置檔案 $ mv conf/flume-env.sh.template conf/flume-env.sh修改後環境配置檔案才能生效 $ vi conf/flume-env.sh export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112 3、針對不同的場景需求配置對應的java屬性配置檔案並啟動flume-agent程序 如何啟動一個flume-agent程序 $ bin/flume-ng agent\ --name或-n 當前flume-agent例項的別名\ --conf或-c 當前flume框架的配置檔案目錄 \ --conf-file,-f 與當前要啟動的flume-agent程序相匹配的java屬性配置檔案的本地路徑 Usage: bin/flume-ng <command> [options]...
案例
案例一:
flume官方簡單案例
定義一個flume-agent去監聽讀取某臺伺服器上的某個埠中的資料,並將監聽讀取到的資料最終寫入到flume框架自己的日誌檔案中
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = centos01 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
提交測試: $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties & 確定目標伺服器的埠是否已經成功被flume-agent代理程序監聽 $ netstat -antp |grep 44444--檢視埠資訊 $ ps -ef | grep flume-- 檢視程序資訊 安裝一個telnet工具並連線伺服器埠寫入資料 $ sudo yum -y install telnet 傳送訊息資料 檢查flume的日誌檔案中的資料 $ tail -f logs/flume.log
案例二:
要求使用flume實時監控讀取系統本地一個日誌檔案中動態追加的日誌資料並實時寫入到hdfs上的某個目錄下
# example.conf: A single-node Flume configuration #同一臺Linux上可開啟多個flume-agent,但agent別名要區分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #依靠的是Linux的命令讀取本地檔案,Linux的命令不停止flume就不停 a2.sources.r2.type = exec # tail -F 檔名即使沒有這個-F後面指定的檔案,命令也不會停止,容錯能力強 a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 #宣告a2的sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog a2.sinks.k2.hdfs.filePrefix = nginxData # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
報錯首先找logs目錄
報錯:找不到類
缺少jar包
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar/opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar/opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar/opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar/opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
案例三:
案例二的優化:
解決生成的檔案過多過小的問題(希望檔案的大小=128M) 將日誌檔案按照日期分目錄儲存(按照天分目錄儲存) 將生成的日誌檔案的格式改為Text文字格式
修改案例二的flume-agent屬性檔案
# 聲明當前flume-agent的別名及當前的flume-agent例項包含的模組的別名和個數 a2.sources = s2 a2.channels = c2 a2.sinks = k2 # 定義source模組中的s2的型別及與此型別相關的延伸屬性 # exec型別的source可以藉助執行一條linux shell命令實現讀取linux系統上某個檔案中的日誌資料,其中 cat是一次性讀取,tail可以實現實時讀取新增加的資料 # shell屬性用來宣告要執行的命令的執行環境 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /opt/nginx/access.log a2.sources.s2.shell = /bin/sh -c # 定義channel模組中的c2的型別及與此型別相關的延伸屬性 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 定義sink模組中的k2的型別及與此型別相關的延伸屬性 a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d #啟用根據時間生成路徑中的轉義字元的具體的時間值 a2.sinks.k2.hdfs.round = true #表示使用本地linux系統時間戳作為時間基準,否則會自動參考事件的header中的時間戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #設定檔案的字首 a2.sinks.k2.hdfs.filePrefix = NgnixLog #設定解決檔案過多過小問題 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設定會導致上面的三個引數不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上檔案中的最大event數量 #batchSize的值需要小於等於transactionCapacity的值 #從效能上考慮,最優的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100 # fileType定義的是資料流的格式,預設的資料流的格式為SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 寫入到hdfs上的檔案的格式(序列化方法) # 格式改為text後,可以通過cat 或 text 命令檢視檔案中的日誌內容 a2.sinks.k2.hdfs.writeFormat = Text # 將a2中的source及sink模組繫結到對應的channel模組上 # 一個source模組可以同時繫結多個channel模組,但是一個sink模組只能繫結一個唯一的channel a2.sources.s2.channels = c2 a2.sinks.k2.channel = c2
案例四:
利用flume監控某個目錄下的日誌檔案,當某個目錄下出現符合要求的檔名稱的檔案時,則對檔案中的日誌資料進行讀取,並將資料最終寫入到hdfs上
目錄 /opt/data/logs nginx-access.log.2018120309 nginx-access.log.2018120310
# example.conf: A single-node Flume configuration #同一臺Linux上可開啟多個flume-agent,但agent別名要區分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source # includePattern 用正則表示式指定要包含的檔案 # ignorePattern用正則表示式指定要忽略的檔案 a2.sources.r2.type = spooldir a2.sources.r2.spoolDir = /home/chen/mylogs # 由於每次讀完會給讀完的檔案增加.COMPLETED從而形成新檔案,需要忽略這些檔案 a2.sources.r2.ignorePattern = ^.*\.COMPLETED$ # includePattern和ignorePattern會同時生效 a2.sources.r2.includePattern =^.*$ # Use a channel # file型別更安全 # memory型別效率更高 a2.channels.c2.type = file a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data #宣告a2的sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d #啟用根據時間生成轉義字元的具體的時間值 a2.sinks.k2.hdfs.round = true #使用本地linux系統時間戳作為時間基準,否則會自動參考事件的header中的時間戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true a2.sinks.k2.hdfs.filePrefix = nginxData #設定解決檔案過多過小問題 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設定會導致上面的三個引數不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上檔案中的最大event數量 #batchSize的值需要小於等於transactionCapacity的值 #從效能上考慮,最優的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100 # fileType定義的是資料流的格式,預設的資料流的格式為SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 寫入到hdfs上的檔案的格式(序列化方法) # 格式改為text後,可以通過cat 或 text 命令檢視檔案中的日誌內容 a2.sinks.k2.hdfs.writeFormat = Text # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
案列五:
需求: Nginx伺服器叢集 -- 10臺 每臺伺服器上都有一個access.log日誌檔案 需要將每臺伺服器上的日誌檔案中追加的日誌資料實時讀取並寫入到hdfs上 思路1: 每臺Nginx伺服器上啟動一個flume-agent source - exec channel - mem sink - hdfs 多個flume-agent同時寫入資料到hfds上不利於hdfs的穩定性 思路2: 每臺Nginx伺服器上啟動一個flume-agent source - exec channel - mem sink - avro type = avro hostname = 主機名 port =埠號 將資料統一寫入到某臺伺服器某個埠中 啟動一個負責對彙總後的資料統一寫入到目的地的flum-agent source - avro type = avro bind = port = channel - mem sink - hdfs
nginxs2flume.properties
# example.conf: A single-node Flume configuration #同一臺Linux上可開啟多個flume-agent,但agent別名要區分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #依靠的是Linux的命令讀取本地檔案,Linux的命令不停止flume就不停 a2.sources.r2.type = exec # tail -F 檔名即使沒有這個-F後面指定的檔案,命令也不會停止,容錯能力強 a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 #宣告a2的sink a2.sinks.k2.type = avro a2.sinks.k2.hostname = centos01 a2.sinks.k2.port = 6666 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
flume2hdfs.properties
# example.conf: A single-node Flume configuration #同一臺Linux上可開啟多個flume-agent,但agent別名要區分 a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = avro a3.sources.r3.bind = centos01 a3.sources.r3.port = 6666 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 #宣告a3的sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test a3.sinks.k3.hdfs.writeFormat = Text # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3