程式人生 > Flume自定義過濾器

Flume自定義過濾器

package com.hnb.data.immi.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class LogInterceptor implements Interceptor
{ private boolean preserveExisting; private String logType; private LogInterceptor() { } private LogInterceptor(boolean preserveExisting,String logType) { this.preserveExisting = preserveExisting; this.logType = logType; } public void initialize() { }
public Event intercept(Event event) { byte[] body = event.getBody(); String bodyStr = new String(body); String[] s = bodyStr.split("\\|"); Date date = DateFormatUtil.strToDate(s[0].substring(1)); String logType= s[1]; String beanStr = ""; // if(appType.equals(type)){
JSONObject jsonObject = JSON.parseObject(s[2]); String referrer = jsonObject.get("referrer")==null?"":(String) jsonObject.get("referrer"); String mark = jsonObject.get("mark")==null?"":(String) jsonObject.get("mark"); boolean flag = referrer.matches("^https?:.+:\\d.+") || mark.matches("^https?:.+:\\d.+"); if(!flag){ jsonObject.put("serverTime", date); beanStr = jsonObject.toJSONString(); } // } event.setBody(beanStr.getBytes(Charsets.UTF_8)); Map<String, String> headers = event.getHeaders(); headers.put(LogInterceptor.Constants.LOGTYPE, logType); return event; } public List<Event> intercept(List<Event> events) { Iterator i$ = events.iterator(); while (i$.hasNext()) { Event event = (Event) i$.next(); this.intercept(event); } return events; } public void close() { } public static class Constants { public static String TIMESTAMP = "timestamp"; public static String PRESERVE = "preserveExisting"; public static String LOGTYPE = "type"; public static boolean PRESERVE_DFLT = false; public Constants() { } } public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder { private boolean preserveExisting; private String logType; public Builder() { this.preserveExisting = LogInterceptor.Constants.PRESERVE_DFLT; } public Interceptor build() { return new LogInterceptor(this.preserveExisting,this.logType); } public void configure(Context context) { this.preserveExisting = context.getBoolean(LogInterceptor.Constants.PRESERVE, LogInterceptor.Constants.PRESERVE_DFLT); this.logType = context.getString("logType"); } } }