1. 程式人生 > >Flume-ng+Hbase實現日誌的收集和儲存

Flume-ng+Hbase實現日誌的收集和儲存

flume ng 日誌處理並存入資料庫:
flume-ng裡面的SimpleHbaseEventSerializer只提供了最簡單的資料插入hbase功能,如果還有其他需要,就得自己寫HbaseEventSerializer類,實現flume中的HbaseEventSerializer介面。一個簡單的例項如下:

1.前提條件

Hadoop+HBase+Zookeeper+Flume-ng

2.解析日誌程式

AccessLog.java

package com.tcloud.flume;

public class AccessLog {

private String clientIp;

    private

 String clientIndentity;

    private String remoteUser;

    private String dateTime;

    private String request;

    private String httpStatusCode;

    private String bytesSent;

    private String referer;

    private String userAgent;

public String getClientIp() {

return clientIp;

}

public void setClientIp(String clientIp

) {

this.clientIp = clientIp;

}

public String getClientIndentity() {

return clientIndentity;

}

public void setClientIndentity(String clientIndentity) {

this.clientIndentity = clientIndentity;

}

public String getRemoteUser() {

return remoteUser;

}

public void setRemoteUser(String remoteUser) {

this.remoteUser

 = remoteUser;

}

public String getDateTime() {

return dateTime;

}

public void setDateTime(String dateTime) {

this.dateTime = dateTime;

}

public String getRequest() {

return request;

}

public void setRequest(String request) {

this.request = request;

}

public String getHttpStatusCode() {

return httpStatusCode;

}

public void setHttpStatusCode(String httpStatusCode) {

this.httpStatusCode = httpStatusCode;

}

public String getBytesSent() {

return bytesSent;

}

public void setBytesSent(String bytesSent) {

this.bytesSent = bytesSent;

}

public String getReferer() {

return referer;

}

public void setReferer(String referer) {

this.referer = referer;

}

public String getUserAgent() {

return userAgent;

}

public void setUserAgent(String userAgent) {

this.userAgent = userAgent;

}

}

AccessLogParser.java

package com.tcloud.flume;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

public class AccessLogParser {

/**

 * 日誌格式

 * 11.52.10.49 - - [17/Sep/2015:11:35:21 +0800] "GET /webapp HTTP/1.1" 302 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36"

 */

    private static String pattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\""; 

    private static Pattern p = Pattern.compile(pattern);

    public static AccessLog parse(String line){

 Matchermatcher = p.matcher(line);

if (matcher.matches()){

 AccessLog accessLog = new AccessLog();

 accessLog.setClientIp(matcher.group(1));

 accessLog.setClientIndentity(matcher.group(2));

 accessLog.setRemoteUser(matcher.group(3));

 accessLog.setDateTime(matcher.group(4));

 accessLog.setRequest(matcher.group(5));

 accessLog.setHttpStatusCode(matcher.group(6));

 accessLog.setBytesSent(matcher.group(7));

 accessLog.setReferer(matcher.group(8));

 accessLog.setUserAgent(matcher.group(9));

 return accessLog;

}

return null;

}

}

AsyncHbaseLogEventSerializer.java

package com.tcloud.flume;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.List;

import java.util.Locale;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.conf.ComponentConfiguration;

import org.apache.flume.sink.hbase.HbaseEventSerializer;

import org.apache.hadoop.hbase.client.Increment;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Row;

import org.apache.hadoop.hbase.util.Bytes;

public class AsyncHbaseLogEventSerializer  implements HbaseEventSerializer {

//列族

private byte[] colFam="cf".getBytes();

private Event currentEvent;

@Override

public void initialize(Event eventbyte[] colFam) {

     this.currentEvent = event;

         this.colFam = colFam;

}

@Override

public void configure(Context context) {}

@Override

public void configure(ComponentConfiguration conf) {

}

@Override

public List<Row> getActions() {

         // Split the event body and get the values for the columns

         String eventStr = new String(currentEvent.getBody());

         AccessLog cols = AccessLogParser.parse(eventStr);

         String req = cols.getRequest();

         String reqPath = req.split(" ")[1];

         int pos = reqPath.indexOf("?");

  if (pos > 0) {

        reqPath = reqPath.substring(0,pos);

  }       

  if(reqPath.length() > 1 && reqPath.trim().endsWith("/")){

        reqPath = reqPath.substring(0,reqPath.length()-1);

  }       

  String req_ts_str = cols.getDateTime();

  Long currTime = System.currentTimeMillis();

  String currTimeStr = null;

  if (req_ts_str != null && !req_ts_str.equals("")){

                   SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);

                   SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

try {

                            currTimeStr = df2.format(df.parse(req_ts_str));

                            currTime = df.parse(req_ts_str).getTime();

                   } catch (ParseException e) {

                            System.out.println("parse req time error,using system.current time.");

                   }

         }

   long revTs = Long.MAX_VALUE - currTime;

   //行健根據自己需求設計

   byte[] currentRowKey = (UUIDGenerator.getUUID()+Long.toString(revTs)+ reqPath).getBytes();

   List<Row> puts = new ArrayList<Row>();

