1. 程式人生 > >flume之自定義sink元件

flume之自定義sink元件

flume內部提供了很多種sink,如:logger、file_roll、avro、hdfs、kafak、es等,方便直接將event資料對接到本地磁碟、或者其他第三方儲存中。有的時候,我們需要自定義source,來完成特殊需求。本文介紹如何開發自定義sink,來實現將event資料儲存到Mysql。

1、pom.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.abc</groupId>
    <artifactId>ttbrain-log</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  
  <groupId>com.abc</groupId>
  <artifactId>ttbrain-log-flume</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>ttbrain-log-flume</name>
  
  <properties>
    <version.flume>1.7.0</version.flume>
  </properties>
  
  
  <dependencies>
	<dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
	</dependency>
    
    <!-- flume -->
    <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
       <version>${version.flume}</version>
    </dependency>
    <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-configuration</artifactId>
       <version>${version.flume}</version>
    </dependency>
    
	<!-- mysql -->
	<dependency>
		<groupId>c3p0</groupId>
		<artifactId>c3p0</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>druid</artifactId>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
	</dependency>
  </dependencies>
  
  <profiles>
		<profile>
			<id>dev</id>
			<properties>
				<profile.env.name>dev</profile.env.name>
			</properties>
			<activation>
				<activeByDefault>true</activeByDefault>
			</activation>
		</profile>
		<profile>
			<id>test</id>
			<properties>
				<profile.env.name>test</profile.env.name>
			</properties>
		</profile>
		<profile>
			<id>product</id>
			<properties>
				<profile.env.name>product</profile.env.name>
			</properties>
		</profile>
	</profiles>
	
	<build>
    <finalName>ttbrain-log-flume-MysqlSink</finalName>
    <filters>
			<filter>${basedir}/filters/filter-${profile.env.name}.properties</filter><!--這裡指定filter屬性檔案的位置-->
	</filters>
	<resources>
		<resource>
			<directory>src/main/resources</directory>
			<filtering>true</filtering><!--這裡開啟變數替換-->
			<includes>
				<include>**/*.xml</include>
				<include>conf/*.properties</include>
				<include>**/*.properties</include>
				<include>**/*.json</include>
			</includes>
		</resource>
	</resources>
	<plugins>
			<!-- <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.abc.ttbrain.log.flume.interceptor.MyInterceptor</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>conf/</Class-Path>
                        </manifestEntries>
                    </archive>
                    <includes>
                        <include>**/*.class</include>
                    </includes>
                </configuration>
            </plugin> -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<descriptorRefs>  
						<descriptorRef>jar-with-dependencies</descriptorRef>  
					</descriptorRefs>
					<archive>
                        <manifest>
                            <mainClass>com.abc.ttbrain.log.flume.sink.MysqlSink</mainClass>
                        </manifest>
                    </archive> 
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
  </build>
</project>


2、開發自定義sink,繼承AbstractSink

package com.abc.ttbrain.log.flume.sink;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.abc.ttbrain.log.flume.sink.db.DataSourceUtils;

/**
 * MysqlSink 
 * @author kevinliu
 *
 */
public class MysqlSink extends AbstractSink implements Configurable {
	
