
Spark Streaming Real-Time Stream Processing Project Notes

Chapter 2: Flume, a Distributed Log Collection Framework

Course outline
Business context analysis => Flume overview => Flume architecture and core components => Flume deployment => Flume in action

1. Business context analysis

  • WebServer/ApplicationServer instances are scattered across many machines
  • The Hadoop big-data platform is used for statistics and analysis
  • How do the logs get collected onto the Hadoop platform?
  • Existing solutions and their problems

Problems with the traditional way of shipping logs from the servers to Hadoop:
1. Hard to monitor
2. Heavy IO read/write overhead
3. Poor fault tolerance and poor load balancing
4. High latency; the transfer job has to be kicked off at intervals

2. Flume overview

Flume website: http://flume.apache.org/

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
In other words: Flume, originally developed by Cloudera, is a distributed, highly reliable, highly available service for efficiently collecting, aggregating, and moving massive amounts of log data.

Design goals
Reliability
Scalability
Manageability

Comparison with similar products
Flume: Cloudera/Apache, Java
Scribe: Facebook, C/C++, no longer maintained
Chukwa: Yahoo/Apache, Java, no longer maintained
Kafka: LinkedIn/Apache, Scala
Fluentd: Ruby
Logstash: part of the ELK stack (ElasticSearch, Logstash, Kibana)

Flume history
Cloudera 0.9.2: Flume-OG
FLUME-728 (the NG refactoring): Flume-NG ==> donated to Apache
July 2012: 1.0
May 2015: 1.6
~ : 1.7

Flume architecture and core components

  1. Source: collects the data
  2. Channel: aggregates/buffers the data
  3. Sink: writes the data out

Flume installation prerequisites (a quick verification sketch follows this list):
1. Java Runtime Environment - Java 1.8 or later
2. Memory - Sufficient memory for configurations used by sources, channels or sinks
3. Disk Space - Sufficient disk space for configurations used by channels or sinks
4. Directory Permissions - Read/Write permissions for directories used by agent
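
One way to sanity-check these prerequisites on the target machine (a sketch; ~/app and ~/data are assumed locations taken from the paths used later in these notes):

# Java 1.8+ must be on the PATH
java -version

# Enough memory and disk space for the channels/sinks you plan to configure
free -m
df -h ~

# The agent needs read/write access to its working directories
ls -ld ~/app ~/data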

Install the JDK
Download the tarball
Extract it to ~/app
Add Java to the system environment variables: vi ~/.bash_profile
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
Reload the profile so the settings take effect: source ~/.bash_profile
Verify: java -version
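
The same steps consolidated into a script (a sketch; the tarball filename is an assumption based on the JDK version above):

# Extract the JDK tarball into ~/app (filename assumed)
tar -zxvf jdk-8u144-linux-x64.tar.gz -C ~/app

# Append the environment variables to ~/.bash_profile
echo 'export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144' >> ~/.bash_profile
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bash_profile

# Reload and verify
source ~/.bash_profile
java -version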

Install Flume
Download the tarball
Extract it to ~/app
Add Flume to the system environment variables: vi ~/.bash_profile
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
Reload the profile so the settings take effect: source ~/.bash_profile
Configure flume-env.sh: export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
Verify: flume-ng version
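
Again as a script (a sketch; the tarball filename is an assumption derived from the FLUME_HOME path above):

# Extract the Flume tarball into ~/app (filename assumed)
tar -zxvf apache-flume-1.6.0-cdh5.7.0-bin.tar.gz -C ~/app

# Append the environment variables to ~/.bash_profile and reload
echo 'export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin' >> ~/.bash_profile
echo 'export PATH=$FLUME_HOME/bin:$PATH' >> ~/.bash_profile
source ~/.bash_profile

# flume-env.sh is created from the template shipped in conf/
cd $FLUME_HOME/conf
cp flume-env.sh.template flume-env.sh
echo 'export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144' >> flume-env.sh

# Verify
flume-ng version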


Flume in action:

Requirement 1: collect data from a given network port and print it to the console


The key to using Flume is writing the configuration file:
A) configure the Source
B) configure the Channel
C) configure the Sink
D) wire the three components together

a1: the agent name
r1: the source name
k1: the sink name
c1: the channel name

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Looking up the details in the official user guide:
http://flume.apache.org/FlumeUserGuide.html#netcat-source

a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444

type: The component type name, needs to be netcat
bind: The hostname or IP address to bind to
port: The port # to listen on

a1.sinks.k1.type = logger

type: The component type name, needs to be logger

a1.channels.c1.type = memory

type: The component type name, needs to be memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Note: a single source can fan out to multiple channels, which is why the source property is channels (plural); a sink drains events from exactly one channel, which is why the sink property is channel (singular). A fan-out sketch follows.
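
A minimal sketch of the fan-out case (a hypothetical agent a1 with two channels; replicating is Flume's default channel selector, so every event goes to both):

# One source writing into two channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.channels = c1 c2

# Each sink still drains exactly one channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2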

Steps:
1. Write the configuration file
In the conf directory: vi example.conf
Paste the configuration above into it
2. Start the agent

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console

3. Test it with telnet: telnet hadoop000 44444
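
What a successful test looks like (an illustration; the event line mirrors the logger-sink output format shown in requirement 2 below):

# In the telnet window:
telnet hadoop000 44444
hello
OK

# In the agent's window the logger sink prints the event (body as hex bytes + text):
Event: { headers:{} body: 68 65 6C 6C 6F    hello }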

Requirement 2: monitor a file and output newly appended data to the console in real time

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Exec source properties (from the user guide):
type: The component type name, needs to be exec
command: The command to execute
shell: A shell invocation used to run the command, e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
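
The shell property matters as soon as the command uses shell features. For example, this hypothetical variant filters the tailed file through a pipe and would not work without it:

# Pipes are interpreted by the shell, so shell = /bin/sh -c is required here
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log | grep ERROR
a1.sources.r1.shell = /bin/sh -c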

Steps:
1. Write the configuration file
In the conf directory: vi exec-memory-logger.conf
Paste the configuration above into it
2. Start the agent

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-logger.conf \
-Dflume.root.logger=INFO,console

3. Test:

In a new terminal window, append some lines:

[hadoop@hadoop000 data]$ echo hello >> data.log
[hadoop@hadoop000 data]$ echo world >> data.log
[hadoop@hadoop000 data]$ echo welcome >> data.log

The agent's window then shows:

Event: { headers:{} body: 68 65 6C 6C 6F  				hello }
Event: { headers:{} body: 77 6F 72 6C 64  				world }
Event: { headers:{} body: 77 65 6C 63 6F 6D 65			welcome }
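
The body is printed both as raw hex bytes and as text. You can verify the hex by hand, e.g. with xxd:

# 68 65 6C 6C 6F is the ASCII encoding of "hello"
echo '68656C6C6F' | xxd -r -p; echo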

Requirement 3: collect logs on server A and ship them to server B in real time

Technology choices:

Agent on A: exec source + memory channel + avro sink
Agent on B: avro source + memory channel + logger sink

Data flow: exec source -> memory channel -> avro sink ==(network)==> avro source -> memory channel -> logger sink

Two configuration files:

exec-memory-avro.conf

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

avro-memory-logger.conf

# Name the components on this agent
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

# Describe/configure the source
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444

# Describe the sink
avro-memory-logger.sinks.logger-sink.type = logger

# Use a channel which buffers events in memory
avro-memory-logger.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

Start avro-memory-logger first (the avro sink on A can only connect once the avro source on B is listening):

flume-ng agent \
--name avro-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-logger.conf \
-Dflume.root.logger=INFO,console

Then start exec-memory-avro:

flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console

Test:
In a new terminal window, append some lines:

[hadoop@hadoop000 data]$ echo hello spark >> data.log
[hadoop@hadoop000 data]$ echo hello hadoop >> data.log

The window running avro-memory-logger then shows:

Event: { headers:{} body: 68 65 6C 6C 6F 20 73 70 61 72 6B                hello spark }
Event: { headers:{} body: 68 65 6C 6C 6F 20 68 61 64 6F 6F 70             hello hadoop }