1. 程式人生 > >Flume相關命令,躍點,引數及配置檔案總結

Flume相關命令,躍點,引數及配置檔案總結

flume
----------------
    收集、移動、聚合大量日誌資料的服務。
    基於流資料的架構,用於線上日誌分析。

    基於事件。
    在生產和消費者之間啟動協調作用。
    提供了事務保證,確保訊息一定被分發。
    Source 多種
    sink多種.

    multihop        //多級躍點.

    水平擴充套件:        //加節點
        
    豎直擴充套件        //增加硬體。

Source
-------------
    接受資料,型別有多種。


Channel
-------------
    臨時存放地,對Source中來的資料進行緩衝,直到sink消費掉。

Sink
-------------
    從channel提取資料存放到中央化儲存(hadoop / hbase)。
    

安裝flume
-------------
    1.下載
    2.tar
    3.環境變數
    4.驗證flume是否成功
        $>flume-ng version            //next generation.下一代.
    

配置flume
---------------
    1.建立配置檔案
    [/soft/flume/conf/hello.conf]
    #宣告三種元件
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    #定義source資訊
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=8888

    #定義sink資訊
    a1.sinks.k1.type=logger

    #定義channel資訊
    a1.channels.c1.type=memory

    #繫結在一起
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

    2.執行
        a)啟動flume agent
            $>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console
        b)啟動nc的客戶端
            $>nc localhost 8888
            $nc>hello world
        
        c)在flume的終端輸出hello world.



安裝nc
---------------
    $>sudo yum install nmap-ncat.x86_64

清除倉庫快取
-----------------------
    $>修改ali.repo --> ali.repo.bak檔案。
    $>sudo yum clean all
    $>sudo yum makecache

    #例如阿里基本源
    $>sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo

    #阿里epel源
    $>sudo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

flume source
-------------------
    1.netcat
        nc ..

    2.exec
        實時日誌收集,實時收集日誌。
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type=exec
        a1.sources.r1.command=tail -F /home/centos/test.txt

        a1.sinks.k1.type=logger

        a1.channels.c1.type=memory

        a1.sources.r1.channels=c1
        a1.sinks.k1.channel=c1

    3.批量收集
        監控一個資料夾,靜態檔案。
        收集完之後,會重新命名檔案成新檔案。.compeleted.
        a)配置檔案
            [spooldir_r.conf]
            a1.sources = r1
            a1.channels = c1
            a1.sinks = k1

            a1.sources.r1.type=spooldir
            a1.sources.r1.spoolDir=/home/centos/spool
            a1.sources.r1.fileHeader=true

            a1.sinks.k1.type=logger

            a1.channels.c1.type=memory

            a1.sources.r1.channels=c1
            a1.sinks.k1.channel=c1

        b)建立目錄
            $>mkdir ~/spool

        c)啟動flume
            $>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console
    
    4.序列source
        [seq]
        a1.sources = r1
        a1.channels = c1
        a1.sinks = k1

        a1.sources.r1.type=seq
        a1.sources.r1.totalEvents=1000

        a1.sinks.k1.type=logger

        a1.channels.c1.type=memory

        a1.sources.r1.channels=c1
        a1.sinks.k1.channel=c1

        [執行]
        $>bin/flume-ng agent -f ../conf/helloworld.conf -n a1 -Dflume.root.logger=INFO,console

    5.StressSource
        a1.sources = stresssource-1
        a1.channels = memoryChannel-1
        a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
        a1.sources.stresssource-1.size = 10240
        a1.sources.stresssource-1.maxTotalEvents = 1000000
        a1.sources.stresssource-1.channels = memoryChannel-1


flume sink
------------------
    1.hdfs
        a1.sources = r1
        a1.channels = c1
        a1.sinks = k1

        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
        a1.sinks.k1.hdfs.filePrefix = events-

        #是否是產生新目錄,每十分鐘產生一個新目錄,一般控制的目錄方面。
        #2017-12-12 -->
        #2017-12-12 -->%H%M%S

        a1.sinks.k1.hdfs.round = true            
        a1.sinks.k1.hdfs.roundValue = 10
        a1.sinks.k1.hdfs.roundUnit = second
        
        a1.sinks.k1.hdfs.useLocalTimeStamp=true

        #是否產生新檔案。
        a1.sinks.k1.hdfs.rollInterval=10
        a1.sinks.k1.hdfs.rollSize=10
        a1.sinks.k1.hdfs.rollCount=3

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    2.hive
        略

    3.hbase
        a1.sources = r1
        a1.channels = c1
        a1.sinks = k1

        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        a1.sinks.k1.type = hbase
        a1.sinks.k1.table = ns1:t12
        a1.sinks.k1.columnFamily = f1
        a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    4.kafka


使用avroSource和AvroSink實現躍點agent處理
-----------------
    1.建立配置檔案
        [avro_hop.conf]
        #a1
        a1.sources = r1
        a1.sinks= k1
        a1.channels = c1

        a1.sources.r1.type=netcat
        a1.sources.r1.bind=localhost
        a1.sources.r1.port=8888

        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname=localhost
        a1.sinks.k1.port=9999

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

        #a2
        a2.sources = r2
        a2.sinks= k2
        a2.channels = c2

        a2.sources.r2.type=avro
        a2.sources.r2.bind=localhost
        a2.sources.r2.port=9999

        a2.sinks.k2.type = logger

        a2.channels.c2.type=memory

        a2.sources.r2.channels = c2
        a2.sinks.k2.channel = c2

    2.啟動a2
        $>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console

    3.驗證a2
        $>netstat -anop | grep 9999
        
    
    4.啟動a1
        $>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1
        
    5.驗證a1
        $>netstat -anop | grep 8888
    

channel
-----------------
    1.MemoryChannel
        略
    2.FileChannel
a1.sources = r1
a1.sinks= k1
a1.channels = c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

a1.sinks.k1.type=logger

        a1.channels.c1.type = file
        a1.channels.c1.checkpointDir = /home/centos/flume/fc_check
        a1.channels.c1.dataDirs = /home/centos/flume/fc_data

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1


可溢位檔案通道
------------------
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
#0表示禁用記憶體通道,等價於檔案通道
a1.channels.c1.memoryCapacity = 0
#0,禁用檔案通道,等價記憶體通道。
a1.channels.c1.overflowCapacity = 2000

a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /user/centos/flume/fc_check
a1.channels.c1.dataDirs = /user/centos/flume/fc_data


建立Flume模組
------------------
    1.新增pom.xml
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>com.it18zhang</groupId>
            <artifactId>FluemDemo</artifactId>
            <version>1.0-SNAPSHOT</version>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flume</groupId>
                    <artifactId>flume-ng-core</artifactId>
                    <version>1.7.0</version>
                </dependency>
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>4.11</version>
                </dependency>
            </dependencies>
        </project>