1. 程式人生 > >flume自定義攔截器(Interceptor)拼接header和body資訊

flume自定義攔截器(Interceptor)拼接header和body資訊

一、需求背景

       最近專案有這樣一個需求,分別採集不同應用不同機器上的日誌,在做日誌清洗後存入DB,資料庫表字段需要存當前日誌的來源,比如,來自於哪個專案,該專案的哪臺機器,由於我們使用的是flume來做日誌採集,故去翻flume的官網,發現有攔截器可以支援我的需求,一個是主機攔截器,可以在source之後配置,在header裡面拼上ip資訊,另一個是static攔截器,可以自定義key和value,這樣的話我們就可以自定義該日誌資訊來源於哪個專案了;

      測試後發現,在設定了多個攔截器後,訊息的頭資訊裡面的確包含了ip和專案資訊,但是kafka收到的訊息只有訊息的body部分,沒有header資訊,一番折騰後,決定自定義一個flume攔截器,用來攔截event資訊,將裡面的頭資訊和body資訊取出來然後統一拼接到body裡面,經過測試,完美解決了我的需求。

二、實戰

    如何自定義flume攔截器 ? 建一個maven工程,匯入flume-core包,然後實現interceptor介面,先看程式碼

package com.flume.schedule;

import java.util.List;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;

public class FlumeInterceptor implements Interceptor {
	
	private Logger logger = LoggerFactory.getLogger(FlumeInterceptor.class);

	@Override
	public void close() {
		
	}

	@Override
	public void initialize() {
		
	}

	@Override
	public Event intercept(Event event) {
		StringBuilder builder = new StringBuilder();
		Map<String,String> headerMap = event.getHeaders();
		String ip = "";
		String projectName = "";
		if(null != headerMap && !headerMap.isEmpty()) {
			 ip = headerMap.get("host");
			 projectName = headerMap.get("projectname");
			 builder.append("ip:" + ip + ";projectname:" + projectName);
		}
                byte[] byteBody = event.getBody();
		String body = new String(byteBody,Charsets.UTF_8);
		builder.append(";body:" + body);
		event.setBody(builder.toString().trim().getBytes());
		logger.info("拼接後的body資訊:" + builder.toString().trim());
		return event;
	}

	@Override
	public List<Event> intercept(List<Event> events) {
		for(final Event event : events) {
			intercept(event);
		}
		return events;
	}

}
 
package com.flume.schedule;

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

public class FlumeBuilder implements Interceptor.Builder{

	@Override
	public void configure(Context context) {
		
	}

	@Override
	public Interceptor build() {
		return new FlumeInterceptor();
	}

}

 只需要2個類,一個是FlumeIntercetor用來拼接頭資訊和body資訊,一個是FlumeBuilder用來啟動這個攔截器,然後將上面的程式碼打成一個jar包,放到flume的安裝目錄的plugin.d下(如果沒有自己建一個),如下

 

 將我們的jar包放到lib下介面,然後配置flume的conf檔案,在攔截器配置部分,使用我們自定義的攔截器

a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source

a1.sources.s1.type = exec
a1.sources.s1.channels = c1
a1.sources.s1.command = tail -F  /home/esuser/flume/logs/flume.log

#config interceptor
a1.sources.s1.interceptors=i1 i2 i3
a1.sources.s1.interceptors.i1.type=host
a1.sources.s1.interceptors.i1.useIP=true
a1.sources.s1.interceptors.i1.preserverExisting=false

a1.sources.s1.interceptors.i2.type=static
a1.sources.s1.interceptors.i2.key=projectname
a1.sources.s1.interceptors.i2.value=資料採集專案
a1.sources.s1.interceptors.i2.preserverExisting=false

# 使用自定義的攔截器,這裡一個source使用了3個攔截器,分別配置host,工程名,然後用我們自定義的攔截器進行拼接,flume在載入的時候會按順序載入
a1.sources.s1.interceptors.i3.type=com.flume.schedule.FlumeBuilder

# Describe the sink

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.80.132
a1.sinks.k1.port= 3333
a1.sinks.k1.channels = c1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.80.132
a1.sinks.k2.port = 4444
a1.sinks.k2.channels = c1

# Use a channel that buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1



a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 1000

然後啟動flume,kafka,storm,寫個定時任務每隔5s列印一條日誌資料;

