1. 程式人生 > >Flume+HBase採集和儲存日誌資料

Flume+HBase採集和儲存日誌資料

轉自:http://blog.csdn.net/yaoyasong/article/details/39400829

前言

大資料時代,誰掌握了足夠的資料,誰就有可能掌握未來,而其中的資料採集就是將來的流動資產積累。

幾乎任何規模企業,每時每刻也都在產生大量的資料,但這些資料如何歸集、提煉始終是一個困擾。而大資料技術的意義確實不在於掌握規模龐大的資料資訊,而在於對這些資料進行智慧處理,從中分析和挖掘出有價值的資訊,但前提是如何獲取大量有價值的資料。

在最近的工作當中,本人剛好實現了運用大資料技術分析網站訪問日誌的方案,整個方案包括對網站日誌的採集、清洗、儲存和統計分析,計劃通過幾篇文章將技術實現細節分享出來,以期引起更多的思考和討論。

網站訪問日誌介紹

相信很多做過網站管理的人對網站訪問日誌(Access Log)應該不會陌生,現在主流的網站伺服器(如apache,tomcat,ngxin等)都支援將日誌資料記錄到伺服器的日誌檔案中。

網站的訪問日誌中記錄了很多有用的資訊,比如正常使用者的訪問足跡、惡意搗亂的足跡、使用者的入站方式、出站頁面等等資訊。對以上資訊彙總分類後,可以得到更有價值的東西,比如可以得到搜尋引擎的抓取頻率和來訪時間段、可以得到哪些頁面是使用者熱搜的等等。

首先看一個訪問日誌的例子:

10.52.10.49 - - [17/Sep/2014:11:34:21 +0800] "GET /webapp HTTP/1.1" 302 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36"

這是一個combined格式的訪問日誌,裡面記錄了使用者的訪問ip、時間、訪問地址、來源地址等。如要了解具體的格式說明,請檢視相關資料。

日誌採集儲存方案

對於一個比較活躍的網站來說,訪問日誌將會是一個海量的資料,考慮到網站日誌更新頻繁、和海量資料的特點,我選擇了Flume + HBase的採集和儲存方案。

Flume

Flume最早是Cloudera提供的日誌收集系統,目前是Apache下的一個專案,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料。

Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力 Flume提供了從console(控制檯)、RPCThrift-RPC

)、text(檔案)、tailUNIX tail)、syslogsyslog日誌系統,支援TCPUDP2種模式),exec(命令執行)等資料來源上收集資料的能力。


HBase

HBase – Hadoop Database,是一個高可靠性、高效能、面向列、可伸縮的分散式儲存系統,利用HBase技術可在廉價PC Server上搭建起大規模結構儲存叢集。


本次方案以TomcatWeb伺服器,通過Flume實時監控網站的日誌檔案並將新增日誌收集、清洗並儲存到HBase中,供Spark等分佈計算框架分析使用等。

 

方案實現

前提條件:

已經在linux伺服器上安裝並啟動了相關的程式:Tomcat7,Hadoop2.4.1Zookeeper3.4.6HBase0.98.5Flume1.5。具體安裝步驟請自行檢視相關文件。

1.首先開啟Tomcat中的日誌記錄功能,並選擇combined格式。

修改TOMCAT_PATH/conf/server.xml,增加日誌記錄:

<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"

prefix="localhost_access_log." suffix=".txt" renameOnRotate="true"

pattern="combined" />

這樣,tomcat就會在logs目錄下每天生成localhost_access_log檔案並實時記錄使用者的訪問情況。

2.實現日誌檔案物件和解析程式

AccessLog.java:

public class AccessLog {

private String clientIp;

private String clientIndentity;

private String remoteUser;

private Date dateTime;

private String request;

private String httpStatusCode;

private String bytesSent;

private String referer;

private String userAgent;

}

AccessLogParser.java:

public class AccessLogParser {

private static String pattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";

private static Pattern p = Pattern.compile(pattern);

public static AccessLog parse(String line){

Matcher matcher = p.matcher(line);

if (matcher.matches()){

AccessLog accessLog = new AccessLog();

accessLog.setClientIp(matcher.group(1));

accessLog.setClientIndentity(matcher.group(2));

accessLog.setRemoteUser(matcher.group(3));

accessLog.setDateTime(getDateTime(matcher.group(4)));

accessLog.setRequest(matcher.group(5));

accessLog.setHttpStatusCode(matcher.group(6));

accessLog.setBytesSent(matcher.group(7));

accessLog.setReferer(matcher.group(8));

accessLog.setUserAgent(matcher.group(9));

return accessLog;

}

logger.warn("This line is not a valid combined log, ignored it. -- " + line);

return null;

}

3.通過HBase ShellHBase中建立相應的表access_log

執行:$HBASE_HOME/bin/hbase shell,進入shell命令列

create 'access_log','cb',建立access_log,和一個列族cb。因為hbase是一個列伺服器,一個列族中可以增加很多列,為了效能考慮,一般不要建立多於三個列族。

出現如下提示資訊,即建立成功

0 row(s) in 11.9690 seconds

=> Hbase::Table - access_log

可以通過list命令檢視資料庫中的表,或scan ‘access_log’,查看錶中資料

4.配置Flume,實現採集和儲存

在本方案中,我們要將資料儲存到HBase中,所以使用flume中提供的hbase sink,同時,為了清洗轉換日誌資料,我們實現自己的AsyncHbaseEventSerializer

public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer{

private byte[] table;

private byte[] colFam;

private Event currentEvent;

private byte[][] columnNames;

private final List<PutRequest> puts = new ArrayList<PutRequest>();

private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();

private byte[] currentRowKey;

private final byte[] eventCountCol = "eventCount".getBytes();

public void initialize(byte[] table, byte[] cf) {

this.table = table;

this.colFam = cf;

}

public void configure(Context context) {

String cols = new String(context.getString("columns"));

String[] names = cols.split(",");

columnNames = new byte[names.length][];

int i = 0;

for (String name : names) {

columnNames[i++] = name.getBytes();

}

}

public void configure(ComponentConfiguration conf) {

}

public List<PutRequest> getActions() {

// Split the event body and get the values for the columns

String eventStr = new String(currentEvent.getBody());

String[] cols = logTokenize(eventStr);

puts.clear();

String req = cols[4];

String reqPath = req.split(" ")[1];

int pos = reqPath.indexOf("?");

if (pos > 0) {

reqPath = reqPath.substring(0,pos);

}

if(reqPath.length() > 1 && reqPath.trim().endsWith("/")){

reqPath = reqPath.substring(0,reqPath.length()-1);

}

String req_ts_str = cols[3];

Long currTime = System.currentTimeMillis();

String currTimeStr = null;

if (req_ts_str != null && !req_ts_str.equals("")){

SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);

SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

try {

currTimeStr = df2.format(df.parse(req_ts_str));

currTime = df.parse(req_ts_str).getTime();

} catch (ParseException e) {

System.out.println("parse req time error,using system.current time.");

}

}

long revTs = Long.MAX_VALUE - currTime;

currentRowKey = (Long.toString(revTs) + reqPath).getBytes();

System.out.println("currentRowKey: " + new String(currentRowKey));

for (int i = 0; i < cols.length; i++){

PutRequest putReq = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes());

puts.add(putReq);

}

//增加列

PutRequest reqPathPutReq = new PutRequest(table, currentRowKey, colFam, "req_path".getBytes(), reqPath.getBytes());

puts.add(reqPathPutReq);

PutRequest reqTsPutReq = new PutRequest(table, currentRowKey, colFam, "req_ts".getBytes(), Bytes.toBytes(currTimeStr));

puts.add(reqTsPutReq);

String channelType = ChannelUtil.getType(cols[8]);

PutRequest channelPutReq = new PutRequest(table, currentRowKey, colFam, "req_chan".getBytes(), Bytes.toBytes(channelType));

puts.add(channelPutReq);

return puts;

}

public String[] logTokenize(String eventStr) {

String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";

Pattern p = Pattern.compile(logEntryPattern);

Matcher matcher = p.matcher(eventStr);

if (!matcher.matches())

{

System.err.println("Bad log entry (or problem with RE?):");

System.err.println(eventStr);

return null;

}

String[] columns = new String[matcher.groupCount()];

for (int i = 0; i < matcher.groupCount(); i++)

{

columns[i] = matcher.group(i+1);

}

return columns;

}

public List<AtomicIncrementRequest> getIncrements() {

incs.clear();

incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));

return incs;

}

public void setEvent(Event event) {

this.currentEvent = event;

}

