flume自定義interceptor和hbase sink

flume自定義interceptor和hbase sink

      在flume的實際應用中,可能會遇到對日誌進行簡單的過濾和處理。flume在source端有其內建的interceptor類可以對主機、IP、靜態標記做處理,如果想自定義處理邏輯該如何處理?在不規則的日誌資料進入hbase之前想做處理又該如何處理?

1.自定義interceptor

在eclipse(或Myeclipse)中,引入flume的jar包(下載flume解壓後的lib目錄中),編寫自定義類,實現Interceptor類,重寫public Event intercept(Event event) 和

public List<Event> intercept(List<Event> events),其中在第一個方法裡編寫自己的處理邏輯。日誌資料是以位元組陣列形式存在body裡的,要處理日誌資料,需要先將其轉化
為字串。後一個方法是批量處理event,其實就是逐一呼叫前一個方法。最後一段程式碼(Builder)是用來例項化你編寫的類的。完整原始碼如下:
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

public class AccessLogInterceptor implements Interceptor {

	@Override
	public void close() {
		// TODO Auto-generated method stub

	}

	@Override
	public void initialize() {
		// TODO Auto-generated method stub

	}

	@Override
	public Event intercept(Event event) {
		// TODO Auto-generated method stub
		StringBuffer sb = new StringBuffer();
		String body = new String(event.getBody(), Charsets.UTF_8);
		String[] fields = body.split("|");
		int i = 1;
		for (String field : fields) {
			sb.append(i + field);
			i++;
		}
		event.setBody(sb.toString().getBytes());
		return event;
	}

	@Override
	public List<Event> intercept(List<Event> events) {
		// TODO Auto-generated method stub
		List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
		for (Event event : events) {
			Event interceptedEvent = intercept(event);
			if (interceptedEvent != null) {
				intercepted.add(interceptedEvent);
			}
		}
		return intercepted;
	}

	public static class Builder implements Interceptor.Builder {
		// 使用Builder初始化Interceptor
		@Override
		public Interceptor build() {
			return new AccessLogInterceptor();
		}

		@Override
		public void configure(Context context) {

		}
	}

}</span>
編寫完成後,將程式碼打包如下圖所示:


上傳到flume的lib目錄下:


修改配置檔案client.properties.properties(我的flume配置檔名,參照自己的配置檔案修改):


啟動你的flume就OK了。

2.自定義hbase sink

要對進如hbase的日誌做預處理,以便於使用和檢視。在上述的工程中新建立一個包,在其中編寫處理類。原始碼如下:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

/**
 * Async HBase serializer that parses Combined Log Format access-log events,
 * writes one cell per configured column plus derived "req_path" / "req_ts" /
 * "req_chan" cells under a time-reversed row key, and bumps a global
 * "totalEvents" counter row.
 */
