1. 程式人生 > >記錄-----自定義interceptor,解決flume監控日誌上傳HDFS日期後移一天問題

記錄-----自定義interceptor,解決flume監控日誌上傳HDFS日期後移一天問題

1.自定義flume的interceptor新增自定義header

package com.huajie.flume.interceptor.custominterceptor;


import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;

 
/**
 * 
 * @author Administrator
 *
 */
public class CustomDateInterceptor implements Interceptor {
 
    private CustomDateInterceptor() {
    }
 
    public void initialize() {
        // NO-OP...
    }
 
    public void close() {
        // NO-OP...
    }


    public Event intercept(Event event) {
    	
    	Calendar calendar = Calendar.getInstance();
    	calendar.setTime(new Date());
    	calendar.add(Calendar.DATE, -1);
    	Date preDay = calendar.getTime();
    	
    	SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
    	String preDayFormat = sdf.format(preDay);
    	
    	Map headers = event.getHeaders();
    	headers.put("pre_day", preDayFormat);
    	event.setHeaders(headers);

        return event;
    }
 

    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
 
    public static class Builder implements Interceptor.Builder {
        //使用Builder初始化Interceptor
        public Interceptor build() {
            return new CustomDateInterceptor();
        }
 

        public void configure(Context context) {
 
        }
    }
}

2.將專案打成jar包,放入flume下的 lib目錄下

3.custom-interceptor.conf 中應用自定義的interceptor

# tier1.conf: A single-node Flume configuration

# Name the components on this agent
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1


# Describe/configure the source
tier1.sources.r1.type = spooldir
tier1.sources.r1.spoolDir  = /opt/splash_cebspider/datas
tier1.sources.r1.fileSuffix = .completed
tier1.sources.r1.deletePolicy = never
#tier1.sources.r1.deletePolicy = immediate
tier1.sources.r1.fileHeader = true
tier1.sources.r1.includePattern = ^(.)*\\.json$
#tier1.sources.r1.ignorePattern = ^(.)*\\.processing$

#自定義interceptor將tier1.sinks.k1.hdfs.path設定為觸發時間的前一天
tier1.sources.r1.interceptors = dateset

tier1.sources.r1.interceptors.dateset.type = com.huajie.flume.interceptor.custominterceptor.CustomDateInterceptor$Builder
tier1.sources.r1.interceptors.dateset.preserveExisting = true

# Describe the sink
tier1.sinks.k1.type = hdfs
tier1.sinks.k1.channel = c1
tier1.sinks.k1.hdfs.path = /flume/events/ceb_spider/%{pre_day}
tier1.sinks.k1.hdfs.fileType = DataStream
tier1.sinks.k1.hdfs.filePrefix = events
tier1.sinks.k1.hdfs.minBlockReplicas=1
tier1.sinks.k1.hdfs.rollInterval=3600
tier1.sinks.k1.hdfs.rollSize=132692539
tier1.sinks.k1.hdfs.idleTimeout=10
tier1.sinks.k1.hdfs.batchSize = 1
tier1.sinks.k1.hdfs.rollCount=0
tier1.sinks.k1.hdfs.round = true
tier1.sinks.k1.hdfs.roundValue = 2
tier1.sinks.k1.hdfs.roundUnit = minute
tier1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
tier1.channels.c1.type = memory
tier1.channels.c1.capacity = 1000
tier1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
 

3.啟動flume-ng 

flume-ng agent -c conf -f conf/custom-interceptor.conf -n tier1

參考連結:

相關推薦

記錄-----定義interceptor解決flume監控日誌HDFS日期後移問題

1.自定義flume的interceptor新增自定義header package com.huajie.flume.interceptor.custominterceptor; import com.google.common.collect.Lists; impor

定義TextView解決計算listView中item高度的時候如果其中的TextView字數超過一行只計算到一行高度的問題

    動態計算listView中item 的高度時,如果其中textview超過一行,只會計算其中第一行的剛度,(雙層listview巢狀使用時經常出現這種問題)。我們需要自定義textview,動態計算textView的高度,具體程式碼如下 package com.e

android retrofit 實戰定義converter解決相同介面返回不同資料的問題