public void cleanUp() {

table = null;

colFam = null;

currentEvent = null;

columnNames = null;

currentRowKey = null;

}

Flume Agent配置flume-src-agent.conf

# http://flume.apache.org/FlumeUserGuide.html#exec-source

source_agent.sources = apache_server

source_agent.sources.apache_server.type = exec

source_agent.sources.apache_server.command = tail -F /opt/muse_tomcat/logs/localhost_access_log..txt

source_agent.sources.apache_server.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel

source_agent.channels = memoryChannel

source_agent.channels.memoryChannel.type = memory

source_agent.channels.memoryChannel.capacity = 1000

source_agent.channels.memoryChannel.transactionCapacity = 100

## Send to Flume Collector on Hadoop Node

# http://flume.apache.org/FlumeUserGuide.html#avro-sink

source_agent.sinks = avro_sink

source_agent.sinks.avro_sink.type = avro

source_agent.sinks.avro_sink.hostname = 10.51.108.38

source_agent.sinks.avro_sink.port = 4545

source_agent.sinks.avro_sink.channel = memoryChannel

Flume HBase sink配置flume-hbase.conf

#http://flume.apache.org/FlumeUserGuide.html#avro-source

collector.sources = AvroIn

collector.sources.AvroIn.type = avro

collector.sources.AvroIn.bind = 10.51.108.38

collector.sources.AvroIn.port = 4545

collector.sources.AvroIn.channels = mc1

## Channels ##

## Source writes to 3 channels, one for each sink

collector.channels = mc1

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory

collector.channels.mc1.capacity = 1000

## Sinks #

collector.sinks = HbaseOut

###############################################################

# HBase sink config

###############################################################

collector.sinks.HbaseOut.type = asynchbase

collector.sinks.HbaseOut.channel = mc1

collector.sinks.HbaseOut.table = access_log

collector.sinks.HbaseOut.columnFamily = cb

collector.sinks.HbaseOut.batchSize = 5

collector.sinks.HbaseOut.serializer = com.ygsoft.muse.data.util.AsyncHbaseLogEventSerializer

collector.sinks.HbaseOut.serializer.columns = host_name,remote_host,remote_user,event_ts,req,req_status,resp_bytes,ref,agent

5.執行Flume AgentHBase Sink

後臺方式執行Flume Agent

nohup $FLUME_HOME/bin/flume-ng agent -c $FLUME_HOME /conf -f $FLUME_HOME /conf/flume-src-agent.conf -n source_agent &

後臺方式執行HBase Sink

nohup $FLUME_HOME/bin/flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME//conf/flume-hbase.conf -n collector &

服務啟動後,網站日誌就會持續寫入到hbase資料庫中了。可以通過hbase shell檢視:

hbase(main):015:0> scan 'access_log'

ROW COLUMN+CELL

9223370625743240807/webappcolumn=cb:agent, timestamp=1411111540520, value=Jakarta Commons-HttpClient/3.1

9223370625743240807/webappcolumn=cb:event_ts, timestamp=1411111540519, value=19/Sep/2014:15:25:35 +0800

9223370625743240807/webappcolumn=cb:host_name, timestamp=1411111540519, value=10.52.10.49

9223370625743240807/webappcolumn=cb:ref, timestamp=1411111540520, value=-

...

1 row(s) in 0.3470 seconds

出現類似這樣的資訊就證明資料已經存放到hbase中了。 

技術點

HBaseRowKey的設計

HBase的查詢實現只提供兩種方式:

1、按指定RowKey獲取唯一一條記錄,get方法

2、按指定的條件獲取一批記錄,scan方法

通過巧妙的RowKey設計使我們批量獲取記錄集合中的元素挨在一起(應該在同一個Region下),可以在遍歷結果時獲得很好的效能。

考慮到訪問日誌的特點,時間性比較強,我們在設計RowKey時採用了(Long.MaxValue() – requestTime) + requestPath組成rowKey

requestTime放入rowkey中,可以提高按時間scan的效率,查詢某段時間的記錄時只要設定scan.setStartRow(beginTime)scan.setStopRow(endTime),方便並且高效。

通過Long.MaxValue() – requestTime可以將最新的日誌放在最前面。

後續

大資料技術應用() HBase中日誌資料訪問方式

大資料技術應用()應用Hadoop MapReduceSpark做日誌分析

大資料技術應用()應用Spark Stream做實時日誌分析

相關資料