看一下kafka消費者

ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:05:05 [INFO] [org.springframework.jmx.export.MBeanExporter:449] - Unregistering JMX-exposed beans on shutdown
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:20 [INFO] [org.springframework.boot.StartupInfoLogger:48] - Starting FlumeApplication v0.0.1-SNAPSHOT on slave with PID 4859 (/home/esuser/flume/myconf/flume-0.0.1-SNAPSHOT.jar started by root in /home/esuser/flume/myconf)
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:20 [INFO] [org.springframework.boot.SpringApplication:661] - No active profile set, falling back to default profiles: default
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:21 [INFO] [org.springframework.context.support.AbstractApplicationContext:582] - Refreshing org.springframework.boot[email protected]2d38eb89: startup date [Mon Feb 12 14:53:21 PST 2018]; root of context hierarchy
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:24 [INFO] [org.hibernate.validator.internal.util.Version:30] - HV000001: Hibernate Validator 5.2.4.Final
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:48 [INFO] [org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer:87] - Tomcat initialized with port(s): 5678 (http)
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:49 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Starting service Tomcat
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:49 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Starting Servlet Engine: Apache Tomcat/8.5.6
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:51 [INFO] [org.apache.juli.logging.DirectJDKLog:179] - Initializing Spring embedded WebApplicationContext
ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 14:53:51 [INFO] [org.springframework.boot.context.embedded.EmbeddedWebApplicationContext:276] - Root WebApplicationContext: initialization completed in 29741 ms

看一下storm的topology日誌

2018-02-12 15:01:20.122 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: []
2018-02-12 15:01:20.131 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=192.168.80.132:9092, topic=test, partition=0}]
2018-02-12 15:01:20.941 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-16-parseBolt01-executor[4 4]資料庫資訊:身份證
2018-02-12 15:01:20.942 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-16-parseBolt01-executor[4 4]資料庫資訊:戶口本
2018-02-12 15:01:20.942 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(4)
2018-02-12 15:01:20.943 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-6-parseBolt01-executor[3 3]資料庫資訊:身份證
2018-02-12 15:01:20.944 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-6-parseBolt01-executor[3 3]資料庫資訊:戶口本
2018-02-12 15:01:20.944 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(3)
2018-02-12 15:01:20.954 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-14-parseBolt01-executor[5 5]資料庫資訊:身份證
2018-02-12 15:01:20.954 c.l.s.ParseBolt [INFO] 當前執行緒:Thread-14-parseBolt01-executor[5 5]資料庫資訊:戶口本
2018-02-12 15:01:20.963 o.a.s.d.executor [INFO] Prepared bolt parseBolt01:(5)
2018-02-12 15:01:21.134 o.a.s.d.executor [INFO] Prepared bolt insertbolt01:(2)
2018-02-12 15:01:22.224 o.a.s.k.PartitionManager [INFO] Read partition information from: /kafka2storm/id/partition_0  --> {"partition":0,"offset":7325,"topology":{"name":"flumestorm2mysql","id":"flumestorm2mysql-1-1518475953"},"topic":"test","broker":{"port":9092,"host":"192.168.80.132"}}
2018-02-12 15:01:22.666 o.a.s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 7325; old topology_id: flumestorm2mysql-1-1518475953 - new topology_id: hello2-2-1518476417
2018-02-12 15:01:22.673 o.a.s.k.PartitionManager [INFO] Starting Kafka 192.168.80.132:0 from offset 7325
2018-02-12 15:01:22.691 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
2018-02-12 15:01:23.239 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:00 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:00;生產資料
2018-02-12 15:01:23.275 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:10 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:10;生產資料
2018-02-12 15:01:23.279 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:05 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:05;生產資料
2018-02-12 15:01:23.280 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:15 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:15;生產資料
2018-02-12 15:01:23.296 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:20 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:20;生產資料
2018-02-12 15:01:23.297 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:25 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:25;生產資料
2018-02-12 15:01:23.304 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:30 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:30;生產資料
2018-02-12 15:01:23.316 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:35 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:35;生產資料
2018-02-12 15:01:23.317 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:40 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:40;生產資料
2018-02-12 15:01:23.318 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:00:45 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:00:45;生產資料
2018-02-12 15:01:23.327 c.l.s.ParseBolt [INFO] bolt receive message : ip:192.168.80.132;projectname:資料採集專案;body:2018-02-12 15:01:15 [INFO] [com.flume.schedule.FlumeSchedule:22] - 當前時間:2018-02-12 15:01:15;生產資料

最後看DB

可以看到,ip資訊和資料來自於哪個專案,都被我們採集到了。

相關推薦

flume定義攔截(Interceptor)拼接headerbody資訊

一、需求背景       最近專案有這樣一個需求,分別採集不同應用不同機器上的日誌,在做日誌清洗後存入DB,資料庫表字段需要存當前日誌的來源,比如,來自於哪個專案,該專案的哪臺機器,由於我們使用的是flume來做日誌採集,故去翻flume的官網,發現有攔截器可以支援我的需求,

Hadoop生態圈-Flume的組件之定義攔截interceptor

events nbsp sin capacity figure IV mem nap code                 Hadoop生態圈-Flume的組件之自定義攔截器(interceptor)                                   

FLUME單節點配置並定義攔截

  3. Flume1.7.0解壓縮和更換目錄 # cd /opt # tar -xzvf apache-flume-1.7.0-bin.tar.gz # mv apache-flume-1.7.0-bin flume1.7.0 # chmod 777 -R /opt/f

flume-ng程式設計之定義攔截

     從攔截body開始自定義intercepter程式設計完成每個body字串的解析欄位的正則提取和拼接,我們自定義的這個類叫:LogAnalysis 如下: package com.besttone.interceptor; import com.g

