
Flume: Reading Data from Kafka and Writing It to HDFS

Requirement: there are five Kafka topics:

topic-app-startup
topic-app-error
topic-app-event
topic-app-usage
topic-app-page

Flume reads the data from these five Kafka topics and writes it to HDFS, creating directories and files grouped by topic and by year/month/day.

For example, five new directories are created each day, and each topic's data is written under its corresponding directory:

/user/centos/applogs/page/2017/12/12/xxx-xxxxxxx

/user/centos/applogs/startup/2017/12/12/xxx-xxxxxxx
/user/centos/applogs/error/2017/12/12/xxx-xxxxxxx

/user/centos/applogs/error/2017/12/1/xxx-xxxxxxx

Flume configuration:

    # agent components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Kafka source, with the custom interceptor that adds the timestamp and logType headers
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.wang.bigdata.app.LogCollInterceptor$Builder
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = datatwo:9092,datathree:9020,datafour:9020
    a1.sources.r1.kafka.zookeeperConnect = datasix:2181,datasenven:2181,dataeight:2181
    a1.sources.r1.kafka.topics.regex = ^topic_app_.*$

    # memory channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 10000

    # HDFS sink: the path is built from the logType header and the event timestamp
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /home/wangjk/applogs/%{logType}/%Y%m/%d/%H%M
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = false
    a1.sinks.k1.hdfs.roundValue = 30
    a1.sinks.k1.hdfs.roundUnit = second

    # wiring
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
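
The hdfs.path above relies on two headers, timestamp and logType, which are populated by the custom interceptor described below. To sanity-check the Kafka side independently, a small producer can push a test record into one of the topics. This is only a sketch: the topic name follows the underscore form used by the regex in the config (the requirement above lists hyphenated names), and the JSON fields are assumptions chosen to match the keyword checks in the interceptor.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TestLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // one of the brokers from the Flume config
        props.put("bootstrap.servers", "datatwo:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // a fabricated startup log: "network" is the keyword the interceptor maps to logType=startup,
        // and createdAtMs is the field the interceptor reads as the event timestamp
        String json = "{\"createdAtMs\":" + System.currentTimeMillis() + ",\"network\":\"wifi\"}";
        producer.send(new ProducerRecord<>("topic_app_startup", json));  // hypothetical topic name

        producer.flush();
        producer.close();
    }
}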

A custom interceptor needs to be implemented:

package com.wang.bigdata.app;

import com.alibaba.fastjson.JSONObject;
import com.wang.bigdata.app.common.*;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * Custom Flume interceptor: extracts the creation time from the event body
 * and puts it into the headers, together with a logType header that the
 * HDFS sink uses to build the output path.
 */
public class LogCollInterceptor implements Interceptor {

    private final boolean preserveExisting;

    private LogCollInterceptor(boolean preserveExisting) {
        this.preserveExisting = preserveExisting;
    }

    public void initialize() {
    }

    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        String jsonStr = new String(body);

        // parse the JSON body and use its creation time as the timestamp header
        AppBaseLog json = JSONObject.parseObject(jsonStr, AppBaseLog.class);
        Long time = json.getCreatedAtMs();
        headers.put(FlumeConstants.TIMESTAMP, Long.toString(time));

        // derive the logType header from a keyword that only appears in that log type
        String logType = "";
        if (jsonStr.contains("pageId")) {                        // pageLog
            logType = "page";
        } else if (jsonStr.contains("eventId")) {                // eventLog
            logType = "event";
        } else if (jsonStr.contains("singleUseDurationSecs")) {  // usageLog
            logType = "usage";
        } else if (jsonStr.contains("errorBrief")) {             // errorLog
            logType = "error";
        } else if (jsonStr.contains("network")) {                // startupLog
            logType = "startup";
        }
        headers.put("logType", logType);

        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        private boolean preserveExisting = FlumeConstants.PRESERVE_DFLT;

        public Interceptor build() {
            return new LogCollInterceptor(preserveExisting);
        }

        public void configure(Context context) {
            preserveExisting = context.getBoolean(FlumeConstants.PRESERVE, FlumeConstants.PRESERVE_DFLT);
        }
    }
}
// FlumeConstants.java (a separate source file, since both classes are public)
public class FlumeConstants {
    public static String TIMESTAMP = "timestamp";
    public static String PRESERVE = "preserveExisting";
    public static boolean PRESERVE_DFLT = false;
}
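
The AppBaseLog bean comes from com.wang.bigdata.app.common and is not shown in the original post. For the interceptor above, the only thing it must expose is getCreatedAtMs(); a minimal sketch (the real class presumably carries many more fields) could look like this:

package com.wang.bigdata.app.common;

/**
 * Minimal sketch of the base log bean the interceptor deserializes into.
 * Only createdAtMs is required by LogCollInterceptor; fastjson ignores
 * any extra fields present in the JSON body.
 */
public class AppBaseLog {
    private Long createdAtMs;   // client-side creation time, in milliseconds

    public Long getCreatedAtMs() {
        return createdAtMs;
    }

    public void setCreatedAtMs(Long createdAtMs) {
        this.createdAtMs = createdAtMs;
    }
}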

Package these classes into a jar and place it under flume/lib. Note: any jars that this jar depends on (for example fastjson) must also be copied into flume/lib.
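
The interceptor can also be exercised locally before deploying it, which avoids restarting the agent just to debug the header logic. The following sketch assumes flume-ng-core and fastjson are on the classpath and uses a fabricated startup record:

package com.wang.bigdata.app;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

public class LogCollInterceptorLocalTest {
    public static void main(String[] args) {
        // build the interceptor the same way Flume does: Builder -> configure -> build
        LogCollInterceptor.Builder builder = new LogCollInterceptor.Builder();
        builder.configure(new Context());
        Interceptor interceptor = builder.build();
        interceptor.initialize();

        // fabricated startup log: "network" should yield logType=startup
        String json = "{\"createdAtMs\":" + System.currentTimeMillis() + ",\"network\":\"wifi\"}";
        Event event = EventBuilder.withBody(json.getBytes());
        interceptor.intercept(event);

        // expect a timestamp header and logType=startup, which the HDFS sink path expands
        System.out.println(event.getHeaders());

        interceptor.close();
    }
}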