1. 程式人生 > >Flume-ng生產環境實踐(四)實現log格式化interceptor

Flume-ng生產環境實踐(四)實現log格式化interceptor

續上篇,由於filesink中需要使用/data/log/%{dayStr}/log-%{hourStr}%{minStr}-這樣檔案格式的,為了使file-sink能使用%{dayStr}這樣的標籤,需要在資料傳輸過程中,給event的header中新增對應的鍵值對。在flume-ng中提供了很方便的方式:Interceptor 以下為實現的interceptor,首先使用正則表示式匹配nginx日誌,如何匹配成功,則獲取匹配到的資料,並且對url中的引數進行處理,最後所有日誌資訊都被儲存在Map中。根據配置檔案中需要輸出的鍵找到對應的值,按照順序輸出為csv格式的行。 原始日誌格式:
112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "GET /p.gif?a=1&b=2HTTP/1.1" 200 0 "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4
.0; 4399Box.1357; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; AskTbPTV2/5.9.1.14019; 4399Box.1357)"
最終結果:
1,2
配置資訊為:
agent.sources = source
agent.channels = channel
agent.sinks = sink

agent.sources.source.type = exec
#agent.sources.source.command = tail -n +0 -F /data/tmp/accesspvpb_2012-11-18.log
agent.sources.source.command = cat /opt/nginx/logs/vvaccess_log_pipe
agent.sources.source.interceptors = logformat

agent.sources.source.interceptors.logformat.type = org.apache.flume.interceptor.LogFormatInterceptor$Builder
agent.sources.source.interceptors.logformat.confpath = /usr/programs/flume/conf/logformat_vv.properties
agent.sources.source.interceptors.logformat.dynamicprop = true
agent.sources.source.interceptors.logformat.hostname = vv111
agent.sources.source.interceptors.logformat.prop.monitor.rollInterval = 100000
# The channel can be defined as follows.
agent.sources.source.channels = channel


agent.sinks.sink.type = avro
agent.sinks.sink.hostname = 192.168.0.100
agent.sinks.sink.port = 44444
agent.sinks.sink.channel = channel