square的retrofit是目前比較火的網路框架,我目前也在用 今天專案上遇到一個問題,就是請求同一個介面,可能返回不同的json格式 例如,訪問一個登入介面,成功的時候,返回的是 {     "code": 0,     "message": "登入成功",    

定義使用者和認證 中介軟體 檔案/oss 圖片驗證碼

自定義使用者 寫一個繼承自AbstractUser的類,然後追加自己需要的欄位 在settings.py加入AUTH_USER_MODEL = 'app的名字.使用者類' 自定義使用者認證 步驟 在app的目錄下新鍵一個檔案 auth.py 寫一個繼承自ModeBacke

Vue directive定義指令+canvas實現H5圖片壓縮-Base64格式

前言 最近優化專案-手機拍照圖片太大,回顯速度比較慢,使用了vue的自定義指令實現H5壓縮上傳base64格式的圖片 canvas自定義指令 Vue.directive("canvas",

ajaxFileUpload 檔案/圖片引數解決第二次無法問題

html: <script charset="utf-8" src="js/ajaxfileupload.js"></script> <input style="display:;" type="file" id="userfile" o

Flume的hdfsSink的roll引數不生效的原因(日誌hdfs

首先,本人菜雞一個,只是分享點東西出來,怕自己忘了,也給各位大佬填填坑噹噹墊背的! 事情是這樣的: 測試: 通過exec作為一個源,將tail -f /opt/20171130.log這樣命令接收到的資料上傳到HDFS 先給個官網路徑:http://flume.apach

主動模式和被動模式添加監控主機添加定義模板處理圖形中的亂碼自動發現

zabbix主動模式和被動模式 主動或者被動是相對客戶端來講的被動模式,服務端會主動連接客戶端獲取監控項目數據,客戶端被動地接受連接,並把監控信息傳遞給服務端主動模式,客戶端會主動把監控數據匯報給服務端,服務端只負責接收即可。當客戶端數量非常多時,建議使用主動模式,這樣可以降低服務端的壓力。服務端有公網ip,

主動模式和被動模式添加監控主機添加定義模板處理圖像中的亂碼自動發現

添加監控主機 name def 地址 觸發器 mar 桌面 客戶 release 主動模式和被動模式 主動或者被動是相對客戶端來講的 被動模式,服務端會主動連接客戶端獲取監控項目數據,客戶端被動地接受連接,並把監控信息傳遞給服務端 主動模式,客戶端會主動把監控數據匯報給服

Nginx定義日誌格式記錄定義的報文首部

Nginx內建有許多變數一般來說夠用了。但是如果有特殊需求也可以新增。 例如:自定義了一個報文首部X-Client 在自定義日誌格式後面加一個$http_X_Client,這裡版本是nginx/1.12.2 這裡利用Chrome的外掛 Modify Headers 添加了一個報文首

在springboot打包成jar後無法讀取定義檔案的解決辦法

前兩天在做springcloud框架下的專案的時候,用到有一個框架之外的檔案需要進行讀取,當時在eclipse中編碼時通過this.getClass().getResource來獲取檔案的路徑,沒有任何的問題,但是在打成jar以後,這是是打成jar包不是war,結果發現不能正常的讀取我放在工程裡

使用idea 構建hive的定義函式無法打包成jar包的問題解決

在昨天 晚上 遇到了用idea 打包自己的自定義函式成jar,怎麼都不行,翻查了很多部落格,發現CSDN上很多部落格都存在問題,而且還搜到很多部落格錯的一樣,但是博主名字 不一樣的情況,差點把我心態搞崩。 這是當時遇到的問題,還有一個問題是 在hive裡面根據這個jar包建立函式的時候,還

core學習歷程五 從壹開始前後端分離【 .NET Core2.0 +Vue2.0 】框架之十 || AOP面向切面程式設計淺解析:簡單日誌記錄 + 服務切面快取 從壹開始前後端分離【 .NET Core2.0 +Vue2.0 】框架之十 || AOP自定義篩選Redis入門 11.1

繼續學習 “老張的哲學”博主的系列教程,感謝大神們的無私分享 從壹開始前後端分離【 .NET Core2.0 +Vue2.0 】框架之十 || AOP面向切面程式設計淺解析:簡單日誌記錄 + 服務切面快取 說是朦朧,,emmm,對我來說是迷糊哈。上半段正常,下半段有點難理解,操作是沒問題。多看幾遍再消

flume定義interceptor和hbase sink

      在flume的實際應用中,可能會遇到對日誌進行簡單的過濾和處理。flume在source端有其內建的interceptor類可以對主機、IP、靜態標記做處理,如果想自定義處理邏輯該如何處理?在不規則的日誌資料進入hbase之前想做處理又該如何處理? 1.自定義

flume定義Interceptor的UUID和其他邏輯處理

package com.meme.flume.interceptor; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Even

定義Ratingbar星星“流淚了”解決辦法

一,xml  <RatingBar                 android:id="@+id/ratingBar_evaluation"                 style="@style/roomRatingBar"<!--自定義樣式,主要效果

nRF51822 定義UUIDble_advdata_set的時候 NRF_ERROR_DATA_SIZE 錯誤的解決

在做nRF51822的時候,需要自定義一個服務,那麼就要自定義一個UUID。 但是這個UUID在ble_advdata_set的時候,返回碼為 NRF_ERROR_DATA_SIZE。 經過搜尋資料,

ThinkPHP搜尋條件是陣列定義Sql條件解決辦法

今天遇到一個問題,原來的搜尋需要加一個搜尋,而這個搜尋是比較倆個欄位,而之前的搜尋條件是陣列,查閱資料解決辦法如下: // 原來的搜尋條件 $where = array( 'user_ex

android-繼承BaseAdapter--定義介面卡getView執行多次的解決方法

定義的getView執行多次的ListView佈局: <ListView android:id="@+id/lv_messages" android:layout

主動模式和被動模式新增監控主機新增定義模板處理圖形中的亂碼自動發現

主動或者被動是相對客戶端來講的被動模式,服務端會主動連線客戶端獲取監控專案資料,客戶端被動地接受連線,並把監控資訊傳遞給服務端主動模式,客戶端會主動把監控資料彙報給服務端,服務端只負責接收即可。當客戶端數量非常多時,建議使用主動模式,這樣可以降低服務端的壓力。服務端有公網ip,客戶端只有內網ip,但卻能連外網