   Put putReq = new Put( currentRowKey);

         //增加列

    putReq.add( colFam,  "clientip".getBytes(), Bytes.toBytes(cols.getClientIp()));

    putReq.add( colFam,  "clientindentity".getBytes(), Bytes.toBytes(cols.getClientIndentity()));

    putReq.add( colFam,  "remoteuser".getBytes(), Bytes.toBytes(cols.getRemoteUser()));

    putReq.add( colFam,  "httpstatuscode".getBytes(), Bytes.toBytes(cols.getHttpStatusCode()));

    putReq.add( colFam,  "bytessent".getBytes(), Bytes.toBytes(cols.getBytesSent()));

    putReq.add( colFam,  "request".getBytes(), Bytes.toBytes(cols.getRequest()));

         putReq.add( colFam,  "referer".getBytes(), Bytes.toBytes(cols.getReferer()));

         putReq.add( colFam,  "datetime".getBytes(), Bytes.toBytes(currTimeStr));

         putReq.add( colFam,  "useragent".getBytes(), Bytes.toBytes(cols.getUserAgent()));

         puts.add(putReq);              

         return puts;

 } 

@Override

 public List<Increment> getIncrements() {

List<Increment> incs = new ArrayList<Increment>();

return incs;

}

@Override

public void close() {

         colFam = null;

         currentEvent = null;

}

}

UUIDGenerator.java

package com.tcloud.flume;

import java.util.UUID;

public class UUIDGenerator {

    public UUIDGenerator() { 

    } 

    /** 

     * 獲得一個UUID 

     * @return String UUID 

     */ 

    public static String getUUID(){ 

        String s = UUID.randomUUID().toString(); 

        //去掉“-”符號 

        return s.substring(0,8)+s.substring(9,13)+s.substring(14,18)+s.substring(19,23)+s.substring(24); 

    } 

    /** 

     * 獲得指定數目的UUID 

     * @param number int 需要獲得的UUID數量 

     * @return String[] UUID陣列 

     */ 

    public static String[] getUUID(int number){ 

        if(number < 1){ 

            return null

        } 

        String[] ss = new String[number]; 

        for(int i=0;i<number;i++){ 

            ss[i] = getUUID(); 

        } 

        return ss

    } }

將上面的類匯出成jar檔案,放在flume-nglib目錄下

3.通過hbaseshell建立access_log表,其中列族為cf

4.配置flume-ng

  <>資料來源配置,監控日誌產生,併發送給agent

      在FLUME-NG的安裝目錄的conf下建立tomcatToHbase.conf

agent.sources =baksrc

agent.channels=memoryChannel

agent.sinks =remotesink

agent.sources.baksrc.type = exec

agent.sources.baksrc.command = tail -F /home/test/data/data.txt

agent.sources.baksrc.checkperiodic = 1000

agent.sources.baksrc.channels=memoryChannel

agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.keep-alive = 30

agent.channels.memoryChannel.capacity = 1000

agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinks.remotesink.type = avro

agent.sinks.remotesink.hostname =spider-agent

agent.sinks.remotesink.port = 9999

agent.sinks.remotesink.channel= memoryChannel

<>資料入庫hbase,接收收集的資料

  在FLUME-NG的安裝目錄的conf下建立tomcatToHbase.conf

agent.sources = avrosrc

agent.channels = memoryChannel

agent.sinks = fileSink

agent.sources.avrosrc.type = avro

agent.sources.avrosrc.bind =spider-agent

agent.sources.avrosrc.port =9999

agent.sources.avrosrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.keep-alive = 30

agent.channels.memoryChannel.capacity = 1000

agent.channels.memoryChannel.transactionCapacity =1000

agent.sinks.fileSink.type = hbase

agent.sinks.fileSink.channel=memoryChannel

agent.sinks.fileSink.table = access_log

agent.sinks.fileSink.columnFamily =cf

agent.sinks.fileSink.batchSize=5

agent.sinks.fileSink.serializer =com.tcloud.flume.AsyncHbaseLogEventSerializer

5.啟動flume-ng

在master機器和node1機器上分別啟動flume服務程序:

[root@master flume]$ bin/flume-ng agent 

                                  --conf conf 

                                  --conf-file conf/tomcatToHbase.conf 

                                  --name agent 

                                  -Dflume.root.logger=INFO,console

[root@node1 flume]$ bin/flume-ng agent 

                                  --conf conf 

                                  --conf-file conf/tomcatToHbase.conf 

                                  --name agent 

                                  -Dflume.root.logger=INFO,console

相關推薦

Flume-ng+Hbase實現日誌收集儲存

flume ng 日誌處理並存入資料庫: flume-ng裡面的SimpleHbaseEventSerializer只提供了最簡單的資料插入hbase功能,如果還有其他需要,就得自己寫HbaseEve

Flume-NG + HDFS + HIVE 日誌收集分析

[[email protected] apache-flume-1.3.0-bin]# cat /data/apache-flume-1.3.0-bin/conf/flume.conf# Define a memory channel called c1 on a1a1.channels.c1.ty

基於Flume的美團日誌收集系統(二)改進優化

