flume自定義攔截器(Interceptor)拼接header和body資訊
一、需求背景
最近專案有這樣一個需求,分別採集不同應用不同機器上的日誌,在做日誌清洗後存入DB,資料庫表字段需要存當前日誌的來源,比如,來自於哪個專案,該專案的哪臺機器,由於我們使用的是flume來做日誌採集,故去翻flume的官網,發現有攔截器可以支援我的需求,一個是主機攔截器,可以在source之後配置,在header裡面拼上ip資訊,另一個是static攔截器,可以自定義key和value,這樣的話我們就可以自定義該日誌資訊來源於哪個專案了;
測試後發現,在設定了多個攔截器後,訊息的頭資訊裡面的確包含了ip和專案資訊,但是kafka收到的訊息只有訊息的body部分,沒有header資訊,一番折騰後,決定自定義一個flume攔截器,用來攔截event資訊,將裡面的頭資訊和body資訊取出來然後統一拼接到body裡面,經過測試,完美解決了我的需求。
二、實戰
如何自定義flume攔截器 ? 建一個maven工程,匯入flume-core包,然後實現interceptor介面,先看程式碼
package com.flume.schedule; import java.util.List; import java.util.Map; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; public class FlumeInterceptor implements Interceptor { private Logger logger = LoggerFactory.getLogger(FlumeInterceptor.class); @Override public void close() { } @Override public void initialize() { } @Override public Event intercept(Event event) { StringBuilder builder = new StringBuilder(); Map<String,String> headerMap = event.getHeaders(); String ip = ""; String projectName = ""; if(null != headerMap && !headerMap.isEmpty()) { ip = headerMap.get("host"); projectName = headerMap.get("projectname"); builder.append("ip:" + ip + ";projectname:" + projectName); } byte[] byteBody = event.getBody(); String body = new String(byteBody,Charsets.UTF_8); builder.append(";body:" + body); event.setBody(builder.toString().trim().getBytes()); logger.info("拼接後的body資訊:" + builder.toString().trim()); return event; } @Override public List<Event> intercept(List<Event> events) { for(final Event event : events) { intercept(event); } return events; } }
package com.flume.schedule; import org.apache.flume.Context; import org.apache.flume.interceptor.Interceptor; public class FlumeBuilder implements Interceptor.Builder{ @Override public void configure(Context context) { } @Override public Interceptor build() { return new FlumeInterceptor(); } }
只需要2個類,一個是FlumeIntercetor用來拼接頭資訊和body資訊,一個是FlumeBuilder用來啟動這個攔截器,然後將上面的程式碼打成一個jar包,放到flume的安裝目錄的plugin.d下(如果沒有自己建一個),如下
將我們的jar包放到lib下介面,然後配置flume的conf檔案,在攔截器配置部分,使用我們自定義的攔截器
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = exec
a1.sources.s1.channels = c1
a1.sources.s1.command = tail -F /home/esuser/flume/logs/flume.log
#config interceptor
a1.sources.s1.interceptors=i1 i2 i3
a1.sources.s1.interceptors.i1.type=host
a1.sources.s1.interceptors.i1.useIP=true
a1.sources.s1.interceptors.i1.preserverExisting=false
a1.sources.s1.interceptors.i2.type=static
a1.sources.s1.interceptors.i2.key=projectname
a1.sources.s1.interceptors.i2.value=資料採集專案
a1.sources.s1.interceptors.i2.preserverExisting=false
# 使用自定義的攔截器,這裡一個source使用了3個攔截器,分別配置host,工程名,然後用我們自定義的攔截器進行拼接,flume在載入的時候會按順序載入
a1.sources.s1.interceptors.i3.type=com.flume.schedule.FlumeBuilder
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.80.132
a1.sinks.k1.port= 3333
a1.sinks.k1.channels = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.80.132
a1.sinks.k2.port = 4444
a1.sinks.k2.channels = c1
# Use a channel that buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 1000
然後啟動flume,kafka,storm,寫個定時任務每隔5s列印一條日誌資料;
看一下kafka消費者
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:05:05 [INFO] [org.springframework.jmx.export.MBeanExporter:449] - Unregistering JMX-exposed beans on shutdown
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:20 [INFO] [org.springframework.boot.StartupInfoLogger:48] - Starting FlumeApplication v0.0.1-SNAPSHOT on slave with PID 4859 (/home/esuser/flume/myconf/flume-0.0.1-SNAPSHOT.jar started by root in /home/esuser/flume/myconf)
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:20 [INFO] [org.springframework.boot.SpringApplication:661] - No active profile set, falling back to default profiles: default
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:21 [INFO] [org.springframework.context.support.AbstractApplicationContext:582] - Refreshing org.springframework.boot[email protected]2d38eb89: startup date [Mon Feb 12 14:53:21 PST 2018]; root of context hierarchy
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:24 [INFO] [org.hibernate.validator.internal.util.Version:30] - HV000001: Hibernate Validator 5.2.4.Final
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:48 [INFO] [org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer:87] - Tomcat initialized with port(s): 5678 (http)
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:49 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Starting service Tomcat
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:49 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Starting Servlet Engine: Apache Tomcat/8.5.6
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:51 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Initializing Spring embedded WebApplicationContext
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:51 [INFO] [org.springframework.boot.context.embedded.EmbeddedWebApplicationContext:276] - Root WebApplicationContext: initialization completed in 29741 ms
看一下storm的topology日誌
2018-02-12 15:01:20.122 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: []
2018-02-12 15:01:20.131 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=192.168.80.132:9092, topic=test, partition=0}]
2018-02-12 15:01:20.941 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-16-parseBolt01-executor[4 4]資料庫資訊:身份證
2018-02-12 15:01:20.942 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-16-parseBolt01-executor[4 4]資料庫資訊:戶口本
2018-02-12 15:01:20.942 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(4)
2018-02-12 15:01:20.943 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-6-parseBolt01-executor[3 3]資料庫資訊:身份證
2018-02-12 15:01:20.944 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-6-parseBolt01-executor[3 3]資料庫資訊:戶口本
2018-02-12 15:01:20.944 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(3)
2018-02-12 15:01:20.954 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-14-parseBolt01-executor[5 5]資料庫資訊:身份證
2018-02-12 15:01:20.954 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-14-parseBolt01-executor[5 5]資料庫資訊:戶口本
2018-02-12 15:01:20.963 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(5)
2018-02-12 15:01:21.134 o.a.s.d.executor [INFO] Prepared bolt insertbolt01:(2)
2018-02-12 15:01:22.224 o.a.s.k.PartitionManager [INFO] Read partition information from: /kafka2storm/id/partition_0 --> {"partition":0,"offset":7325,"topology":{"name":"flumestorm2mysql","id":"flumestorm2mysql-1-1518475953"},"topic":"test","broker":{"port":9092,"host":"192.168.80.132"}}
2018-02-12 15:01:22.666 o.a.s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 7325; old topology_id: flumestorm2mysql-1-1518475953 - new topology_id: hello2-2-1518476417
2018-02-12 15:01:22.673 o.a.s.k.PartitionManager [INFO] Starting Kafka 192.168.80.132:0 from offset 7325
2018-02-12 15:01:22.691 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
2018-02-12 15:01:23.239 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:00 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:00;生產資料
2018-02-12 15:01:23.275 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:10 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:10;生產資料
2018-02-12 15:01:23.279 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:05 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:05;生產資料
2018-02-12 15:01:23.280 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:15 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:15;生產資料
2018-02-12 15:01:23.296 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:20 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:20;生產資料
2018-02-12 15:01:23.297 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:25 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:25;生產資料
2018-02-12 15:01:23.304 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:30 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:30;生產資料
2018-02-12 15:01:23.316 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:35 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:35;生產資料
2018-02-12 15:01:23.317 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:40 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:40;生產資料
2018-02-12 15:01:23.318 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:45 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:45;生產資料
2018-02-12 15:01:23.327 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:01:15 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:01:15;生產資料
最後看DB
可以看到,ip資訊和資料來自於哪個專案,都被我們採集到了。
相關推薦
flume自定義攔截器(Interceptor)拼接header和body資訊
一、需求背景 最近專案有這樣一個需求,分別採集不同應用不同機器上的日誌,在做日誌清洗後存入DB,資料庫表字段需要存當前日誌的來源,比如,來自於哪個專案,該專案的哪臺機器,由於我們使用的是flume來做日誌採集,故去翻flume的官網,發現有攔截器可以支援我的需求,
Hadoop生態圈-Flume的組件之自定義攔截器(interceptor)
events nbsp sin capacity figure IV mem nap code Hadoop生態圈-Flume的組件之自定義攔截器(interceptor)
FLUME單節點配置並自定義攔截器
3. Flume1.7.0解壓縮和更換目錄 # cd /opt # tar -xzvf apache-flume-1.7.0-bin.tar.gz # mv apache-flume-1.7.0-bin flume1.7.0 # chmod 777 -R /opt/f
flume-ng程式設計之自定義攔截器
從攔截body開始自定義intercepter程式設計完成每個body字串的解析欄位的正則提取和拼接,我們自定義的這個類叫:LogAnalysis 如下: package com.besttone.interceptor; import com.g
SpringBoot整合Mybatis自定義攔截器,實現拼接sql和修改
一、應用場景 1.分頁,如com.github.pagehelper的分頁外掛實現; 2.攔截sql做日誌監控; 3.統一對某些sql進行統一條件拼接,類似於分頁。 二、MyBatis的攔截器簡介 然後我們要知道攔截器攔截什麼樣的物件,攔截物件的什麼行為,什麼時候攔截? &n
在struts2中配置自定義攔截器放行多個方法
return med ttr limit ring req tac cat invoke 源碼: 自定義的攔截器類: //自定義攔截器類:LoginInterceptor ; package com.java.action.interceptor; import j
JAVAEE——struts2_04:自定義攔截器、struts2標簽、登陸功能和校驗登陸攔截器的實現
strac htm logs transacti 標識 area 返回 ftw jsp 一、自定義攔截器 1.架構 2.攔截器創建 //攔截器:第一種創建方式 //攔截器生命周期:隨項目的啟動而創建,隨項目關閉而銷毀 public class MyInt
struts2學習(6)自定義攔截器-登錄驗證攔截器
back tps class res urn fff .com space war 需求:對登錄進行驗證,用戶名cy 密碼123456才能登錄進去; 登錄進去後,將用戶存在session中; 其他鏈接要來訪問(除了登錄鏈接),首先驗證
自定義攔截器判斷用戶是否有權限訪問
indexof mon com source ora extend ide isa att 1、關於權限系統,對於用戶是否有權限對系統進行訪問,設置自定義攔截器,來攔截用戶的請求 1 package org.slsale.interceptor; 2 3 impo
spring boot框架學習8-【幹貨】spring boot的web開發(4)-自定義攔截器處理權限
凱哥spring boot spring boot框架 本章節主要內容:通過前面的學習,我們了解並快速完成了spring boot第一個應用。spring boot企業級框架,那麽spring boot怎麽讀取靜態資源?如js文件夾,css文件以及png/jpg圖片呢?怎麽自定義消息轉換器呢?怎麽自定
【第四十章】Spring Boot 自定義攔截器
ram obj pre .config factor ati bean configure 邏輯 1.首先編寫攔截器代碼 package com.sarnath.interceptor; import javax.servlet.http.HttpServlet
struts2自定義攔截器
ctrl+ font 重要 ali clas 根據 cat XML 準備 Struts自定義攔截器有什麽作用? 攔截器可以做前期準備工作,如果準備工作沒有做好,則先跳轉到準備的操作頁面,更加合理。比如:登錄才能進行的操作,如果檢測沒有登錄,那就重定向到登錄頁面。 1. St
Struts——自定義攔截器
全局 brush sys tca ima http param 方法 配置 設置全局的異常 攔截器 public class LoginInterceptor extends MethodFilterInterceptor { protected St
MVC自定義攔截器Intercepetor
pan return http throw span implement bject tor lan import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRe
springboot 2.0+ 自定義攔截器 靜態資源問題
per static index 進行 onf 自定義攔截器 tor gis css 之前項目的springboot自定義攔截器使用的是繼承WebMvcConfigurerAdapter重寫常用方法的方式來實現的.靜態文件不需要進行放行,springboot會自動幫你放行。
整合spring之後,struts2裏面的自定義攔截器的invocation.invoke()總是返回input
put 每次 let 應該 singleton prot 定義 art 多例 這個真的是整死我了,還好看見了一篇博客提示了我, 解決方法: 在spring的bean配置中我沒有設置action的作用域為prototype,也就是多例的,如果不設置則就會是默認的singl
Struts2自定義攔截器處理全域性異常
今天在整理之前的專案的時候想著有的action層沒有做異常處理,於是想著自定義攔截器處理一下未攔截的異常。 程式碼: package cn.xm.exam.action.safeHat; import java.util.HashMap; import java.util
Struts2自定義攔截器案例:驗證使用者是否登入攔截器
Struts攔截器是struts最強大的功能之一,也是他的核心 它可以在Action前後做一些事情,比如使用者登入驗證,這裡主要針對使用者登入配置詳細說明 一 首先自定義一個使用者攔截類,必須實現Interceptor介面或者繼承他的實現類 因為我們是要攔截使用者登入的,這裡繼承Meth
SpringBoot(11) SpringBoot自定義攔截器
自定義攔截器共兩步:第一:註冊。第二:定義攔截器。 一、註冊 @Configuration 繼承WebMvcConfigurationAdapter(SpringBoot2.X之前舊版本) 舊版本程式碼 1 @Configuration 2 public class CustomO
webservice學習筆記(九):CXF攔截器/自定義攔截器
1.CXF的攔截器 a.CXF攔截器能夠動態的操作webservice請求過程中的操作請求和響應資料 2.攔截器分類 a.按所處的位置分為:伺服器端攔截器,客戶端攔截器 b.按訊息的方向分為:入攔截器,出攔截器 c.按定義者分為:系統攔截器,自定義攔截器 3.攔截器API