public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer {
	// Combined Log Format: ip ident user [timestamp] "request" status size
	// "referrer" "user-agent" — 9 capture groups. Compiled once instead of
	// per call (Pattern.compile is expensive; Pattern is thread-safe).
	private static final Pattern LOG_ENTRY_PATTERN = Pattern.compile(
			"^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"");

	private byte[] table;
	private byte[] colFam;
	private byte[][] columnNames;
	private Event currentEvent;
	private final List<PutRequest> puts = new ArrayList<PutRequest>();
	private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
	private byte[] currentRowKey;
	private final byte[] eventCountCol = "eventCount".getBytes();
	private static Logger log = Logger.getLogger(AsyncHbaseLogEventSerializer.class);

	/** Remembers the target table and column family supplied by the sink. */
	@Override
	public void initialize(byte[] table, byte[] cf) {
		this.table = table;
		this.colFam = cf;
	}

	/**
	 * Reads the comma-separated HBase column names from the sink's
	 * "columns" property in the Flume agent configuration.
	 *
	 * @throws IllegalArgumentException if the property is missing
	 */
	@Override
	public void configure(Context context) {
		String cols = context.getString("columns");
		if (cols == null) {
			// Fail fast with a clear message instead of an anonymous NPE
			// (the original wrapped a possibly-null value in new String(...)).
			throw new IllegalArgumentException(
					"Required sink property 'columns' is missing");
		}
		String[] names = cols.split(",");
		columnNames = new byte[names.length][];
		int i = 0;
		for (String name : names) {
			log.info("列名是:"+name);
			columnNames[i++] = name.getBytes();
		}
	}

	/**
	 * Builds the PutRequests for the current event: one cell per parsed
	 * column plus the derived req_path / req_ts / req_chan cells.
	 * Unparseable lines are skipped (empty list returned) rather than
	 * letting the NPE from the original code kill the sink.
	 */
	@Override
	public List<PutRequest> getActions() {
		puts.clear();
		String eventStr = new String(currentEvent.getBody());
		String[] cols = logTokenize(eventStr);
		if (cols == null) {
			// Line did not match the log pattern — nothing to write.
			return puts;
		}

		// cols[4] is the request line, e.g. "GET /path?q=1 HTTP/1.1".
		String req = cols[4];
		String[] reqParts = req.split(" ");
		// Guard against malformed request lines with no path component.
		String reqPath = reqParts.length > 1 ? reqParts[1] : req;
		int pos = reqPath.indexOf('?');
		if (pos > 0) {
			reqPath = reqPath.substring(0, pos); // strip the query string
		}
		if (reqPath.length() > 1 && reqPath.trim().endsWith("/")) {
			reqPath = reqPath.substring(0, reqPath.length() - 1); // strip trailing slash
		}

		// cols[3] is the request timestamp, e.g. "10/Oct/2000:13:55:36 -0700".
		String reqTsStr = cols[3];
		long currTime = System.currentTimeMillis();
		SimpleDateFormat outFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		String currTimeStr = null;
		if (reqTsStr != null && !reqTsStr.equals("")) {
			SimpleDateFormat inFmt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",
					Locale.US);
			try {
				currTime = inFmt.parse(reqTsStr).getTime();
				currTimeStr = outFmt.format(new java.util.Date(currTime));
			} catch (ParseException e) {
				log.warn("parse req time error,using system.current time.", e);
			}
		}
		if (currTimeStr == null) {
			// Fall back to "now" so Bytes.toBytes below never sees null
			// (the original NPE'd whenever the timestamp failed to parse).
			currTimeStr = outFmt.format(new java.util.Date(currTime));
		}

		// Reverse the timestamp so newest rows sort first in HBase.
		long revTs = Long.MAX_VALUE - currTime;
		currentRowKey = (Long.toString(revTs) + reqPath).getBytes();
		log.debug("currentRowKey: " + new String(currentRowKey));

		// Only write as many columns as were configured; extra parsed fields
		// would otherwise throw ArrayIndexOutOfBoundsException.
		int n = Math.min(cols.length, columnNames.length);
		for (int i = 0; i < n; i++) {
			puts.add(new PutRequest(table, currentRowKey, colFam,
					columnNames[i], cols[i].getBytes()));
		}

		// Derived convenience columns.
		puts.add(new PutRequest(table, currentRowKey, colFam,
				"req_path".getBytes(), reqPath.getBytes()));
		puts.add(new PutRequest(table, currentRowKey, colFam,
				"req_ts".getBytes(), Bytes.toBytes(currTimeStr)));
		// NOTE(review): channel classification is stubbed out
		// (ChannelUtil.getType(cols[8]) was commented away upstream).
		String channelType = "abc";
		puts.add(new PutRequest(table, currentRowKey, colFam,
				"req_chan".getBytes(), Bytes.toBytes(channelType)));
		return puts;
	}

	/** Increments the "eventCount" cell of the global "totalEvents" row. */
	@Override
	public List<AtomicIncrementRequest> getIncrements() {
		incs.clear();
		incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(),
				colFam, eventCountCol));
		return incs;
	}

	@Override
	public void setEvent(Event event) {
		this.currentEvent = event;
	}

	@Override
	public void configure(ComponentConfiguration arg0) {
		// Not used; configuration arrives via configure(Context).
	}

	/** Releases references so the serializer can be garbage-collected. */
	@Override
	public void cleanUp() {
		table = null;
		colFam = null;
		currentEvent = null;
		columnNames = null;
		currentRowKey = null;
	}

	/**
	 * Splits a Combined Log Format line into its 9 captured fields.
	 *
	 * @param eventStr one access-log line
	 * @return the captured fields, or {@code null} when the line does not match
	 */
	public String[] logTokenize(String eventStr) {
		Matcher matcher = LOG_ENTRY_PATTERN.matcher(eventStr);
		if (!matcher.matches()) {
			log.error("Bad log entry (or problem with RE?): " + eventStr);
			return null;
		}
		String[] columns = new String[matcher.groupCount()];
		for (int i = 0; i < matcher.groupCount(); i++) {
			columns[i] = matcher.group(i + 1);
		}
		return columns;
	}
}
打包放入到lib目錄下(同上),配置檔案如下:


重啟flume即可。