1. 程式人生 > Flume自定義Sink到MySQL資料庫

Flume自定義Sink到MySQL資料庫

package com.yimen.data.flume.sink;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.List;
import java.util.Map;
import org.apache.flume.*;
import
org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MysqlSink extends AbstractSink implements Configurable { private Logger LOG = LoggerFactory.getLogger(MysqlSink.class); private String hostname;
private String port; private String databaseName; private String tableName; private String username; private String password; private String testtype; private PreparedStatement preparedStatement; private Connection conn; private int batchSize; //每次提交的批次大小 public
static final String TEST_TYPE = "testType"; public MysqlSink() { LOG.info("MysqlSink start..."); } /**實現Configurable介面中的方法:可獲取配置檔案中的屬性*/ public void configure(Context context) { hostname = context.getString("hostname"); Preconditions.checkNotNull(hostname, "hostname must be set!!"); port = context.getString("port"); Preconditions.checkNotNull(port, "port must be set!!"); databaseName = context.getString("databaseName"); Preconditions.checkNotNull(databaseName, "databaseName must be set!!"); tableName = context.getString("tableName"); Preconditions.checkNotNull(tableName, "tableName must be set!!"); username = context.getString("username"); Preconditions.checkNotNull(username, "user must be set!!"); password = context.getString("password"); Preconditions.checkNotNull(password, "password must be set!!"); testtype = context.getString("testtype");//獲取指定型別過濾 Preconditions.checkNotNull(password, "password must be set!!"); batchSize = context.getInteger("batchSize", 100); Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!"); } @Override public void start() { super.start(); try { //呼叫Class.forName()方法載入驅動程式 Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName; //呼叫DriverManager物件的getConnection()方法,獲得一個Connection物件 try { conn = DriverManager.getConnection(url, username, password); conn.setAutoCommit(false); //建立一個Statement物件 preparedStatement = conn.prepareStatement("insert into " + tableName + " (name,age,time) values (?,?,?)"); } catch (SQLException e) { e.printStackTrace(); System.exit(1); } } @Override public void stop() { super.stop(); if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } public Status process() throws EventDeliveryException { Status result = 
Status.READY; Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event = null; String content = ""; String appType = ""; transaction.begin(); List<PerBean> lists = Lists.newArrayList(); try { for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) { content = new String(event.getBody()); testType = event.getHeaders().get(TEST_TYPE); if(testtype .equals(testType )){ if(!"".equals(content )){ PerBeanbean = JSON.parseObject(content, PerBean.class); lists.add(bean); } } } else { result = Status.BACKOFF; break; } } if (lists != null && lists.size() > 0) { preparedStatement.clearBatch(); for (PerBean bean : lists) { preparedStatement.setObject(1, bean.getName()); //用setObject避免NullPointException preparedStatement.setObject(2, bean.getAge()); preparedStatement.setTimestamp(4, new Timestamp(bean.getRp_date().getTime()));//java.util.date 轉 java.sql.date preparedStatement.addBatch(); } preparedStatement.executeBatch(); conn.commit(); } transaction.commit(); } catch (Throwable e) { try { transaction.rollback(); } catch (Exception e2) { LOG.error("Exception in rollback. Rollback might not have been" + "successful.", e2); } LOG.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); } finally { transaction.close(); } return result; } }