flume之自定義sink元件
阿新 • • 發佈:2019-01-09
flume內部提供了很多種sink,如:logger、file_roll、avro、hdfs、kafka、es等,方便直接將event資料對接到本地磁碟、或者其他第三方儲存中。有的時候,我們需要自定義sink,來完成特殊需求。本文介紹如何開發自定義sink,來實現將event資料儲存到Mysql。
1、pom.xml
<?xml version="1.0"?>
<!-- Maven build for the custom Flume MysqlSink. Produces a single
     jar-with-dependencies that is dropped into $FLUME_HOME/lib. -->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.abc</groupId>
    <artifactId>ttbrain-log</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <groupId>com.abc</groupId>
  <artifactId>ttbrain-log-flume</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>ttbrain-log-flume</name>
  <properties>
    <!-- Must match the Flume runtime the jar is deployed into (1.7.0 here). -->
    <version.flume>1.7.0</version.flume>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <!-- flume -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${version.flume}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>${version.flume}</version>
    </dependency>
    <!-- mysql -->
    <dependency>
      <groupId>c3p0</groupId>
      <artifactId>c3p0</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
  </dependencies>
  <profiles>
    <profile>
      <id>dev</id>
      <properties>
        <profile.env.name>dev</profile.env.name>
      </properties>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
    </profile>
    <profile>
      <id>test</id>
      <properties>
        <profile.env.name>test</profile.env.name>
      </properties>
    </profile>
    <profile>
      <id>product</id>
      <properties>
        <profile.env.name>product</profile.env.name>
      </properties>
    </profile>
  </profiles>
  <build>
    <finalName>ttbrain-log-flume-MysqlSink</finalName>
    <filters>
      <filter>${basedir}/filters/filter-${profile.env.name}.properties</filter><!-- location of the per-environment filter properties file -->
    </filters>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering><!-- enable variable substitution in resources -->
        <includes>
          <include>**/*.xml</include>
          <include>conf/*.properties</include>
          <include>**/*.properties</include>
          <include>**/*.json</include>
        </includes>
      </resource>
    </resources>
    <plugins>
      <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.abc.ttbrain.log.flume.interceptor.MyInterceptor</mainClass> </manifest> <manifestEntries> <Class-Path>conf/</Class-Path> </manifestEntries> </archive> <includes> <include>**/*.class</include> </includes> </configuration> </plugin> -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.abc.ttbrain.log.flume.sink.MysqlSink</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