SpringBoot整合Mybatis定義攔截,實現拼接sql修改

一、應用場景 1.分頁,如com.github.pagehelper的分頁外掛實現; 2.攔截sql做日誌監控; 3.統一對某些sql進行統一條件拼接,類似於分頁。 二、MyBatis的攔截器簡介 然後我們要知道攔截器攔截什麼樣的物件,攔截物件的什麼行為,什麼時候攔截? &n

在struts2中配置定義攔截放行多個方法

return med ttr limit ring req tac cat invoke 源碼: 自定義的攔截器類: //自定義攔截器類:LoginInterceptor ; package com.java.action.interceptor; import j

JAVAEE——struts2_04:定義攔截、struts2標簽、登陸功能校驗登陸攔截的實現

strac htm logs transacti 標識 area 返回 ftw jsp 一、自定義攔截器   1.架構      2.攔截器創建 //攔截器:第一種創建方式 //攔截器生命周期:隨項目的啟動而創建,隨項目關閉而銷毀 public class MyInt

struts2學習(6)定義攔截-登錄驗證攔截

back tps class res urn fff .com space war 需求:對登錄進行驗證,用戶名cy 密碼123456才能登錄進去;   登錄進去後,將用戶存在session中; 其他鏈接要來訪問(除了登錄鏈接),首先驗證

定義攔截判斷用戶是否有權限訪問

indexof mon com source ora extend ide isa att 1、關於權限系統,對於用戶是否有權限對系統進行訪問,設置自定義攔截器,來攔截用戶的請求 1 package org.slsale.interceptor; 2 3 impo

spring boot框架學習8-【幹貨】spring boot的web開發(4)-定義攔截處理權限

凱哥spring boot spring boot框架 本章節主要內容:通過前面的學習,我們了解並快速完成了spring boot第一個應用。spring boot企業級框架,那麽spring boot怎麽讀取靜態資源?如js文件夾,css文件以及png/jpg圖片呢?怎麽自定義消息轉換器呢?怎麽自定

【第四十章】Spring Boot 定義攔截

ram obj pre .config factor ati bean configure 邏輯 1.首先編寫攔截器代碼 package com.sarnath.interceptor; import javax.servlet.http.HttpServlet

struts2定義攔截

ctrl+ font 重要 ali clas 根據 cat XML 準備 Struts自定義攔截器有什麽作用? 攔截器可以做前期準備工作,如果準備工作沒有做好,則先跳轉到準備的操作頁面,更加合理。比如:登錄才能進行的操作,如果檢測沒有登錄,那就重定向到登錄頁面。 1. St

Struts——定義攔截

全局 brush sys tca ima http param 方法 配置 設置全局的異常 攔截器 public class LoginInterceptor extends MethodFilterInterceptor { protected St

MVC定義攔截Intercepetor

pan return http throw span implement bject tor lan import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRe

springboot 2.0+ 定義攔截 靜態資源問題

per static index 進行 onf 自定義攔截器 tor gis css 之前項目的springboot自定義攔截器使用的是繼承WebMvcConfigurerAdapter重寫常用方法的方式來實現的.靜態文件不需要進行放行,springboot會自動幫你放行。

整合spring之後,struts2裏面的定義攔截的invocation.invoke()總是返回input

put 每次 let 應該 singleton prot 定義 art 多例 這個真的是整死我了,還好看見了一篇博客提示了我, 解決方法:   在spring的bean配置中我沒有設置action的作用域為prototype,也就是多例的,如果不設置則就會是默認的singl

Struts2定義攔截處理全域性異常

  今天在整理之前的專案的時候想著有的action層沒有做異常處理,於是想著自定義攔截器處理一下未攔截的異常。   程式碼: package cn.xm.exam.action.safeHat; import java.util.HashMap; import java.util

Struts2定義攔截案例:驗證使用者是否登入攔截

Struts攔截器是struts最強大的功能之一,也是他的核心 它可以在Action前後做一些事情,比如使用者登入驗證,這裡主要針對使用者登入配置詳細說明 一 首先自定義一個使用者攔截類,必須實現Interceptor介面或者繼承他的實現類 因為我們是要攔截使用者登入的,這裡繼承Meth

SpringBoot(11) SpringBoot定義攔截

自定義攔截器共兩步:第一:註冊。第二:定義攔截器。 一、註冊 @Configuration 繼承WebMvcConfigurationAdapter(SpringBoot2.X之前舊版本) 舊版本程式碼 1 @Configuration 2 public class CustomO

webservice學習筆記(九):CXF攔截/定義攔截

1.CXF的攔截器 a.CXF攔截器能夠動態的操作webservice請求過程中的操作請求和響應資料 2.攔截器分類 a.按所處的位置分為:伺服器端攔截器,客戶端攔截器 b.按訊息的方向分為:入攔截器,出攔截器 c.按定義者分為:系統攔截器,自定義攔截器 3.攔截器API