	private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class);
	
	private String tableName = "ttengine_history";
	private int batchSize = 100;
	
	private Set<String> deviceKeySet = new HashSet<>();
	private  String sql = "";
	
	public MysqlSink() {
		logger.info("MysqlSink start...");
    }
	
	@Override
    public void start() {
        super.start();
        deviceKeySet.add("0B466578-F542-4928-A66B-B12FCFA6DA23");
        deviceKeySet.add("CB13721AE4668DEA151F3F72877A2256");
        deviceKeySet.add("033C179D-8987-4D08-AF4F-94070F8632E0");
        deviceKeySet.add("0CD570CE-4B62-45AA-B86A-7828CFE4F195");
        deviceKeySet.add("D9157D45-C17E-4CF9-9C28-EE573F8CAD19");
        deviceKeySet.add("3ACE3422774FEE190304604277E389F9");
        deviceKeySet.add("2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35");
        deviceKeySet.add("A1AD2FAF-86DD-45DE-B85D-7FA23CCDE4C4");
        
        logger.info("deviceKey:"+deviceKeySet.toString());
        
        StringBuilder sb = new StringBuilder();
        sb.append("insert into ").append(tableName)
        .append(" (uid,ppuid,ch_id,f_num,cost,usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host,"
        		+ "rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15"
        		+ ",p_16,p_17,p_18,p_19,p_20,p_21,p_22,p_23,p_24,p_25,p_26,p_27,p_28,p_29,p_30,p_31,p_32"
        		+ ",p_33,p_34,p_35,p_36,p_37,p_38,p_39,p_40,p_41,p_42,p_43,p_44,p_45,p_46,p_47,p_48,p_49,p_50"
        		+ ") values")
        .append(" (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,"
        		+ "?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?"
        		+ ",?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");
        
        sql = sb.toString();
    }
	
	@Override
    public void stop() {
        super.stop();
        
        //DataSourceUtils.closeDs();
    }
	
	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
        Transaction transaction = null;
        Event event = null;
        String content = "";
        List<String> actions = Lists.newArrayList();
        
        PreparedStatement preparedStatement = null;
        Connection conn = null;
        try {
        	//db
        	conn = DataSourceUtils.getConnection();
        	preparedStatement = conn.prepareStatement(sql);
        	
        	//flume
        	Channel channel = getChannel();
        	transaction = channel.getTransaction();
        	transaction.begin();
        	
        	for (int i = 0; i < batchSize; i++) {
        		event = channel.take();
            	if (event == null) {
            		result = Status.BACKOFF;
            		break;
            	} else {
            		content = new String(event.getBody(),"UTF-8");
            		String[] split = content.split("\t");
            		if (split.length < 37) {
            			continue;
            		}
            		String uid = split[0];
            		boolean contains = deviceKeySet.contains(uid);
            		if (!contains) {
            			continue;
            		}
            		//logger.info(content);
            		actions.add(content);
            	}
        	}
        	
        	if (actions.size() > 0) {
        		conn.setAutoCommit(false);
                preparedStatement.clearBatch();
                //logger.info(actions.get(0));
                for (String line : actions) {
                	String[] split = line.split("\t");
                	//uid,ppuid,ch_id,f_num,cost,
                	preparedStatement.setString(1, split[0]);//uid
                    preparedStatement.setString(2, split[1]);//ppuid
                    preparedStatement.setString(3, split[2]);//ch_id
                    preparedStatement.setInt(4, Integer.parseInt(split[3]));//f_num
                    preparedStatement.setInt(5, Integer.parseInt(split[4]));//cost
                    
                    //usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host,
                    preparedStatement.setInt(6, Integer.parseInt(split[5]));//usg
                    preparedStatement.setString(7, split[6]);//prior
                    preparedStatement.setString(8, split[7]);//req_id
                    preparedStatement.setString(9, split[8]);//vers
                    preparedStatement.setString(10, split[9]);//rg
                    preparedStatement.setInt(11, Integer.parseInt(split[10]));//rh
                    preparedStatement.setString(12, split[11]);//pg
                    preparedStatement.setInt(13, Integer.parseInt(split[12]));//ph
                    preparedStatement.setString(14, split[13]);//sg
                    preparedStatement.setInt(15, Integer.parseInt(split[14]));//sh
                    String time_stamp = split[15];
                    if ("null".equals(time_stamp) || StringUtils.isBlank(time_stamp)) {
                    	time_stamp = "0";
                    }
                    preparedStatement.setLong(16, Long.parseLong(time_stamp));//time_stamp
                    preparedStatement.setString(17, split[16]);//host
                    
                    
                    //rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15
                    preparedStatement.setString(18, split[17]);//rec_feed_ids
                    preparedStatement.setString(19, split[18]);//txt
                    preparedStatement.setString(20, split[19]);//vedio
                    preparedStatement.setString(21, split[20]);//gallery
                    preparedStatement.setString(22, split[21]);//p_1
                    preparedStatement.setString(23, split[22]);//p_1
                    preparedStatement.setString(24, split[23]);//p_1
                    preparedStatement.setString(25, split[24]);//p_1
                    preparedStatement.setString(26, split[25]);//p_1
                    preparedStatement.setString(27, split[26]);//p_1
                    preparedStatement.setString(28, split[27]);//p_1
                    preparedStatement.setString(29, split[28]);//p_1
                    preparedStatement.setString(30, split[29]);//p_1
                    preparedStatement.setString(31, split[30]);//p_1
                    preparedStatement.setString(32, split[31]);//p_1
                    preparedStatement.setString(33, split[32]);//p_1
                    preparedStatement.setString(34, split[33]);//p_1
                    preparedStatement.setString(35, split[34]);//p_1
                    preparedStatement.setString(36, split[35]);//p_1
                    preparedStatement.setString(37, split[36]);
                    preparedStatement.setString(38, split[37]);
                    preparedStatement.setString(39, split[38]);
                    preparedStatement.setString(40, split[39]);
                    preparedStatement.setString(41, split[40]);
                    preparedStatement.setString(42, split[41]);
                    preparedStatement.setString(43, split[42]);
                    preparedStatement.setString(44, split[43]);
                    preparedStatement.setString(45, split[44]);
                    preparedStatement.setString(46, split[45]);
                    preparedStatement.setString(47, split[46]);
                    preparedStatement.setString(48, split[47]);
                    preparedStatement.setString(49, split[48]);
                    preparedStatement.setString(50, split[49]);
                    preparedStatement.setString(51, split[50]);
                    preparedStatement.setString(52, split[51]);
                    preparedStatement.setString(53, split[52]);
                    preparedStatement.setString(54, split[53]);
                    preparedStatement.setString(55, split[54]);
                    preparedStatement.setString(56, split[55]);
                    preparedStatement.setString(57, split[56]);
                    preparedStatement.setString(58, split[57]);
                    preparedStatement.setString(59, split[58]);
                    preparedStatement.setString(60, split[59]);
                    preparedStatement.setString(61, split[60]);
                    preparedStatement.setString(62, split[61]);
                    preparedStatement.setString(63, split[62]);
                    preparedStatement.setString(64, split[63]);
                    preparedStatement.setString(65, split[64]);
                    preparedStatement.setString(66, split[65]);
                    preparedStatement.setString(67, split[66]);
                    preparedStatement.setString(68, split[67]);
                    preparedStatement.setString(69, split[68]);
                    preparedStatement.setString(70, split[69]);
                    preparedStatement.setString(71, split[70]);
                    
                    
                    preparedStatement.addBatch();
                    
                    logger.info("device:{}",split[0]);
                }
                
                preparedStatement.executeBatch();
                conn.commit();
            }
        	
            transaction.commit();
        } catch (Throwable e) {
            try {
            	if (transaction != null) {
            		transaction.rollback();
            	}
            } catch (Exception e2) {
            	logger.error("flume transaction rollback error.", e2);
            }
            logger.error("Failed to commit flume transaction," +"Transaction rolled back.", e);
            //Throwables.propagate(e);
        } finally {
        	if (transaction != null) {
        		transaction.close();
        	}
        	
        	if (preparedStatement != null) {
        		try {
					preparedStatement.close();
				} catch (SQLException e) {
					logger.error("statement close error.", e);
				}
        	}
        	
        	if (conn != null) {
        		try {
        			conn.close();
				} catch (SQLException e) {
					logger.error("connection close error.", e);
				}
        	}
        }
        return result;
	}

	@Override
	public void configure(Context context) {
		String deviceKeys = context.getString("deviceKeys");
		if (StringUtils.isNotBlank(deviceKeys)) {
			String[] split = deviceKeys.split(",");
			for (String deviceKey : split) {
				deviceKeySet.add(deviceKey);
			}
			logger.info("sink configure deivceKeys:"+deviceKeys);
		} else {
			logger.info("sink configure deivceKeys is empty...");
		}
	}
}


