
Hadoop Study Notes: Collecting Nginx Logs into HBase with Flume

Overview

Nginx access log format: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"

For example: 192.168.241.1 - - [02/Mar/2017:15:22:57 +0800] "GET /favicon.ico HTTP/1.1" 404 209 "http://192.168.241.10/" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36"

Since there may be more than one Nginx server, a two-tier Agent topology is used to collect the logs: a first-tier agent runs on each Nginx server and forwards events over Avro to a single second-tier agent, which writes them into HBase.

The Interceptor is a custom interceptor: it uses a configurable regular expression to drop records that do not match and converts each matching text record into a map of named fields.
The HBaseSink serializer is a custom HbaseEventSerializer that stores such a map in HBase; a time field can be configured as the row key prefix so that rows are kept in ascending time order.

Custom interceptor

package com.mine.flume.interceptors;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogFormatInterceptor implements Interceptor {

    private static Logger logger = Logger.getLogger(LogFormatInterceptor.class);

    private Pattern regex;   // regular expression used to match a log record
    private String[] keys;   // keys corresponding to the regex capture groups

    public LogFormatInterceptor(Pattern regex, String[] keys) {
        this.regex = regex;
        this.keys = keys;
    }

    public void initialize() {
    }

    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String record = new String(body);
        Matcher matcher = regex.matcher(record);
        if (matcher.find()) {
            int groupCount = matcher.groupCount();
            if (groupCount != keys.length) {
                logger.error("number of regex capture groups does not match number of keys");
                event = null;
            } else {
                // Build a map from the capture groups ...
                Map<String, String> map = Util.matcherToMap(matcher, keys);
                // ... serialize it to a byte array and make it the new event body
                byte[] bytes = Util.mapToByteArray(map);
                event.setBody(bytes);
            }
        } else {
            // Drop records that do not match the pattern
            event = null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        List<Event> events = new ArrayList<Event>();
        for (Event event : list) {
            Event nEvent = intercept(event);
            if (nEvent != null) { // keep only valid events
                events.add(nEvent);
            }
        }
        return events;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        private Pattern regex;   // regular expression used to match a log record
        private String[] keys;   // keys corresponding to the regex capture groups

        public Builder() {
        }

        public Interceptor build() {
            return new LogFormatInterceptor(regex, keys);
        }

        public void configure(Context context) {
            String regexString = context.getString("regex", ".*");
            regex = Pattern.compile(regexString);
            String keyString = context.getString("keys");
            keys = keyString.split(",");
        }
    }
}
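
The interceptor above (and the MapBodyHbaseSink below) relies on a Util helper class whose implementation is not shown in this post. The following is only a minimal sketch of what it might look like, assuming Java serialization for the event body and the standard Nginx $time_local format; the method names match the calls above, everything else is an assumption, and the class must be placed somewhere both the interceptor and the sink can see it.

// Hypothetical sketch: the original Util class is not shown in the post.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;

public class Util {

    /** Map the i-th capture group of a successful match to the i-th key. */
    public static Map<String, String> matcherToMap(Matcher matcher, String[] keys) {
        Map<String, String> map = new LinkedHashMap<String, String>();
        for (int i = 0; i < keys.length; i++) {
            map.put(keys[i], matcher.group(i + 1));
        }
        return map;
    }

    /** Serialize the map into a byte array so it can become the event body. */
    public static byte[] mapToByteArray(Map<String, String> map) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(new HashMap<String, String>(map));
            oos.close();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Deserialize an event body produced by mapToByteArray back into a map. */
    @SuppressWarnings("unchecked")
    public static Map<String, String> byteArrayToMap(byte[] bytes) {
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return (Map<String, String>) ois.readObject();
        } catch (Exception e) {
            return null;
        }
    }

    /** Parse the Nginx $time_local value, e.g. "02/Mar/2017:15:22:57 +0800"; returns null on failure. */
    public static Date parseDate(String s) {
        try {
            return new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH).parse(s);
        } catch (ParseException e) {
            return null;
        }
    }

    /** Normalize a date into a sortable, human-readable string for storage. */
    public static String formatDate(Date d) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d);
    }
}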

How the interceptor is plugged in: the Channel Processor creates a Builder instance via Class.newInstance(), passes the Context built from the configuration file to Builder.configure() so that the interceptor's settings (here regex and keys) can be read, and then calls Builder.build() to obtain an Interceptor instance. Batches of incoming events are handed to Interceptor.intercept(List), as sketched below.
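
A minimal, standalone sketch of that lifecycle, using the same regex and keys that the first-tier agent configuration below passes in and the sample log line from the overview (the demo class name is made up, and it assumes the Util sketch above):

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.util.HashMap;
import java.util.Map;

public class LogFormatInterceptorDemo {
    public static void main(String[] args) {
        // The same settings that the configuration file hands to Builder.configure() via the Context.
        Map<String, String> params = new HashMap<String, String>();
        params.put("regex", "(\\d+\\.\\d+\\.\\d+\\.\\d+) - ([^ ]*) \\[(.*)\\] \"(.*)\" ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\"");
        params.put("keys", "remote_addr,remote_user,time_local,request,status,body_bytes_sent,http_referer,http_user_agent");

        // What the Channel Processor does: instantiate the Builder, configure it, then build the interceptor.
        Interceptor.Builder builder = new LogFormatInterceptor.Builder();
        builder.configure(new Context(params));
        Interceptor interceptor = builder.build();
        interceptor.initialize();

        // Run one sample access-log record through it; a null result means the record was dropped.
        String line = "192.168.241.1 - - [02/Mar/2017:15:22:57 +0800] \"GET /favicon.ico HTTP/1.1\" 404 209 "
                + "\"http://192.168.241.10/\" \"Mozilla/5.0 (Windows NT 10.0; WOW64) ...\"";
        Event event = interceptor.intercept(EventBuilder.withBody(line.getBytes()));
        System.out.println(event == null
                ? "record dropped"
                : "body replaced by a map with " + Util.byteArrayToMap(event.getBody()).size() + " fields");
    }
}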

Flume configuration (first-tier agent)

agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = avroSink

# Tail the access log in real time with an exec source
agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -F /etc/nginx/log/access.log
agent.sources.execSrc.channels = memoryChannel

agent.sources.execSrc.interceptors = logformat
agent.sources.execSrc.interceptors.logformat.type = com.mine.flume.interceptors.LogFormatInterceptor$Builder
agent.sources.execSrc.interceptors.logformat.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) - ([^ ]*) \\[(.*)\\] \"(.*)\" ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\"
agent.sources.execSrc.interceptors.logformat.keys = remote_addr,remote_user,time_local,request,status,body_bytes_sent,http_referer,http_user_agent


agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = 192.168.241.10
agent.sinks.avroSink.port = 1111


agent.sinks.avroSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory

Custom HBaseSink

package com.mine.flume.sinks;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.flume.sink.hbase.SimpleRowKeyGenerator;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.log4j.Logger;

import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class MapBodyHbaseSink implements HbaseEventSerializer {
    private static Logger logger = Logger.getLogger(MapBodyHbaseSink.class);

    private String rowPrefix;          // row key prefix; for timePrefix, the name of the time field
    private byte[] incrementRow;       // row used for the increment counter
    private byte[] cf;                 // column family
    private Map<String, String> body;  // event body deserialized back into a field map
    private byte[] incCol;             // increment column
    private KeyType keyType;           // row key type


    public void configure(Context context) {
        rowPrefix = context.getString("rowPrefix", "default");
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
        String suffix = context.getString("suffix", "uuid");
        String incColumn = context.getString("incrementColumn", "iCol");
        if (suffix.equals("timestamp")) {
            keyType = KeyType.TS;
        } else if (suffix.equals("random")) {
            keyType = KeyType.RANDOM;
        } else if (suffix.equals("nano")) {
            keyType = KeyType.TSNANO;
        } else if(suffix.equals("timePrefix")){
            // custom key type: use a time field as the row key prefix; rowPrefix names that time field
            keyType = KeyType.TIMEPREFIX;
        } else {
            keyType = KeyType.UUID;
        }
        if (incColumn != null && !incColumn.isEmpty()) {
            incCol = incColumn.getBytes(Charsets.UTF_8);
        }
    }

    public void configure(ComponentConfiguration conf) {
    }

    public void initialize(Event event, byte[] cf) {
        this.body = Util.byteArrayToMap(event.getBody());
        this.cf = cf;
    }

    public List<Row> getActions() throws FlumeException {
        List<Row> actions = new LinkedList<Row>();
        if (body != null&&!body.isEmpty()) {
            byte[] rowKey;
            try {
                if (keyType == KeyType.TS) {
                    rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
                } else if (keyType == KeyType.RANDOM) {
                    rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
                } else if (keyType == KeyType.TSNANO) {
                    rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
                } else {
                    String keyPrefix = rowPrefix;
                    if (keyType == KeyType.TIMEPREFIX) {
                        // rowPrefix names the time field whose value becomes the key prefix
                        String dateString = body.get(rowPrefix);
                        if (dateString == null) return actions;
                        Date access_time = Util.parseDate(dateString);
                        if (access_time == null) { // failed to parse the access time
                            logger.error("failed to parse time: " + dateString);
                            return actions;
                        } else {
                            // store a normalized time string and key this row by epoch milliseconds
                            // (kept in a local variable so the configured rowPrefix is not overwritten)
                            body.put(rowPrefix, Util.formatDate(access_time));
                            keyPrefix = String.valueOf(access_time.getTime());
                        }
                    }
                    rowKey = SimpleRowKeyGenerator.getUUIDKey(keyPrefix);
                }
                for(String col : body.keySet()) {
                    Put put = new Put(rowKey);
                    put.add(cf, col.getBytes(), body.get(col).getBytes());
                    actions.add(put);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }

        }
        return actions;
    }

    public List<Increment> getIncrements() {
        List<Increment> incs = new LinkedList<Increment>();
        if (incCol != null) {
            Increment inc = new Increment(incrementRow);
            inc.addColumn(cf, incCol, 1);
            incs.add(inc);
        }
        return incs;
    }

    public void close() {
    }

    public enum KeyType {
        UUID,
        RANDOM,
        TS,
        TSNANO,
        TIMEPREFIX;
    }


}

How the HBaseSink serializer works: configure() reads the settings, including the row key type and the row key prefix. The timePrefix key type added here lets a time field serve as the row key prefix, which keeps the rows in time order because HBase stores rows sorted by row key in ascending order.
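
For example, for the sample record from the overview, the timePrefix branch builds its row key roughly as follows (a hypothetical demo class, assuming the Util sketch shown earlier):

import org.apache.flume.sink.hbase.SimpleRowKeyGenerator;

import java.util.Date;

public class TimePrefixKeyDemo {
    public static void main(String[] args) throws Exception {
        // $time_local from the sample access-log record in the overview.
        String timeLocal = "02/Mar/2017:15:22:57 +0800";

        // Same steps as the KeyType.TIMEPREFIX branch of getActions():
        // parse the time field, use its epoch-millisecond value as the row key
        // prefix, and append a UUID so that concurrent events never collide.
        Date accessTime = Util.parseDate(timeLocal);
        String prefix = String.valueOf(accessTime.getTime());   // a 13-digit millisecond timestamp
        byte[] rowKey = SimpleRowKeyGenerator.getUUIDKey(prefix);

        // Since every prefix has the same number of digits, HBase's lexicographic
        // row key order is also chronological order for these rows.
        System.out.println(new String(rowKey, "UTF-8"));
    }
}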

Flume configuration (second-tier agent)

agent.sources = avroSrc
agent.channels = memoryChannel
agent.sinks = hbaseSink

agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 192.168.241.10
agent.sources.avroSrc.port = 1111

agent.sources.avroSrc.channels = memoryChannel


agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.table = access_logs
agent.sinks.hbaseSink.columnFamily = log
agent.sinks.hbaseSink.serializer = com.mine.flume.sinks.MapBodyHbaseSink
agent.sinks.hbaseSink.serializer.rowPrefix = time_local
agent.sinks.hbaseSink.serializer.suffix = timePrefix
agent.sinks.hbaseSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory

Package the project as a jar and place it in Flume's lib directory.

After the configuration is complete, start Flume:

flume-ng agent -c /home/user/hadoop/apache-flume-1.7.0-bin/conf -f /home/user/hadoop/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent

-c: configuration directory
-f: configuration file
-n: agent name (the prefix used in the configuration file, i.e. the agent in agent.sources)
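
Once both agents are running and requests reach Nginx, you can check that rows are arriving in HBase. Below is a minimal read-back sketch using the HBase Java client; the table and column family names follow the configuration above, connection details come from hbase-site.xml, and it assumes an HBase 1.x client on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class AccessLogScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();  // picks up hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("access_logs"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            int shown = 0;
            for (Result r : scanner) {
                // Row keys start with the epoch-millisecond prefix, so this prints in time order.
                String row = Bytes.toString(r.getRow());
                String request = Bytes.toString(r.getValue(Bytes.toBytes("log"), Bytes.toBytes("request")));
                System.out.println(row + " -> " + request);
                if (++shown >= 10) break;  // only show the first few rows
            }
        }
    }
}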

Problems

1. NoClassDefFoundError for ProtobufUtil at runtime

This happens when the jar is exported directly from IDEA: the exported jar bundles class files from the dependency jars, which then conflict with the jars already present in flume/lib. Building the project with mvn install instead and copying only the project's own jar from the local repository into the lib directory avoids the conflict.