Hadoop學習筆記之flume Nginx日誌收集到HBase
概述
Nginx訪問日誌形式: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"
例如:192.168.241.1 - - [02/Mar/2017:15:22:57 +0800] “GET /favicon.ico HTTP/1.1” 404 209 “http://192.168.241.10/” “Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36”
考慮到Nginx伺服器可能有多個,所以採用二級Agent來收集日誌
其中Interceptor是自定義攔截器,將利用正則表示式配置過濾掉無用記錄,將文字記錄轉換為map物件。
HBaseSink是自定義HBaseSink,可以將map物件儲存到HBase中,可配置時間欄位作為行鍵字首,可按時間升序排列。
自定義過濾器
package com.mine.flume.interceptors;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;
public class LogFormatInterceptor implements Interceptor{
private static Logger logger = Logger.getLogger(LogFormatInterceptor.class);
private Pattern regex; //正則匹配式
private String[] keys; //匹配內容所對應keys
public LogFormatInterceptor(Pattern regex,String[] keys) {
this.regex = regex;
this.keys = keys;
}
public void initialize() {
}
public Event intercept(Event event) {
byte[] body = event.getBody();
String record = new String(body);
Matcher matcher = regex.matcher(record);
if(matcher.find()) {
int groupCount = matcher.groupCount();
if(groupCount!=keys.length) {
logger.error("regex匹配的group數與keys長度不相等");
event = null;
}else {
//構造Map
Map<String,String> map = Util.matcherToMap(matcher,keys);
//將Map轉換成byte陣列存放到body中
byte[] bytes = Util.mapToByteArray(map);
//替換Event內容
event.setBody(bytes);
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
} catch (IOException e) {
e.printStackTrace();
}
}
}else {
event = null;
}
return event;
}
public List<Event> intercept(List<Event> list) {
List<Event> events = new ArrayList<Event>();
for(Event event: list) {
Event nEvent = intercept(event);
if(nEvent!=null) {//該事件有效
events.add(nEvent);
}
}
return events;
}
public void close() {
}
public static class Builder implements Interceptor.Builder {
private Pattern regex; //正則匹配式
private String[] keys; //匹配內容所對應keys
public Builder() {
}
public Interceptor build() {
return new LogFormatInterceptor(regex,keys);
}
public void configure(Context context) {
String regexString = context.getString("regex",".*");
regex = Pattern.compile(regexString);
String keyString = context.getString("keys");
keys = keyString.split(",");
}
}
}
Interceptor過濾器工作方式:Channel Processor會根據Class.newInstance()方法建立一個Build例項,將配置檔案生成的Context傳遞給Build.configure方法從中獲取Interceptor所需要的配置項,這裡是regex和keys。而後呼叫Build.build()返回一個Interceptor物件。呼叫Interceptor.intercept(List)處理事件列表
flume配置
agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = avroSink
# 利用exec方式實時追蹤日誌
agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -F /etc/nginx/log/access.log
agent.sources.execSrc.channels = memoryChannel
agent.sources.execSrc.interceptors = logformat
agent.sources.execSrc.interceptors.logformat.type = com.mine.flume.interceptors.LogFormatInterceptor$Builder
agent.sources.execSrc.interceptors.logformat.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) - ([^ ]*) \\[(.*)\\] \"(.*)\" ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\"
agent.sources.execSrc.interceptors.logformat.keys = remote_addr,remote_user,time_local,request,status,body_bytes_sent,http_referer,http_user_agent
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = 192.168.241.10
agent.sinks.avroSink.port = 1111
agent.sinks.avroSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
自定義HBaseSink
public class MapBodyHbaseSink implements HbaseEventSerializer {
private static Logger logger = Logger.getLogger(MapBodyHbaseSink.class);
private String rowPrefix;
private byte[] incrementRow;
private byte[] cf;
private Map<String,String> body;
private byte[] incCol;
private KeyType keyType;
public void configure(Context context) {
rowPrefix = context.getString("rowPrefix", "default");
incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
String suffix = context.getString("suffix", "uuid");
String incColumn = context.getString("incrementColumn", "iCol");
if (suffix.equals("timestamp")) {
keyType = KeyType.TS;
} else if (suffix.equals("random")) {
keyType = KeyType.RANDOM;
} else if (suffix.equals("nano")) {
keyType = KeyType.TSNANO;
} else if(suffix.equals("timePrefix")){
//自定義主鍵型別,以某個時間欄位為字首,rowPrefix指定時間段域名
keyType = KeyType.TIMEPREFIX;
} else {
keyType = KeyType.UUID;
}
if (incColumn != null && !incColumn.isEmpty()) {
incCol = incColumn.getBytes(Charsets.UTF_8);
}
}
public void configure(ComponentConfiguration conf) {
}
public void initialize(Event event, byte[] cf) {
this.body = Util.byteArrayToMap(event.getBody());
this.cf = cf;
}
public List<Row> getActions() throws FlumeException {
List<Row> actions = new LinkedList<Row>();
if (body != null&&!body.isEmpty()) {
byte[] rowKey;
try {
if (keyType == KeyType.TS) {
rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
} else if (keyType == KeyType.RANDOM) {
rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
} else if (keyType == KeyType.TSNANO) {
rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
} else {
if(keyType == KeyType.TIMEPREFIX) {
String dateString = body.get(rowPrefix);
if(dateString==null) return actions;
Date access_time = Util.parseDate(dateString);
if(access_time==null) {//解析訪問時間失敗
logger.error("解析時間失敗:"+body.get(rowPrefix));
return actions;
}else {
body.put(rowPrefix,Util.formatDate(access_time));
rowPrefix = access_time.getTime()+"";
}
}
rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
}
for(String col : body.keySet()) {
Put put = new Put(rowKey);
put.add(cf, col.getBytes(), body.get(col).getBytes());
actions.add(put);
}
} catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
public List<Increment> getIncrements() {
List<Increment> incs = new LinkedList<Increment>();
if (incCol != null) {
Increment inc = new Increment(incrementRow);
inc.addColumn(cf, incCol, 1);
incs.add(inc);
}
return incs;
}
public void close() {
}
public enum KeyType {
UUID,
RANDOM,
TS,
TSNANO,
TIMEPREFIX;
}
}
HBaseSink工作方式:從configure方法獲取配置資訊,包括行鍵型別、行鍵字首等,我在這裡新增行鍵型別timePrefix可以配置某個時間欄位作為行鍵字首,因為HBase中按照行鍵升序排列。
flume配置
agent.sources = avroSrc
agent.channels = memoryChannel
agent.sinks = hbaseSink
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 192.168.241.10
agent.sources.avroSrc.port = 1111
agent.sources.avroSrc.channels = memoryChannel
agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.table = access_logs
agent.sinks.hbaseSink.columnFamily = log
agent.sinks.hbaseSink.serializer = com.mine.flume.sinks.MapBodyHbaseSink
agent.sinks.hbaseSink.serializer.rowPrefix = time_local
agent.sinks.hbaseSink.serializer.suffix = timePrefix
agent.sinks.hbaseSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
將專案打成jar包,放到flume的lib目錄下
配置完成之後,啟動flume
flume-ng agent -c /home/user/hadoop/apache-flume-1.7.0-bin/conf -f /home/user/hadoop/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent
-c — 配置檔案目錄
-f — 配置檔案位置
-n — agent名字(即配置檔案中agent.sources其中agent就是名字)
問題
1.工作時,NoClassDefFoundError異常:ProtobufUtil
這個是由於打包時,直接用idea匯出jar包,其中會包含依賴jar包中的class檔案,所以導致與flume/lib中jar包重複衝突導致異常,利用mvn install打包檔案之後,將倉庫下jar檔案複製到lib目錄下就可以避免。