說明:

1)configure方法中,可以從flume的配置檔案中讀取對應的配置資訊。當配置檔案修改後,flume框架會自動重新載入,這是就會呼叫configure方法

2)start和stop方法當flume啟動和關閉時進行執行;

3、打包:

使用 maven package 將帶把打包成ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar

4、部署:

1)配置flume配置檔案:

agent1.sources = ngrinder
agent1.channels = mc2
agent1.sinks = mysql


#source
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
agent1.sources.ngrinder.channels = mc2

#filter
agent1.sources.ngrinder.interceptors=filt1 filt2 filt3 filt4
agent1.sources.ngrinder.interceptors.filt1.type=regex_filter
agent1.sources.ngrinder.interceptors.filt1.regex=.*recId.*
agent1.sources.ngrinder.interceptors.filt2.type=host
agent1.sources.ngrinder.interceptors.filt2.hostHeader=hostname
agent1.sources.ngrinder.interceptors.filt2.useIP=true
agent1.sources.ngrinder.interceptors.filt3.type=timestamp
agent1.sources.ngrinder.interceptors.filt4.type=com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder


#channel2
agent1.channels.mc2.type = file
agent1.channels.mc2.checkpointDir = /data/flume/ckdir/mc2_ck
agent1.channels.mc2.dataDirs = /data/flume/datadir/mc2_data

#sink2
agent1.sinks.mysql.type = com.abc.ttbrain.log.flume.sink.MysqlSink
agent1.sinks.mysql.deviceKeys = 2DCFE0C8-2DD6-4FB7-A2E6-1A210F7C7C07,3F4DA241-B827-4FF8-BB3F-624CFDEDA58D,89C574A2-1E44-468F-9AB4-96737D2FF7F2,f614ca8a42bd121a8bb971d89a078a08267b4df2,A78DB69352E74B744EDD15DE2B91BE40,2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35,B9D0A82EE8A4BB9B20CEEBB0977A9CFC,3BD40EB0-8171-4481-8B6E-002DB8C6924D,81F37B3C-5E29-40BC-A030-424E137268C2,E59909BC-D62E-4393-9D06-41BF03F81DA9
agent1.sinks.mysql.channel = mc2

說明:

A、agent1.sinks.mysql.type 指定自定義sink類全路徑;

B、agent1.sinks.mysql.deviceKeys 是自定義配置資訊,可以在自定義sink中的configure方法中獲取該資訊。

2)將ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar放到flume_home 的lib目錄下;

3)啟動flume:

nohup flume-ng agent -c /usr/local/apache-flume-1.7.0-bin/conf -f /usr/local/apache-flume-1.7.0-bin/conf/engine-api-log.conf  -n agent1 >/dev/null 2>&1 &