
Using Flume (Part 2): Collecting Remote Log Data into a MySQL Database

This article covers both a single-node (single-agent) setup and a multi-node (multi-agent, remote log collection) setup.

1. Environment

Linux: CentOS 7
JDK: 1.7
Flume: 1.7.0

2. Installation

The installation of the JDK and MySQL on Linux is not covered here.

Installing Flume 1.7: go to the official site http://flume.apache.org/, find the 1.7.0 release, download it onto the CentOS system, and extract it; a sketch of the commands follows.
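
As a minimal sketch, the download and extraction might look like the following shell commands (the mirror URL and the /usr/local install path are assumptions; use the download link from the official site and any directory you prefer):

# download Flume 1.7.0 and unpack it under /usr/local (URL and paths are assumptions)
cd /usr/local
wget https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
tar -xzf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume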

3. Preparing the Database Table

Note: in this article the Flume events come from an exec source, i.e. the output of a Linux command is used as Flume's data source, and a custom MysqlSink is used as the Flume sink.

Table creation SQL:

CREATE TABLE `flume_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4. Writing the MysqlSink

4.1 Create a Maven project (packaging type: jar)

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>yichao.mym</groupId>
  <artifactId>flumeDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  
     <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.15</version>
        </dependency>
    </dependencies>
</project>

4.2 Prepare a Java Bean

A JavaBean corresponding to the database table makes it easier to process the event body (the event body is the content read by the exec source's command):

package yichao.mym.base.bean;

public class Person {

	private String name;
	
	private Integer age;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public Integer getAge() {
		return age;
	}

	public void setAge(Integer age) {
		this.age = age;
	}
	
}

4.3 Writing the custom sink

The explanations are in the code comments:

package yichao.mym.base.bean;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

public class MysqlSink extends AbstractSink implements Configurable {

    private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    
    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;  // number of events committed per batch

    public MysqlSink() {
        LOG.info("MySqlSink start...");
    }

    /** Method from the Configurable interface: reads properties from the agent configuration file */
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);  // batchSize defaults to 100
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    /**
     * Runs when the sink starts; preparation work is done here
     */
    @Override
    public void start() {
        super.start();
        try {
            // load the JDBC driver via Class.forName()
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
        // obtain a Connection via DriverManager.getConnection()

        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            // create a PreparedStatement for the insert
            preparedStatement = conn.prepareStatement("insert into " + tableName +
                    " (name,age) values (?,?)");

        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    /**
     * Runs when the sink stops
     */
    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * What this method does:<br/>
     * (1) keeps taking events from the channel into a list of up to batchSize entries<br/>
     * (2) processes each event it gets; if no event is available it returns Status.BACKOFF to signal there is no data to commit<br/>
     * (3) if the batch is non-empty, it is submitted via JDBC<br/>
     */
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;

        List<Person> persons = Lists.newArrayList();
        transaction.begin();
       
        
        try {
            /* event handling */
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) { // process the event
                    // the event body looks like "exec tail-name-$i,$i"
                    content = new String(event.getBody());
                    Person person = new Person();
                    if (content.contains(",")) {
                        // the part before the comma is the name
                        person.setName(content.substring(0, content.indexOf(",")));
                        // +1 skips the comma; the remainder is parsed as the age
                        person.setAge(Integer.parseInt(content.substring(content.indexOf(",") + 1).trim()));
                    } else {
                        person.setName(content);
                        person.setAge(0); // default age when the body has no comma, avoiding an unboxing NPE in setInt below
                    }
                    persons.add(person);
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }

            /* JDBC batch submit */
            if (persons.size() > 0) {
                preparedStatement.clearBatch();
                for (Person temp : persons) {
                    preparedStatement.setString(1, temp.getName());
                    preparedStatement.setInt(2, temp.getAge());
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            transaction.commit();
        } catch (Exception e) {
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been.successful.", e2);
            }
            LOG.error("Failed to commit transaction.Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
Once the sink is written, package it as a jar and copy it into the lib folder under the Flume installation directory. Also copy the MySQL driver jar (mysql-connector-java) into the same folder; see the sketch below.
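
A minimal sketch of the build-and-copy steps, assuming Maven is available, the Flume install directory is /usr/local/flume, and the driver jar sits in the default local Maven repository (all paths here are assumptions; adjust them to your environment):

# build the sink jar from the project root
mvn clean package
# copy the sink jar and the MySQL driver into Flume's lib directory
cp target/flumeDemo-0.0.1-SNAPSHOT.jar /usr/local/flume/lib/
cp ~/.m2/repository/mysql/mysql-connector-java/5.1.15/mysql-connector-java-5.1.15.jar /usr/local/flume/lib/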

4.4 Agent configuration: mysqlSink.conf (single-agent test)

Create a new configuration file mysqlSink.conf in Flume's conf folder with the following content:

agent1.sources=source1
agent1.channels=channel1
agent1.sinks=mysqlSink

# describe/configure source1
# an exec source collects log data by running a Linux command such as 'tail -F'
agent1.sources.source1.type=exec
agent1.sources.source1.command=tail -F /usr/local/tomcat/logs/ac.log
agent1.sources.source1.channels=channel1

# use a channel that buffers events in memory
# a memory (or file) channel temporarily buffers data until the sink consumes it
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=5000
agent1.channels.channel1.transactionCapacity=1000

# describe the sink: the custom MysqlSink packaged as a jar
agent1.sinks.mysqlSink.type=yichao.mym.base.bean.MysqlSink
agent1.sinks.mysqlSink.hostname=localhost
agent1.sinks.mysqlSink.port=3306
agent1.sinks.mysqlSink.databaseName=firstflume
agent1.sinks.mysqlSink.tableName=flume_test
agent1.sinks.mysqlSink.user=root
agent1.sinks.mysqlSink.password=123456
agent1.sinks.mysqlSink.channel=channel1
agent1.sinks.mysqlSink.batchSize=5

Notes:

(1) localhost is the address of the server hosting the MySQL database;

(2) /usr/local/tomcat/logs/ac.log is the log file monitored by the exec source;

(3) yichao.mym.base.bean.MysqlSink is the fully qualified class name of the custom sink.

Important: capacity (the channel size) > transactionCapacity (the size of each Flume transaction) > batchSize (how many events the sink takes from the channel per send).

These values should be tuned against latency requirements, traffic volume, and system resource usage, but the ordering above must be respected. The official Flume documentation does not spell this out, yet once the ordering is violated, errors occur at runtime!

5. Testing

Start Flume from the bin directory under the Flume installation directory:

./flume-ng agent -c ../conf -f ../conf/mysqlSink.conf -n agent1 -Dflume.root.logger=INFO,console

After the agent starts, you can simulate the log file growing dynamically. Open a new terminal and run this shell command:

for i in {1..100};do echo "exec tail-name-$i,$i" >> /usr/local/tomcat/logs/ac.log;sleep 1;done;

Now refresh the database table repeatedly and you can watch the rows grow dynamically.
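
One simple way to check, sketched below using the credentials and database name from the configuration above (adjust them if yours differ), is to query the row count from the MySQL command line:

# run this repeatedly while the shell loop writes to the log; the count should keep increasing
mysql -u root -p123456 firstflume -e "SELECT COUNT(*) FROM flume_test;"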


-----------------------------------------------------------------------------------------------------

6. Multi-node, Multi-agent Setup

1. Architecture

Two Linux machines that can communicate with each other:

Machine 201: JDK 1.7, MySQL, and Flume 1.7 installed

Machine 202: JDK 1.7 and Flume 1.7 installed

Structure:

In this case, however, agent1, agent2, and agent3 all use exec sources, i.e. they read the log files on disk directly, rather than log4j acting directly as the agents' source.

Machine 202 plays the role of one of the collecting agents (agent1, agent2, agent3) and sends the log content it collects locally to the remote machine 201. Avro is used as the transport protocol between them.

So in this case machine 202 uses:

source: exec (tail -F /usr/local/tomcat/logs/ac.log)
channel: memory
sink: avro

and machine 201 uses:

source: avro
channel: memory
sink: the custom MysqlSink

Note: the table, the custom sink jar, and the JavaBean are the same as before.

2. Configuration files for the two agents

Flume configuration file on machine 202: tail-avro.conf

agent1.sources=source1  
agent1.channels=channel1  
agent1.sinks=mysqlSink  
  
# describe/configure source1  
# an exec source collects log data by running a Linux command such as 'tail -F'
agent1.sources.source1.type=exec  
agent1.sources.source1.command=tail -F /usr/local/tomcat/logs/ac.log  
agent1.sources.source1.channels=channel1  
  
# use a channel that buffers events in memory
# a memory (or file) channel temporarily buffers data until the sink consumes it
agent1.channels.channel1.type=memory  
agent1.channels.channel1.capacity=5000  
agent1.channels.channel1.transactionCapacity=1000  

agent1.sinks.mysqlSink.type=avro
agent1.sinks.mysqlSink.channel=channel1
agent1.sinks.mysqlSink.hostname=192.168.216.201
agent1.sinks.mysqlSink.port=4545
agent1.sinks.mysqlSink.batch-size=5

Flume configuration file on machine 201: avro-mysql.conf

agent1.sources=source1  
agent1.channels=channel1  
agent1.sinks=mysqlSink  
  
# describe/configure source1  
# an avro source receives log data over the network
agent1.sources.source1.type = avro
agent1.sources.source1.channels = channel1
agent1.sources.source1.bind = 192.168.216.201
agent1.sources.source1.port = 4545
  
# use a channel that buffers events in memory
# a memory (or file) channel temporarily buffers data until the sink consumes it
agent1.channels.channel1.type=memory  
agent1.channels.channel1.capacity=5000  
agent1.channels.channel1.transactionCapacity=1000  

# describe the sink: the custom MysqlSink packaged as a jar
agent1.sinks.mysqlSink.type=yichao.mym.base.bean.MysqlSink  
agent1.sinks.mysqlSink.hostname=localhost  
agent1.sinks.mysqlSink.port=3306  
agent1.sinks.mysqlSink.databaseName=firstflume  
agent1.sinks.mysqlSink.tableName=flume_test  
agent1.sinks.mysqlSink.user=root  
agent1.sinks.mysqlSink.password=123456  
agent1.sinks.mysqlSink.channel=channel1  
agent1.sinks.mysqlSink.batchSize=5  

Configure each machine and start the agents. (Start machine 201 first, since machine 202 needs to connect to it.)

3. Start and test

Flume start command on machine 201, run from the bin directory under the Flume directory:

./flume-ng agent -c ../conf -f ../conf/avro-mysql.conf -n agent1 -Dflume.root.logger=INFO,console

Flume start command on machine 202, run from the bin directory under the Flume directory:

./flume-ng agent -c ../conf -f ../conf/tail-avro.conf -n agent1 -Dflume.root.logger=INFO,console

After both agents are up, simulate dynamic log data generation on machine 202:

for i in {1..150};do echo "exec tail-name-$i,$i" >> /usr/local/tomcat/logs/ac.log;sleep 1;done;

Now check whether rows are being added dynamically to the database table on machine 201 (for example with the same row-count query shown in section 5).

This completes the multi-node, multi-agent test!