1. 程式人生 > >基於Apache Flume Datahub外掛將日誌資料同步上雲

基於Apache Flume Datahub外掛將日誌資料同步上雲

本文用到的

簡介

Apache Flume是一個分散式的、可靠的、可用的系統,可用於從不同的資料來源中高效地收集、聚合和移動海量日誌資料到集中式資料儲存系統,支援多種Source和Sink外掛。本文將介紹如何使用Apache Flume的Datahub Sink外掛將日誌資料實時上傳到Datahub。

環境要求

  • JDK (1.7及以上,推薦1.7)
  • Flume-NG 1.x
  • Apache Maven 3.x

外掛部署

下載外掛壓縮包

$ wget http://repo.aliyun.com/download/flume-datahub-sink-1.1.0.tar.gz

解壓外掛壓縮包

$ tar zxvf flume-datahub-sink-1.1
.0.tar.gz $ ls flume-datahub-sink lib libext

部署Datahub Sink外掛

將解壓後的外掛資料夾flume-datahub-sink移動到Apache Flume安裝目錄下

$ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d
$ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d/

移動後,核驗Datahub Sink外掛是否已經在相應目錄:

$ ls { YOUR_APACHE_FLUME_DIR }/plugins.d
flume-datahub-sink

配置示例

Flume的原理、架構,以及核心元件的介紹請參考 Flume-ng的原理和使用。本文將構建一個使用Datahub Sink的Flume例項,對日誌檔案中的結構化資料進行解析,並上傳到Datahub Topic中。

需要上傳的日誌檔案格式如下(每行為一條記錄,欄位之間逗號分隔):

# test_basic.log
some,log,line1
some,log,line2
...

下面將建立Datahub Topic,並把每行日誌的第一列和第二列作為一條記錄寫入Topic中。

建立Datahub Topic

使用Datahub WebConsole建立好Topic,schema為(string c1, string c2),下面假設建好的Topic名為test_topic。

Flume配置檔案

在Flume安裝目錄的conf/資料夾下建立名為datahub_basic.conf的檔案,並輸入內容如下:

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_END_POINT}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

這裡serializer配置指定了以逗號分隔的形式將輸入源解析成三個欄位,並忽略第三個欄位。

啟動Flume

配置完成後,啟動Flume並指定agent的名稱和配置檔案路徑,新增**-Dflume.root.logger=INFO,console**選項可以將日誌實時輸出到控制檯。

$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

寫入成功,顯示日誌如下:

...
Write success. Event count: 2
...

資料使用

日誌資料通過Flume上傳到Datahub後,可以使用StreamCompute流計算來進行實時分析,例如對於一些Web網站的日誌,可以實時統計各個頁面的PV/UV等。另外,匯入Datahub的資料也可以配置Connector將資料歸檔至MaxCompute中,方便後續的離線分析。

對於資料歸檔MaxCompute的場景,一般來說需要將資料進行分割槽。Datahub到MaxCompute的歸檔可以根據MaxCompute表的分割槽欄位自動建立分割槽,前提是要求MaxCompute和Datahub的欄位名以及型別可以完全對應上。如果需要根據日誌的傳輸時間自動設定分割槽,則在上面的例子中需要指定MaxCompute的分割槽相應欄位和時間格式,例如按小時自動建立分割槽,新增的配置如下:

a1.sinks.k1.maxcompute.partition.columns = pt
a1.sinks.k1.maxcompute.partition.values = %Y%m%d%H

注意:pt這個欄位需要在Datahub Topic以及MaxCompute表中都存在,且是表的分割槽欄位。

原文連結:

http://click.aliyun.com/m/13944/