Flume-ng+Hbase實現日誌的收集和儲存
flume ng 日誌處理並存入資料庫:
flume-ng裡面的SimpleHbaseEventSerializer只提供了最簡單的資料插入hbase功能,如果還有其他需要,就得自己寫HbaseEventSerializer類,實現flume中的HbaseEventSerializer介面。一個簡單的例項如下:
1.前提條件
Hadoop+HBase+Zookeeper+Flume-ng
2.解析日誌程式
①AccessLog.java
package com.tcloud.flume; public class AccessLog { private String clientIp; private private String remoteUser; private String dateTime; private String request; private String httpStatusCode; private String bytesSent; private String referer; private String userAgent; public String getClientIp() { return clientIp; } public void setClientIp(String clientIp this.clientIp = clientIp; } public String getClientIndentity() { return clientIndentity; } public void setClientIndentity(String clientIndentity) { this.clientIndentity = clientIndentity; } public String getRemoteUser() { return remoteUser; } public void setRemoteUser(String remoteUser) { this.remoteUser } public String getDateTime() { return dateTime; } public void setDateTime(String dateTime) { this.dateTime = dateTime; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public String getHttpStatusCode() { return httpStatusCode; } public void setHttpStatusCode(String httpStatusCode) { this.httpStatusCode = httpStatusCode; } public String getBytesSent() { return bytesSent; } public void setBytesSent(String bytesSent) { this.bytesSent = bytesSent; } public String getReferer() { return referer; } public void setReferer(String referer) { this.referer = referer; } public String getUserAgent() { return userAgent; } public void setUserAgent(String userAgent) { this.userAgent = userAgent; } } |
②AccessLogParser.java
package com.tcloud.flume; import java.util.regex.Matcher; import java.util.regex.Pattern; public class AccessLogParser { /** * 日誌格式 * 11.52.10.49 - - [17/Sep/2015:11:35:21 +0800] "GET /webapp HTTP/1.1" 302 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36" */ private static String pattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\""; private static Pattern p = Pattern.compile(pattern); public static AccessLog parse(String line){ Matchermatcher = p.matcher(line); if (matcher.matches()){ AccessLog accessLog = new AccessLog(); accessLog.setClientIp(matcher.group(1)); accessLog.setClientIndentity(matcher.group(2)); accessLog.setRemoteUser(matcher.group(3)); accessLog.setDateTime(matcher.group(4)); accessLog.setRequest(matcher.group(5)); accessLog.setHttpStatusCode(matcher.group(6)); accessLog.setBytesSent(matcher.group(7)); accessLog.setReferer(matcher.group(8)); accessLog.setUserAgent(matcher.group(9)); return accessLog; } return null; } } |
③AsyncHbaseLogEventSerializer.java
package com.tcloud.flume; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Locale; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.sink.hbase.HbaseEventSerializer; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.util.Bytes; public class AsyncHbaseLogEventSerializer implements HbaseEventSerializer { //列族 private byte[] colFam="cf".getBytes(); private Event currentEvent; @Override public void initialize(Event event, byte[] colFam) { this.currentEvent = event; this.colFam = colFam; } @Override public void configure(Context context) {} @Override public void configure(ComponentConfiguration conf) { } @Override public List<Row> getActions() { // Split the event body and get the values for the columns String eventStr = new String(currentEvent.getBody()); AccessLog cols = AccessLogParser.parse(eventStr); String req = cols.getRequest(); String reqPath = req.split(" ")[1]; int pos = reqPath.indexOf("?"); if (pos > 0) { reqPath = reqPath.substring(0,pos); } if(reqPath.length() > 1 && reqPath.trim().endsWith("/")){ reqPath = reqPath.substring(0,reqPath.length()-1); } String req_ts_str = cols.getDateTime(); Long currTime = System.currentTimeMillis(); String currTimeStr = null; if (req_ts_str != null && !req_ts_str.equals("")){ SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US); SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { currTimeStr = df2.format(df.parse(req_ts_str)); currTime = df.parse(req_ts_str).getTime(); } catch (ParseException e) { System.out.println("parse req time error,using system.current time."); } } long revTs = Long.MAX_VALUE - currTime; //行健根據自己需求設計 byte[] currentRowKey = (UUIDGenerator.getUUID()+Long.toString(revTs)+ reqPath).getBytes(); List<Row> puts = new ArrayList<Row>(); Put putReq = new Put( currentRowKey); //增加列 putReq.add( colFam, "clientip".getBytes(), Bytes.toBytes(cols.getClientIp())); putReq.add( colFam, "clientindentity".getBytes(), Bytes.toBytes(cols.getClientIndentity())); putReq.add( colFam, "remoteuser".getBytes(), Bytes.toBytes(cols.getRemoteUser())); putReq.add( colFam, "httpstatuscode".getBytes(), Bytes.toBytes(cols.getHttpStatusCode())); putReq.add( colFam, "bytessent".getBytes(), Bytes.toBytes(cols.getBytesSent())); putReq.add( colFam, "request".getBytes(), Bytes.toBytes(cols.getRequest())); putReq.add( colFam, "referer".getBytes(), Bytes.toBytes(cols.getReferer())); putReq.add( colFam, "datetime".getBytes(), Bytes.toBytes(currTimeStr)); putReq.add( colFam, "useragent".getBytes(), Bytes.toBytes(cols.getUserAgent())); puts.add(putReq); return puts; } @Override public List<Increment> getIncrements() { List<Increment> incs = new ArrayList<Increment>(); return incs; } @Override public void close() { colFam = null; currentEvent = null; } } |
④UUIDGenerator.java
package com.tcloud.flume; import java.util.UUID; public class UUIDGenerator { public UUIDGenerator() { } /** * 獲得一個UUID * @return String UUID */ public static String getUUID(){ String s = UUID.randomUUID().toString(); //去掉“-”符號 return s.substring(0,8)+s.substring(9,13)+s.substring(14,18)+s.substring(19,23)+s.substring(24); } /** * 獲得指定數目的UUID * @param number int 需要獲得的UUID數量 * @return String[] UUID陣列 */ public static String[] getUUID(int number){ if(number < 1){ return null; } String[] ss = new String[number]; for(int i=0;i<number;i++){ ss[i] = getUUID(); } return ss; } } |
將上面的類匯出成jar檔案,放在flume-ng的lib目錄下
3.通過hbase的shell建立access_log表,其中列族為cf
4.配置flume-ng
<一>資料來源配置,監控日誌產生,併發送給agent
在FLUME-NG的安裝目錄的conf下建立tomcatToHbase.conf
agent.sources =baksrc agent.channels=memoryChannel agent.sinks =remotesink agent.sources.baksrc.type = exec agent.sources.baksrc.command = tail -F /home/test/data/data.txt agent.sources.baksrc.checkperiodic = 1000 agent.sources.baksrc.channels=memoryChannel agent.channels.memoryChannel.type = memory agent.channels.memoryChannel.keep-alive = 30 agent.channels.memoryChannel.capacity = 1000 agent.channels.memoryChannel.transactionCapacity = 1000 agent.sinks.remotesink.type = avro agent.sinks.remotesink.hostname =spider-agent agent.sinks.remotesink.port = 9999 agent.sinks.remotesink.channel= memoryChannel |
<二>資料入庫hbase,接收收集的資料
在FLUME-NG的安裝目錄的conf下建立tomcatToHbase.conf
agent.sources = avrosrc agent.channels = memoryChannel agent.sinks = fileSink agent.sources.avrosrc.type = avro agent.sources.avrosrc.bind =spider-agent agent.sources.avrosrc.port =9999 agent.sources.avrosrc.channels = memoryChannel agent.channels.memoryChannel.type = memory agent.channels.memoryChannel.keep-alive = 30 agent.channels.memoryChannel.capacity = 1000 agent.channels.memoryChannel.transactionCapacity =1000 agent.sinks.fileSink.type = hbase agent.sinks.fileSink.channel=memoryChannel agent.sinks.fileSink.table = access_log agent.sinks.fileSink.columnFamily =cf agent.sinks.fileSink.batchSize=5 agent.sinks.fileSink.serializer =com.tcloud.flume.AsyncHbaseLogEventSerializer |
5.啟動flume-ng
在master機器和node1機器上分別啟動flume服務程序:
[root@master flume]$ bin/flume-ng agent
--conf conf
--conf-file conf/tomcatToHbase.conf
--name agent
-Dflume.root.logger=INFO,console
[root@node1 flume]$ bin/flume-ng agent
--conf conf
--conf-file conf/tomcatToHbase.conf
--name agent
-Dflume.root.logger=INFO,console
相關推薦
Flume-ng+Hbase實現日誌的收集和儲存
flume ng 日誌處理並存入資料庫: flume-ng裡面的SimpleHbaseEventSerializer只提供了最簡單的資料插入hbase功能,如果還有其他需要,就得自己寫HbaseEve
Flume-NG + HDFS + HIVE 日誌收集分析
[[email protected] apache-flume-1.3.0-bin]# cat /data/apache-flume-1.3.0-bin/conf/flume.conf# Define a memory channel called c1 on a1a1.channels.c1.ty
基於Flume的美團日誌收集系統(二)改進和優化
問題導讀: 1.Flume的存在些什麼問題? 2.基於開源的Flume美團增加了哪些功能? 3.Flume系統如何調優? 在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節
基於Flume的美團日誌收集系統(一)架構和設計
美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部
10044---基於Flume的美團日誌收集系統(一)架構和設計
原文 問題導讀: 1.Flume-NG與Scribe對比,Flume-NG的優勢在什麼地方?2.架構設計考慮需要考慮什麼問題?3.Agent宕機該如何解決?4.Collector宕機是否會有影響?5.Flume-NG可靠性(reliability)方面做了哪些措施?
COPY 基於Flume的美團日誌收集系統架構和設計
美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部
SLF4J + logback 實現日誌輸出和記錄
-- .com 保持 不存在 default stat 我們 fix jar包 一、SLF4J SLF4J,即簡單日誌門面(Simple Logging Facade for Java),不是具體的日誌解決方案,它只服務於各種各樣的日誌系統。在使用SLF4J的時候,不
基於Flume+kafka打造實時日誌收集分析系統
Kafka broker修改conf/server.properties檔案,修改內容如下: broker.id=1 host.name=172.16.137.196 port=10985 log.dirs=/data/kafka
基於python實現日誌收集
指令碼: #! /usr/bin/python # encoding:utf-8 import paramiko import time import os import re import codecs import commands from time import
flume安裝配置-採集日誌到hadoop儲存
一、整體架構 flume其實就是一個日誌採集agent,在每臺應用伺服器安裝一個flume agent,然後事實採集日誌到HDFS叢集環境儲存,以便後續使用hive或者pig等大資料分析日誌,然後可轉存到mysql供運維查詢或分析使用者行為等。 叢集規劃
elasticsearch+kafka日誌收集和分析以及分散式配置(附)
<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">由於公司內部業務需求,需要將大量的請求日誌做統計分析,所以用到了elas
Flume對Nginx群集日誌收集方案
# Describe/configure the channels (後面有memory channel配置方案) a1.channels.c1.type = file a1.channels.c1.keep-alive = 10 a1.channels.c1.write-timeout = 10 a1.c
Hadoop-No.15之Flume基於事件的資料收集和處理
Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構
nginx+flume+hdfs搭建實時日誌收集系統
1、配置nginx.conf,新增以下配置 http { #配置日誌格式 log_format lf '$remote_addr^A$msec^A$http_host^A$reques
python中用logging實現日誌滾動和過期日誌刪除
用python中的logging庫實現日誌滾動和過期日誌刪除。 logging庫提供了兩個可以用於日誌滾動的class(可以參考https://docs.python.org/2/library/logging.handlers.html),一個是Rota
Flume+HBase採集和儲存日誌資料
轉自:http://blog.csdn.net/yaoyasong/article/details/39400829 前言 大資料時代,誰掌握了足夠的資料,誰就有可能掌握未來,而其中的資料採集就是將來的流動資產積累。 幾乎任何規模企業,每時每刻也都在產生大量的資料,但這些
大資料技術應用(一) 應用Flume+HBase採集和儲存日誌資料
前言 大資料時代,誰掌握了足夠的資料,誰就有可能掌握未來,而其中的資料採集就是將來的流動資產積累。 幾乎任何規模企業,每時每刻也都在產生大量的資料,但這些資料如何歸集、提煉始終是一個困擾。而大資料技
Flume NG原始碼分析(五)使用ThriftSource通過RPC方式收集日誌
上一篇說了利用ExecSource從本地日誌檔案非同步的收集日誌,這篇說說採用RPC方式同步收集日誌的方式。筆者對Thrift比較熟悉,所以用ThriftSource來介紹RPC的日誌收集方式。 整體的結構圖如下: 1. ThriftSource包含了一個Thrift Server,以及一個
Flume NG原始碼分析(四)使用ExecSource從本地日誌檔案中收集日誌
常見的日誌收集方式有兩種,一種是經由本地日誌檔案做媒介,非同步地傳送到遠端日誌倉庫,一種是基於RPC方式的同步日誌收集,直接傳送到遠端日誌倉庫。這篇講講Flume NG如何從本地日誌檔案中收集日誌。 ExecSource是用來執行本地shell命令,並把本地日誌檔案中的資料封裝成Event
Flume(ng) 自定義sink實現和屬性注入
最近需要利用flume來做收集遠端日誌,所以學習一些flume最基本的用法。這裡僅作記錄。 遠端日誌收集的整體思路是遠端自定義實現log4j的appender把訊息傳送到flume端,flume端自定義實現一個sink來按照我們的規則儲存日誌。