2、開發自定義sink,繼承AbstractSink
package com.abc.ttbrain.log.flume.sink; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.abc.ttbrain.log.flume.sink.db.DataSourceUtils; /** * MysqlSink * @author kevinliu * */ public class MysqlSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class); private String tableName = "ttengine_history"; private int batchSize = 100; private Set<String> deviceKeySet = new HashSet<>(); private String sql = ""; public MysqlSink() { logger.info("MysqlSink start..."); } @Override public void start() { super.start(); deviceKeySet.add("0B466578-F542-4928-A66B-B12FCFA6DA23"); deviceKeySet.add("CB13721AE4668DEA151F3F72877A2256"); deviceKeySet.add("033C179D-8987-4D08-AF4F-94070F8632E0"); deviceKeySet.add("0CD570CE-4B62-45AA-B86A-7828CFE4F195"); deviceKeySet.add("D9157D45-C17E-4CF9-9C28-EE573F8CAD19"); deviceKeySet.add("3ACE3422774FEE190304604277E389F9"); deviceKeySet.add("2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35"); deviceKeySet.add("A1AD2FAF-86DD-45DE-B85D-7FA23CCDE4C4"); logger.info("deviceKey:"+deviceKeySet.toString()); StringBuilder sb = new StringBuilder(); sb.append("insert into ").append(tableName) .append(" (uid,ppuid,ch_id,f_num,cost,usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host," + "rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15" + ",p_16,p_17,p_18,p_19,p_20,p_21,p_22,p_23,p_24,p_25,p_26,p_27,p_28,p_29,p_30,p_31,p_32" + 
",p_33,p_34,p_35,p_36,p_37,p_38,p_39,p_40,p_41,p_42,p_43,p_44,p_45,p_46,p_47,p_48,p_49,p_50" + ") values") .append(" (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?," + "?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?" + ",?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); sql = sb.toString(); } @Override public void stop() { super.stop(); //DataSourceUtils.closeDs(); } @Override public Status process() throws EventDeliveryException { Status result = Status.READY; Transaction transaction = null; Event event = null; String content = ""; List<String> actions = Lists.newArrayList(); PreparedStatement preparedStatement = null; Connection conn = null; try { //db conn = DataSourceUtils.getConnection(); preparedStatement = conn.prepareStatement(sql); //flume Channel channel = getChannel(); transaction = channel.getTransaction(); transaction.begin(); for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event == null) { result = Status.BACKOFF; break; } else { content = new String(event.getBody(),"UTF-8"); String[] split = content.split("\t"); if (split.length < 37) { continue; } String uid = split[0]; boolean contains = deviceKeySet.contains(uid); if (!contains) { continue; } //logger.info(content); actions.add(content); } } if (actions.size() > 0) { conn.setAutoCommit(false); preparedStatement.clearBatch(); //logger.info(actions.get(0)); for (String line : actions) { String[] split = line.split("\t"); //uid,ppuid,ch_id,f_num,cost, preparedStatement.setString(1, split[0]);//uid preparedStatement.setString(2, split[1]);//ppuid preparedStatement.setString(3, split[2]);//ch_id preparedStatement.setInt(4, Integer.parseInt(split[3]));//f_num preparedStatement.setInt(5, Integer.parseInt(split[4]));//cost //usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host, preparedStatement.setInt(6, Integer.parseInt(split[5]));//usg preparedStatement.setString(7, split[6]);//prior preparedStatement.setString(8, split[7]);//req_id preparedStatement.setString(9, 
split[8]);//vers preparedStatement.setString(10, split[9]);//rg preparedStatement.setInt(11, Integer.parseInt(split[10]));//rh preparedStatement.setString(12, split[11]);//pg preparedStatement.setInt(13, Integer.parseInt(split[12]));//ph preparedStatement.setString(14, split[13]);//sg preparedStatement.setInt(15, Integer.parseInt(split[14]));//sh String time_stamp = split[15]; if ("null".equals(time_stamp) || StringUtils.isBlank(time_stamp)) { time_stamp = "0"; } preparedStatement.setLong(16, Long.parseLong(time_stamp));//time_stamp preparedStatement.setString(17, split[16]);//host //rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15 preparedStatement.setString(18, split[17]);//rec_feed_ids preparedStatement.setString(19, split[18]);//txt preparedStatement.setString(20, split[19]);//vedio preparedStatement.setString(21, split[20]);//gallery preparedStatement.setString(22, split[21]);//p_1 preparedStatement.setString(23, split[22]);//p_1 preparedStatement.setString(24, split[23]);//p_1 preparedStatement.setString(25, split[24]);//p_1 preparedStatement.setString(26, split[25]);//p_1 preparedStatement.setString(27, split[26]);//p_1 preparedStatement.setString(28, split[27]);//p_1 preparedStatement.setString(29, split[28]);//p_1 preparedStatement.setString(30, split[29]);//p_1 preparedStatement.setString(31, split[30]);//p_1 preparedStatement.setString(32, split[31]);//p_1 preparedStatement.setString(33, split[32]);//p_1 preparedStatement.setString(34, split[33]);//p_1 preparedStatement.setString(35, split[34]);//p_1 preparedStatement.setString(36, split[35]);//p_1 preparedStatement.setString(37, split[36]); preparedStatement.setString(38, split[37]); preparedStatement.setString(39, split[38]); preparedStatement.setString(40, split[39]); preparedStatement.setString(41, split[40]); preparedStatement.setString(42, split[41]); preparedStatement.setString(43, split[42]); preparedStatement.setString(44, split[43]); 
preparedStatement.setString(45, split[44]); preparedStatement.setString(46, split[45]); preparedStatement.setString(47, split[46]); preparedStatement.setString(48, split[47]); preparedStatement.setString(49, split[48]); preparedStatement.setString(50, split[49]); preparedStatement.setString(51, split[50]); preparedStatement.setString(52, split[51]); preparedStatement.setString(53, split[52]); preparedStatement.setString(54, split[53]); preparedStatement.setString(55, split[54]); preparedStatement.setString(56, split[55]); preparedStatement.setString(57, split[56]); preparedStatement.setString(58, split[57]); preparedStatement.setString(59, split[58]); preparedStatement.setString(60, split[59]); preparedStatement.setString(61, split[60]); preparedStatement.setString(62, split[61]); preparedStatement.setString(63, split[62]); preparedStatement.setString(64, split[63]); preparedStatement.setString(65, split[64]); preparedStatement.setString(66, split[65]); preparedStatement.setString(67, split[66]); preparedStatement.setString(68, split[67]); preparedStatement.setString(69, split[68]); preparedStatement.setString(70, split[69]); preparedStatement.setString(71, split[70]); preparedStatement.addBatch(); logger.info("device:{}",split[0]); } preparedStatement.executeBatch(); conn.commit(); } transaction.commit(); } catch (Throwable e) { try { if (transaction != null) { transaction.rollback(); } } catch (Exception e2) { logger.error("flume transaction rollback error.", e2); } logger.error("Failed to commit flume transaction," +"Transaction rolled back.", e); //Throwables.propagate(e); } finally { if (transaction != null) { transaction.close(); } if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { logger.error("statement close error.", e); } } if (conn != null) { try { conn.close(); } catch (SQLException e) { logger.error("connection close error.", e); } } } return result; } @Override public void configure(Context context) { String 
deviceKeys = context.getString("deviceKeys"); if (StringUtils.isNotBlank(deviceKeys)) { String[] split = deviceKeys.split(","); for (String deviceKey : split) { deviceKeySet.add(deviceKey); } logger.info("sink configure deivceKeys:"+deviceKeys); } else { logger.info("sink configure deivceKeys is empty..."); } } }
說明:
1)configure方法中,可以從flume的配置檔案中讀取對應的配置資訊。當配置檔案修改後,flume框架會自動重新載入,這時就會呼叫configure方法
2)start和stop方法當flume啟動和關閉時進行執行;
3、打包:
使用 maven package 將程式碼打包成ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar
4、部署:
1)配置flume配置檔案:
# Flume agent topology: one exec source -> file channel -> custom MysqlSink.
agent1.sources = ngrinder
agent1.channels = mc2
agent1.sinks = mysql
# source: tail the recommend-api log
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
agent1.sources.ngrinder.channels = mc2
# interceptors: keep only lines containing "recId", stamp host + timestamp,
# then apply the custom MyInterceptor
agent1.sources.ngrinder.interceptors=filt1 filt2 filt3 filt4
agent1.sources.ngrinder.interceptors.filt1.type=regex_filter
agent1.sources.ngrinder.interceptors.filt1.regex=.*recId.*
agent1.sources.ngrinder.interceptors.filt2.type=host
agent1.sources.ngrinder.interceptors.filt2.hostHeader=hostname
agent1.sources.ngrinder.interceptors.filt2.useIP=true
agent1.sources.ngrinder.interceptors.filt3.type=timestamp
agent1.sources.ngrinder.interceptors.filt4.type=com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder
# channel: durable file channel
agent1.channels.mc2.type = file
agent1.channels.mc2.checkpointDir = /data/flume/ckdir/mc2_ck
agent1.channels.mc2.dataDirs = /data/flume/datadir/mc2_data
# sink: the custom MySQL sink; deviceKeys extends its uid whitelist
agent1.sinks.mysql.type = com.abc.ttbrain.log.flume.sink.MysqlSink
agent1.sinks.mysql.deviceKeys = 2DCFE0C8-2DD6-4FB7-A2E6-1A210F7C7C07,3F4DA241-B827-4FF8-BB3F-624CFDEDA58D,89C574A2-1E44-468F-9AB4-96737D2FF7F2,f614ca8a42bd121a8bb971d89a078a08267b4df2,A78DB69352E74B744EDD15DE2B91BE40,2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35,B9D0A82EE8A4BB9B20CEEBB0977A9CFC,3BD40EB0-8171-4481-8B6E-002DB8C6924D,81F37B3C-5E29-40BC-A030-424E137268C2,E59909BC-D62E-4393-9D06-41BF03F81DA9
agent1.sinks.mysql.channel = mc2
說明:
A、agent1.sinks.mysql.type 指定自定義sink類全路徑;
B、agent1.sinks.mysql.deviceKeys 是自定義配置資訊,可以在自定義sink中的configure方法中獲取該資訊。
2)將ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar放到flume_home 的lib目錄下;
3)啟動flume:
# Start agent1 in the background, detached from the terminal; all output is discarded.
nohup flume-ng agent -c /usr/local/apache-flume-1.7.0-bin/conf -f /usr/local/apache-flume-1.7.0-bin/conf/engine-api-log.conf -n agent1 >/dev/null 2>&1 &