Summary of Flume Commands, Multi-Hop Agents, Parameters, and Configuration Files
阿新 · Published: 2019-02-08
flume
----------------
A service for collecting, moving, and aggregating large volumes of log data.
A streaming-data architecture, used for online log analysis.
Event-based.
Plays a coordinating role between producers and consumers.
Provides transactional guarantees to ensure messages are delivered.
Multiple source types.
Multiple sink types.
multihop              //multi-level hops (chained agents)
Horizontal scaling    //add nodes
Vertical scaling      //add more hardware
Source
-------------
Receives data; many types are available.
Channel
-------------
A temporary staging area that buffers data arriving from the Source until the Sink consumes it.
Sink
-------------
Pulls data from the channel and writes it to centralized storage (Hadoop / HBase).
Install flume
-------------
1. Download
2. Untar
3. Set environment variables
4. Verify the installation
$>flume-ng version                //ng = next generation
Configure flume
---------------
1. Create the configuration file
[/soft/flume/conf/hello.conf]
#declare the three components
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#define the source
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
#define the sink
a1.sinks.k1.type=logger
#define the channel
a1.channels.c1.type=memory
#bind them together
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
2. Run
a) Start the flume agent
$>bin/flume-ng agent -f ../conf/hello.conf -n a1 -Dflume.root.logger=INFO,console
b) Start the nc client
$>nc localhost 8888
$nc>hello world
c) "hello world" is printed in the flume terminal.
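The nc step above can also be scripted; here is a minimal Python sketch that sends one line to the netcat source over a plain TCP socket (host and port taken from the config above):

```python
import socket


def send_event(line, host="localhost", port=8888):
    """Send one newline-terminated line to Flume's netcat source."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall(line.encode("utf-8") + b"\n")
```

Each call opens a short-lived connection, which is enough for ad-hoc testing; a real producer would keep the connection open.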
Install nc
---------------
$>sudo yum install nmap-ncat.x86_64
Clear the repository cache
-----------------------
$>rename ali.repo to ali.repo.bak
$>sudo yum clean all
$>sudo yum makecache
#e.g. the Aliyun base repo
$>sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
#Aliyun EPEL repo
$>sudo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
flume source
-------------------
1. netcat
nc ..
2. exec
Real-time log collection (e.g. tailing a growing log file).
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/centos/test.txt
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
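To exercise the exec source, something has to keep appending to the tailed file. A hypothetical Python producer (the path argument is whatever `tail -F` watches, e.g. /home/centos/test.txt above):

```python
import time


def append_log_lines(path, n, delay=0.0):
    """Append n timestamped lines to a file so that the exec source's
    `tail -F` command picks them up as they are written."""
    with open(path, "a", encoding="utf-8") as f:
        for i in range(n):
            f.write("%s event %d\n" % (time.strftime("%Y-%m-%d %H:%M:%S"), i))
            f.flush()  # make each line visible to tail immediately
            if delay:
                time.sleep(delay)
```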
3. Batch collection (spooldir)
Monitors a directory of static files.
After a file is fully collected, it is renamed with a .COMPLETED suffix.
a) Configuration file
[spooldir_r.conf]
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/centos/spool
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
b) Create the directory
$>mkdir ~/spool
c) Start flume
$>bin/flume-ng agent -f ../conf/spooldir_r.conf -n a1 -Dflume.root.logger=INFO,console
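The spooldir source expects files to be complete and immutable once they land in spoolDir, so a producer should stage the file elsewhere and rename it in. A sketch of that pattern in Python (directory names are placeholders):

```python
import os
import tempfile


def drop_into_spool(staging_dir, spool_dir, name, contents):
    """Write a file in a staging directory, then rename it into the
    spooling directory in one step, so the spooldir source never sees
    a half-written file."""
    fd, tmp = tempfile.mkstemp(dir=staging_dir)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(contents)
    final = os.path.join(spool_dir, name)
    os.replace(tmp, final)  # atomic when both dirs share a filesystem
    return final
```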
4. Sequence source
[seq.conf]
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type=seq
a1.sources.r1.totalEvents=1000
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
[Run]
$>bin/flume-ng agent -f ../conf/seq.conf -n a1 -Dflume.root.logger=INFO,console
5.StressSource
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
flume sink
------------------
1.hdfs
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
#Round down the event timestamp used to escape the directory path;
#with roundValue=10 and roundUnit=second below, a new directory is created every 10 seconds.
#e.g. /flume/events/17-12-12/10/15/37 --> /flume/events/17-12-12/10/15/30
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp=true
#conditions for rolling to a new file
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=10
a1.sinks.k1.hdfs.rollCount=3
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
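The round settings above truncate the timestamp used to escape the path; with roundValue=10 and roundUnit=second, the %S component is rounded down to a multiple of 10. The arithmetic, illustrated in Python:

```python
def round_down(value, step):
    """Round a time component down to a multiple of `step`, mimicking
    what hdfs.round / roundValue / roundUnit do to the escaped path."""
    return value - (value % step)
```

So events arriving at seconds 30 through 39 all land in the same .../30 directory.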
2. hive
(omitted)
3.hbase
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hbase
a1.sinks.k1.table = ns1:t12
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
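RegexHbaseEventSerializer splits the event body with a regular expression and writes the capture groups as columns under the configured column family. A rough Python illustration of that idea (the pattern and column names here are hypothetical, not Flume defaults):

```python
import re


def split_body(body, pattern, columns):
    """Map regex capture groups from an event body onto column
    qualifiers, sketching what a regex-based serializer does."""
    m = re.match(pattern, body)
    if m is None:
        return {}
    return dict(zip(columns, m.groups()))
```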
4. kafka
(omitted)
Multi-hop agent chaining using AvroSource and AvroSink
-----------------
1. Create the configuration file
[avro_hop.conf]
#a1
a1.sources = r1
a1.sinks= k1
a1.channels = c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=9999
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#a2
a2.sources = r2
a2.sinks= k2
a2.channels = c2
a2.sources.r2.type=avro
a2.sources.r2.bind=localhost
a2.sources.r2.port=9999
a2.sinks.k2.type = logger
a2.channels.c2.type=memory
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
2. Start a2
$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console
3. Verify a2
$>netstat -anop | grep 9999
4. Start a1
$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1
5. Verify a1
$>netstat -anop | grep 8888
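Instead of grepping netstat output, the listener check can be scripted; a small Python sketch that probes a TCP port:

```python
import socket


def port_is_listening(host, port, timeout=1.0):
    """Return True if a TCP listener accepts connections on host:port;
    a scripted alternative to `netstat -anop | grep <port>`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```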
channel
-----------------
1. MemoryChannel
(omitted)
2. FileChannel
a1.sources = r1
a1.sinks= k1
a1.channels = c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
a1.sinks.k1.type=logger
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/centos/flume/fc_check
a1.channels.c1.dataDirs = /home/centos/flume/fc_data
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Spillable memory channel
------------------
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
#memoryCapacity = 0 disables the in-memory queue (equivalent to a pure file channel)
a1.channels.c1.memoryCapacity = 0
#overflowCapacity = 0 would disable spilling to disk (equivalent to a pure memory channel)
a1.channels.c1.overflowCapacity = 2000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /user/centos/flume/fc_check
a1.channels.c1.dataDirs = /user/centos/flume/fc_data
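Conceptually, a spillable channel keeps events in memory until memoryCapacity is reached and spills the rest to disk. A toy Python sketch of that behavior (not Flume's actual implementation, which also checkpoints and drains the disk queue):

```python
from collections import deque


class SpillableBuffer:
    """Toy sketch of the spillable-memory idea: hold up to
    memory_capacity events in RAM and append the overflow to a file."""

    def __init__(self, memory_capacity, spill_path):
        self.mem = deque()
        self.memory_capacity = memory_capacity
        self.spill_path = spill_path
        self.spilled = 0

    def put(self, event):
        if len(self.mem) < self.memory_capacity:
            self.mem.append(event)          # fast in-memory path
        else:
            with open(self.spill_path, "a", encoding="utf-8") as f:
                f.write(event + "\n")       # overflow spills to disk
            self.spilled += 1

    def take(self):
        # Draining the spill file back into memory is omitted here.
        return self.mem.popleft() if self.mem else None
```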
Create a Flume module
------------------
1. Add pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.it18zhang</groupId>
<artifactId>FluemDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
</project>