問題導讀: 1.Flume的存在些什麼問題? 2.基於開源的Flume美團增加了哪些功能? 3.Flume系統如何調優? 在《基於Flume的美團日誌收集系統(一)架構和設計》中,我們詳述了基於Flume的美團日誌收集系統的架構設計,以及為什麼做這樣的設計。在本節

基於Flume的美團日誌收集系統(一)架構設計

美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部

10044---基於Flume的美團日誌收集系統(一)架構設計

原文 問題導讀: 1.Flume-NG與Scribe對比,Flume-NG的優勢在什麼地方?2.架構設計考慮需要考慮什麼問題?3.Agent宕機該如何解決?4.Collector宕機是否會有影響?5.Flume-NG可靠性(reliability)方面做了哪些措施?  

COPY 基於Flume的美團日誌收集系統架構設計

美團的日誌收集系統負責美團的所有業務日誌的收集,並分別給Hadoop平臺提供離線資料和Storm平臺提供實時資料流。美團的日誌收集系統基於Flume設計和搭建而成。 《基於Flume的美團日誌收集系統》將分兩部分給讀者呈現美團日誌收集系統的架構設計和實戰經驗。 第一部

SLF4J + logback 實現日誌輸出記錄

-- .com 保持 不存在 default stat 我們 fix jar包 一、SLF4J   SLF4J,即簡單日誌門面(Simple Logging Facade for Java),不是具體的日誌解決方案,它只服務於各種各樣的日誌系統。在使用SLF4J的時候,不

基於Flume+kafka打造實時日誌收集分析系統

Kafka broker修改conf/server.properties檔案,修改內容如下:           broker.id=1           host.name=172.16.137.196 port=10985           log.dirs=/data/kafka

基於python實現日誌收集

指令碼: #! /usr/bin/python # encoding:utf-8 import paramiko import time import os import re import codecs import commands from time import

flume安裝配置-採集日誌到hadoop儲存

一、整體架構        flume其實就是一個日誌採集agent,在每臺應用伺服器安裝一個flume agent,然後事實採集日誌到HDFS叢集環境儲存,以便後續使用hive或者pig等大資料分析日誌,然後可轉存到mysql供運維查詢或分析使用者行為等。 叢集規劃

elasticsearch+kafka日誌收集分析以及分散式配置(附)

<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">由於公司內部業務需求,需要將大量的請求日誌做統計分析,所以用到了elas

Flume對Nginx群集日誌收集方案

# Describe/configure the channels (後面有memory channel配置方案) a1.channels.c1.type = file a1.channels.c1.keep-alive = 10 a1.channels.c1.write-timeout = 10 a1.c

Hadoop-No.15之Flume基於事件的資料收集處理

Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構

nginx+flume+hdfs搭建實時日誌收集系統

1、配置nginx.conf,新增以下配置 http { #配置日誌格式 log_format lf '$remote_addr^A$msec^A$http_host^A$reques

python中用logging實現日誌滾動過期日誌刪除

用python中的logging庫實現日誌滾動和過期日誌刪除。 logging庫提供了兩個可以用於日誌滾動的class(可以參考https://docs.python.org/2/library/logging.handlers.html),一個是Rota

Flume+HBase採集儲存日誌資料

轉自:http://blog.csdn.net/yaoyasong/article/details/39400829 前言 大資料時代,誰掌握了足夠的資料,誰就有可能掌握未來,而其中的資料採集就是將來的流動資產積累。 幾乎任何規模企業,每時每刻也都在產生大量的資料,但這些

大資料技術應用(一) 應用Flume+HBase採集儲存日誌資料

前言 大資料時代,誰掌握了足夠的資料,誰就有可能掌握未來,而其中的資料採集就是將來的流動資產積累。 幾乎任何規模企業,每時每刻也都在產生大量的資料,但這些資料如何歸集、提煉始終是一個困擾。而大資料技

Flume NG原始碼分析(五)使用ThriftSource通過RPC方式收集日誌

上一篇說了利用ExecSource從本地日誌檔案非同步的收集日誌,這篇說說採用RPC方式同步收集日誌的方式。筆者對Thrift比較熟悉,所以用ThriftSource來介紹RPC的日誌收集方式。 整體的結構圖如下: 1. ThriftSource包含了一個Thrift Server,以及一個

Flume NG原始碼分析(四)使用ExecSource從本地日誌檔案中收集日誌

常見的日誌收集方式有兩種,一種是經由本地日誌檔案做媒介,非同步地傳送到遠端日誌倉庫,一種是基於RPC方式的同步日誌收集,直接傳送到遠端日誌倉庫。這篇講講Flume NG如何從本地日誌檔案中收集日誌。 ExecSource是用來執行本地shell命令,並把本地日誌檔案中的資料封裝成Event

Flume(ng) 自定義sink實現屬性注入

最近需要利用flume來做收集遠端日誌,所以學習一些flume最基本的用法。這裡僅作記錄。 遠端日誌收集的整體思路是遠端自定義實現log4j的appender把訊息傳送到flume端,flume端自定義實現一個sink來按照我們的規則儲存日誌。