# Each channel's type is defined.
agent.channels.channel.type = file
agent.channels.channel.checkpointDir = /data/tmpc/checkpoint
agent.channels.channel.dataDirs = /data/tmpc/data
agent.channels.channel.transactionCapacity = 15000
/usr/programs/flume/conf/logformat_vv.properties檔案內容為:
keys=a,b
regexp=([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^]]+)\\]\\s\"GET\\s/p.gif\\?(.+)\\s.*\"\\s[0-9]+\\s[0-9]+\\s\"(.+)\"
interceptor的程式碼:
packageorg.apache.flume.interceptor; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.
CONF_PATH; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL; importstaticorg.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT; importjava.io.File; importjava.io.FileInputStream; importjava.io.FileNotFoundException; importjava.io.IOException; importjava.text.ParseException; importjava.text.SimpleDateFormat; importjava.util.Date; importjava.util.HashMap; importjava.util.LinkedList; importjava.util.List; importjava.util.Map; importjava.util.Properties; importorg.apache.flume.Context; importorg.apache.flume.Event; importorg.apache.flume.event.EventBuilder; importorg.apache.oro.text.regex.MalformedPatternException; importorg.apache.oro.text.regex.MatchResult; importorg.apache.oro.text.regex.Pattern; importorg.apache.oro.text.regex.PatternCompiler; importorg.apache.oro.text.regex.PatternMatcher; importorg.apache.oro.text.regex.Perl5Compiler; importorg.apache.oro.text.regex.Perl5Matcher; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; publicclassLogFormatInterceptorimplementsInterceptor{ privatestaticfinalLoggerlogger= LoggerFactory .getLogger(LogFormatInterceptor.class); privateStringconf_path=null; privatebooleandynamicProp=false; privateStringhostname=null; privatelongpropLastModify= 0; privatelongpropMonitorInterval; privateStringregexp=null; privateList<String>keys=null; privatePatternpattern=null; privatePatternCompilercompiler=null; privatePatternMatchermatcher=null; privateSimpleDateFormatsdf=null; privateSimpleDateFormatsd=null; privateSimpleDateFormatsh=null; privateSimpleDateFormatsm=null; privateSimpleDateFormatsdfAll=null; privatelongeventCount= 0l; publicLogFormatInterceptor(String conf_path,booleandynamicProp, String hostname,longpropMonitorInterval) { this.conf_path= conf_path; this.dynamicProp= dynamicProp; this.hostname= hostname; this.propMonitorInterval= propMonitorInterval; } @Override publicvoidclose() { } @Override publicvoidinitialize() { try{ // 讀取配置檔案,初始化正在表示式和輸出的key列表 File file =newFile(conf_path); propLastModify= file.lastModified(); Properties props =newProperties(); FileInputStream fis; fis =newFileInputStream(file); props.load(fis); regexp= props.getProperty("regexp"); String strKey = props.getProperty("keys"); if(strKey !=null) { String[] strkeys = strKey.split(","); keys=newLinkedList<String>(); for(String key : strkeys) { keys.add(key); } } if(keys==null) { logger.error("====================keys is null===================="); }else{ logger.info("keys="+keys); } if(regexp==null) { logger.error("====================regexp is null===================="); }else{ logger.info("regexp="+regexp); } // 初始化正在表示式以及時間格式化類 compiler=newPerl5Compiler(); pattern=compiler.compile(regexp); matcher=newPerl5Matcher(); sdf=newSimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", java.util.Locale.US); sd=newSimpleDateFormat("yyyyMMdd"); sh=newSimpleDateFormat("HH"); sm=newSimpleDateFormat("mm"); sdfAll=newSimpleDateFormat("yyyyMMddHHmmss"); }catch(MalformedPatternException e) { logger.error("Could not complile pattern!", e); }catch(FileNotFoundException e) { logger.error("conf file is not found!", e); }catch(IOException e) { logger.error("conf file can not be read!", e); } } @Override publicEventintercept(Event event) { ++eventCount; try{ if(dynamicProp&&eventCount>propMonitorInterval) { File file =newFile(conf_path); if(file.lastModified() >propLastModify) { propLastModify= file.lastModified(); Properties props =newProperties(); FileInputStream fis; fis =newFileInputStream(file); props.load(fis); String strKey = props.getProperty("keys"); if(strKey !=null) { String[] strkeys = strKey.split(","); List<String> keystmp =newLinkedList<String>(); for(String key : strkeys) { keystmp.add(key); } if(keystmp.size() >keys.size()) { keys= keystmp; logger.info("dynamicProp status updated = "+keys); }else{ logger.error("dynamicProp status new keys size less than old,so status update fail = " +keys); } }else{ logger.error("dynamicProp status get keys fail ,so status update fail = " +keys); } } } Map<String, String> headers = event.getHeaders(); headers.put("host",hostname); String body =newString(event.getBody()); if(pattern!=null) { StringBuffer stringBuffer =newStringBuffer(); Date date =null; Map<String, String> index =newHashMap<String, String>(); if(matcher.contains(body,pattern

相關推薦

Flume-ng生產環境實踐實現log格式化interceptor

續上篇,由於filesink中需要使用/data/log/%{dayStr}/log-%{hourStr}%{minStr}-這樣檔案格式的,為了使file-sink能使用%{dayStr}這樣的標籤,需要在資料傳輸過程中,給event的header中新增對應的鍵值對。在fl

Flume-ng生產環境實踐實現檔案sink,按照固定格式目錄輸出

package org.apache.flume.sink; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; impor

RocketMQ最佳實踐視覺化管理控制檯rocketmq-console-ng

RocketMQ有一個對其擴充套件的開源專案incubator-rocketmq-externals,這個專案中有一個子模組叫“rocketmq-console”,這個便是管理控制檯專案了。通過命令列進入到rocketmq-console子目錄,通過maven對其進行編譯打包

API開發實踐 返回HTML

acea 指定 win filename static box 拖動地圖 ive let 分為兩個部分:生成HTML和返回HTML 生成HTML: 最終想要的時顯示地圖,不可避免的使用高德地圖的API。 【地圖API】地址錄入時如何獲得準確的經緯度?淘寶收貨地址詳解 改變幾

轉載:monkeyrunner之eclipse中運行monkeyrunner腳本之環境搭建

導包 rep 是把 body tle cnblogs 9.png 解決方法 align 轉載自:lynnLi 的monkeyrunner之eclipse中運行monkeyrunner腳本之環境搭建(四) monkeyrunner腳本使用Python語法編寫,但它實際上是通

MVC項目實踐——EDM實現

開發 ron key com sum lldb 實體類 資源管理器 space 實體數據模型 (EDM) 是一個規範,用於定義由在 實體框架 基礎上生成的應用程序使用的數據。使用 EDM 的應用程序在設計架構中定義應用程序域中的實體和關系。設計架構用於生成由應用程序代碼使用

三維渲染引擎設計與實踐

方式 lora 扇面 多個 幀緩存 binding osg smo tco 五、繪制幾何對象和文字 幀緩存(Frame Buffer)為用戶與顯示設備交互的一個接口,將顯示的畫面抽象成一塊可以進行讀寫操作的內存區域。 幀緩存的每一個存儲單元都對應顯示屏上的一個像素。整個緩存

Spark環境搭建-----------數據倉庫Hive環境搭建

apr 程序 版本 擴展 arch 表名 數據集 .tar.gz 自定義 Hive產生背景 1)MapReduce的編程不便,需通過Java語言等編寫程序 2) HDFS上的文缺失Schema(在數據庫中的表名列名等),方便開發者通過SQL的方式處理結構化的數據,而不需

將ASP.NET Core應用程序部署至生產環境CentOS7

for linux home web 虛擬 direct director block bic 閱讀目錄 環境說明 準備你的ASP.NET Core應用程序 安裝CentOS7 安裝.NET Core SDK for CentOS7。 部署ASP.NET

Angular開發實踐:組件之間的交互

lec previous call 私有 判斷 處理方法 數組 依然 處理 在Angular應用開發中,組件可以說是隨處可見的。本篇文章將介紹幾種常見的組件通訊場景,也就是讓兩個或多個組件之間交互的方法。 根據數據的傳遞方向,分為父組件向子組件傳遞、子組件向父組件傳遞及通過

Docker學習與實踐

registry docker 四、倉庫管理 1.創建本地倉庫 ①獲取官方registry鏡像 [root@dockertest ~]# docker run -d -p 5000:5000 --restart=always --name registry registry:2 Unable to f

小程序實踐:動態控制組件的顯示/隱藏

-s -- hid 控制 alt 實踐 als nbsp 小程序 組件有個屬性:hidden=‘‘ ,值為true/false ,當false的時候說明不隱藏,當true的時候說明隱藏,註意該隱藏是不保留組件位置的。 實現即 .js 配合.wxml 文件 一、在.j

Spring Boot 最佳實踐模板引擎Thymeleaf集成

data 圖層 int app 創建模板 原因 xmlns make 使用場景 一、Thymeleaf介紹 Thymeleaf是一種Java XML / XHTML / HTML5模板引擎,可以在Web和非Web環境中使用。它更適合在基於MVC的Web應用程序的視圖層提供X

機器學習實踐—sklearn之特徵預處理

一、特徵預處理概述 什麼是特徵預處理 # scikit-learn的解釋 provides several common utility functions and transformer classes to change raw feature vectors into

Elasticsearch實踐:IK分詞

完成 詞語 char 1.2 prop tokenize 字母 發生 odi 環境:Elasticsearch 6.2.4 + Kibana 6.2.4 + ik 6.2.4 Elasticsearch默認也能對中文進行分詞。 我們先來看看自帶的中文分詞效果: curl

mysql實踐

資料型別 數字 二進位制 bit(m) # m 位數 整數 tinyint samllint int bigint 浮點數 decimal #準確的 float double 字串 定長 char(m) 查詢速度快,浪費空間 變長 varchar(m) 查詢速度慢,節約空

【MVC】.NET實踐—新增資料到資料庫

1、在主介面Index.cshtml新增“新增”的連結 <tr> <td colspan="4"> @Html.ActionLink("新增",

Docker實踐Dockerfile

什麼是Dockerfile? 使用Dockerfile,可以方便的建立自定義映象。 基本結構 由一行行命令組成,支援#註釋。Dockerfile一般分為四個部分: 基礎映象資訊 維護者資訊 映象操作指令 容器啟動指令 如下面是一個基礎的Dockerfile

Docker學習實踐

Docker 映象 可以將 Docker 映象理解為包含應用程式以及其相關依賴的一個基礎檔案系統,在 Docker 容器啟動的過程中,它以只讀的方式被用於建立容器的執行環境。 從另一個角度看,Docker 映象其實是由基於 UnionFS 檔案系統的一組映象層依次掛載而得,

Docker映象之Java環境搭建

打算通過Docker搭建hadoop叢集,但在開始之前得先搭建一個java環境,所以決定在基於Ubuntu映象的基礎上進行。 首先啟動一個容器,然後按照傳統方案配置java環境,即解壓java壓縮包,然後配置相關環境變數到/etc/profile下,退出容器,使用docke