storm-jdbc 最完整的版本,可直接貼上使用。
開頭:我這裡是根據bolt與trident進行分類的,寫入和讀取的方法可能會在同一個類中,最後會展示一個測試的類來說明怎麼用。
JdbcSpout:這個類是我寫入資料和讀取資料的公用spout,細節註釋裡說的比較詳細。
import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.Random; /** * @author cwc * @date 2018年5月31日 * @description:儲存資料的spout,我的讀與寫共用的這一個spout,用於測試 * @version 1.0.0 */ public class JdbcSpout extends BaseRichSpout { public static Random random =new Random(); private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; //模擬資料 public static final List<Values> rows = Lists.newArrayList( new Values("peter",random.nextInt(80),1), new Values("bob",random.nextInt(60),2), new Values("alice",random.nextInt(100),2)); @Override public void nextTuple() { Random rand = new Random(); Values row = rows.get(rand.nextInt(rows.size() - 1)); //this.collector.emit(new Values("bob"));//用於佔位符查詢的欄位 this.collector.emit(row);//用於儲存寫入 System.out.println(row); Thread.yield(); } @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector =collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name","age","sex"));//用於儲存寫入 // declarer.declare(new Fields("name"));//用於佔位符查詢的欄位 } }
Jdbc_bolt類:注意看註釋
import java.util.List; import java.util.Objects; import org.apache.storm.jdbc.bolt.JdbcInsertBolt; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.apache.storm.tuple.Fields; import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils; import parquet.org.slf4j.Logger; import parquet.org.slf4j.LoggerFactory; /** * @author cwc * @date 2018年9月28日 * @version 1.0.0 * @description:jdbc對資料庫的操作 *向jdbc中寫入資料,分別由sql寫入和全表寫入兩種bolt方式 *jdbc通過欄位與sql語句佔位符的方式查詢資料 */ public class JdbcOperationBolt { private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider(); private static Logger logger = LoggerFactory.getLogger(JdbcOperationBolt.class); /** * jdbc 根據欄位向資料庫寫入資料 * 傳入兩個引數,根據佔位符sql插入資料 * @param columnSchema 列名 * @param sqlString sql * @return */ public static JdbcInsertBolt getInsertBolt(List<Column> columnSchema,String sqlString){ if((columnSchema!=null||columnSchema.size()>0) && (sqlString!=null||!Objects.equals(sqlString, ""))){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); JdbcInsertBolt PersistanceBolt = new JdbcInsertBolt(cp, simpleJdbcMapper) .withInsertQuery(sqlString) .withQueryTimeoutSecs(30); return PersistanceBolt; } logger.error("列名或sql語句不能為空!"); return null; } /** * jdbc 根據表名向資料庫寫入資料 * 傳一個表名引數,進入全表寫入 * 注意,storm中傳入的 * @param tableName 表名 * @return */ public static JdbcInsertBolt getInsertBolt(String tableName){ if(tableName!=null||!Objects.equals(tableName, "")){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider()); JdbcInsertBolt PersistanceBolt = new JdbcInsertBolt(cp, simpleJdbcMapper) .withTableName(tableName) .withQueryTimeoutSecs(30); return PersistanceBolt; } logger.error("表名不能為空!"); return 
null; } /** * jdbc 讀取資料 * 根據sql與列名讀取資料庫資料 * @param outputFields 宣告要輸出的欄位 * @param queryParamColumns 傳入佔位符的欄位 * @param sqlString 查詢sql * @return */ public static JdbcLookupBolt getJdbcLookupBolt(Fields outputFields,List<Column> queryParamColumns,String sqlString){ if(outputFields!=null&&queryParamColumns!=null&&sqlString!=null&&outputFields.size()>0&&queryParamColumns.size()>0&&Objects.equals(sqlString,"")){ SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); JdbcLookupBolt JdbcLookupBolt = new JdbcLookupBolt(cp, sqlString, lookupMapper) .withQueryTimeoutSecs(30); return JdbcLookupBolt; } logger.error("輸出欄位,輸入欄位集合,sql查詢語句都不能為空!"); return null; } }
我將上面獲取資料庫連線的程式碼單獨貼出來,因為封裝的比較深。
/**
 * Builds the HikariCP-backed {@link ConnectionProvider} that storm-jdbc
 * components require, from this class's JDBC connection settings.
 *
 * @return a ConnectionProvider wrapping a HikariCP configuration
 */
public static ConnectionProvider getConnectionProvider() {
    // Use a plain HashMap instead of double-brace initialization: the
    // anonymous-subclass idiom holds a hidden reference to the enclosing
    // class, which bloats and can break serialization — a real hazard in
    // Storm, where components are serialized to workers.
    Map<String, Object> hikariConfigMap = new HashMap<String, Object>();
    hikariConfigMap.put("dataSourceClassName", JdbcClassName);
    hikariConfigMap.put("dataSource.url", JdbcdbUrl);
    hikariConfigMap.put("dataSource.user", JdbcUserName);
    hikariConfigMap.put("dataSource.password", JdbcPassWord);
    return new HikariCPConnectionProvider(hikariConfigMap);
}
Jdbc_trident 類
import java.util.List; import java.util.Objects; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.apache.storm.jdbc.trident.state.JdbcState; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import org.apache.storm.tuple.Fields; import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils; import parquet.org.slf4j.Logger; import parquet.org.slf4j.LoggerFactory; /** * @author cwc * @date 2018年9月28日 * @version 1.0.0 * @description:jdbc Trident 類 */ public class JdbcTridentStates { private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider(); private static Logger logger = LoggerFactory.getLogger(JdbcTridentStates.class); /** * jdbc Trident 根據欄位向資料庫寫入資料 * 傳入兩個引數,根據佔位符sql插入資料 * @param columnSchema 列名 * @param sqlString sql * @return */ public static JdbcStateFactory getJdbcStateFactory (List<Column> columnSchema,String sqlString){ if((columnSchema!=null||columnSchema.size()>0) && (sqlString!=null||!Objects.equals(sqlString, ""))){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); JdbcState.Options options = new JdbcState.Options() .withConnectionProvider(cp) .withMapper(simpleJdbcMapper) .withInsertQuery(sqlString) .withQueryTimeoutSecs(200); JdbcStateFactory jdbcStateFactory =new JdbcStateFactory(options); return jdbcStateFactory; } logger.error("列名或sql為空!"); return null; } /** * jdbc Trident 根據表名向資料庫寫入資料 * 傳一個表名引數,進入全表寫入 * 注意,storm中傳入的 * @param tableName 表名 * @return */ public static JdbcStateFactory getJdbcStateFactory(String tableName){ if(tableName!=null||!Objects.equals(tableName, "")){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider()); JdbcState.Options options = new JdbcState.Options() .withConnectionProvider(cp) 
.withMapper(simpleJdbcMapper) .withTableName(tableName) .withQueryTimeoutSecs(200); JdbcStateFactory jdbcStateFactory =new JdbcStateFactory(options); return jdbcStateFactory; } logger.error("表名為空!"); return null; } /** * jdbc Trident 讀取資料 * @param outputFields 輸出列表 * @param queryParamColumns佔位符欄位 * @param sqlString 查詢語句 * @return */ public static JdbcStateFactory getJdbcSelectState(Fields outputFields,List<Column> queryParamColumns,String sqlString){ if(outputFields!=null&&queryParamColumns!=null&&sqlString!=null&&outputFields.size()>0&&queryParamColumns.size()>0&&Objects.equals(sqlString,"")){ SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); JdbcState.Options options = new JdbcState.Options() .withConnectionProvider(cp) .withJdbcLookupMapper(lookupMapper) .withSelectQuery(sqlString) .withQueryTimeoutSecs(30); JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); return jdbcStateFactory; } logger.error("輸出欄位,輸入欄位集合,sql查詢語句都不能為空!"); return null; } }
測試類:
import java.util.List; import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import org.apache.storm.jdbc.trident.state.JdbcUpdater; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.tuple.Fields; import java.sql.Types; import com.google.common.collect.Lists; import com.sunsheen.jfids.bigdata.storm.bolt.JdbcOperationBolt; import com.sunsheen.jfids.bigdata.storm.trident.JdbcTridentStates; /** * @author cwc * @date 2018年9月31日 * @description: storm整合Jdbc讀寫測試類 * @version 3.0.0 */ public class JdbcMain { private static String insterSql=" insert into jdbc_test(name,age,sex) values (?,?,?) "; private static String selectSql="select age,sex from jdbc_test where name = ?"; private static String tableName="jdbc_test"; private static Fields outputFields = new Fields("age", "sex");//就是查詢出的資料 private static List<Column> queryParamColumns = Lists.newArrayList(new Column("name", Types.VARCHAR));//佔位符的欄位 private static List<Column> columnSchema = Lists.newArrayList( new Column("name", java.sql.Types.VARCHAR), new Column("age", java.sql.Types.INTEGER), new Column("sex", java.sql.Types.INTEGER)); public static void main(String[] args){ JdbcWrite(columnSchema,insterSql,tableName); JdbcTrident(columnSchema,insterSql,tableName); JdbcRead(outputFields,queryParamColumns,selectSql); JdbcReadTrident(outputFields,queryParamColumns,selectSql); } /** * 通過jdbc的方式向資料庫寫資料 * @param connectionProvider 連線資料庫 * @param columnSchema 需要插入的列名 * @param sqlString 配合列名進行欄位插入 * @param tableName 通過表名整表插入 */ public static void JdbcWrite(List<Column> columnSchema,String sqlString,String tableName){ Config conf = new Config(); TopologyBuilder builder = new 
TopologyBuilder(); builder.setSpout("jdbc-save", new JdbcSpout(), 2); builder.setBolt("save", JdbcOperationBolt.getInsertBolt(tableName), 1).shuffleGrouping("jdbc-save");//getInsertBolt根據引數的不同,切換欄位或全表插入的模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } cluster.shutdown(); } /** * 通過jdbc Trident的方式向資料庫寫資料 * @param connectionProvider 連線資料庫 * @param columnSchema 需要插入的列名 * @param sqlString 配合列名進行欄位插入 * @param tableName 通過表名整表插入 */ public static voidJdbcTrident(List<Column> columnSchema,String sqlString,String tableName){ TridentTopology topology = new TridentTopology(); Config config = new Config(); //JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(columnSchema, insterSql);//欄位插入 JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(tableName); Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout()); TridentState state = topology.newStaticState(jdbcStateFactory); //將資料更新插入資料庫jdbcStateFactory 根據設定的表名更新到對應的資料庫 批處理 一批一批的插入 stream.partitionPersist(jdbcStateFactory, new Fields("name", "age","sex"), new JdbcUpdater(), new Fields()); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(UUID.randomUUID().toString(), config, topology.build()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }//這個時為了防止你忘記關閉程式,造成記憶體爆炸,但是不要設定時間太小,太小程式沒跑完就終止了,要報錯。 cluster.shutdown(); } /** * 讀資料 * @param connectionProvider */ public static void JdbcRead(Fields outputFields,List<Column> queryParamColumns,String selectSql){ JdbcLookupBolt JdbcLookupBolt = JdbcOperationBolt.getJdbcLookupBolt(outputFields, queryParamColumns, selectSql); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("jdbc-reader", new JdbcSpout(), 2); builder.setBolt("read",JdbcLookupBolt, 
1).shuffleGrouping("jdbc-reader"); builder.setBolt("JdbcOutBolt",new JdbcOutBolt(), 1).shuffleGrouping("read"); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(UUID.randomUUID().toString(), conf, builder.createTopology()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }//這個時為了防止你忘記關閉程式,造成記憶體爆炸,但是不要設定時間太小,太小程式沒跑完就終止了,要報錯。 cluster.shutdown(); } /** * jdbc Trident 查詢資料 * @param outputFields 要輸出傳遞的欄位,這裡的欄位是storm中隨便命名的不是資料庫欄位 * @param queryParamColumns 佔位符的欄位,也就是spout傳出過來的欄位,通過該欄位查詢資料 * @param selectSql 查詢語句,這裡sql已經把欄位名固定了,上面的欄位名都是形參用於傳輸 */ public static void JdbcReadTrident(Fields outputFields,List<Column> queryParamColumns,String selectSql){ TridentTopology topology = new TridentTopology(); JdbcStateFactory jdbcStateFactory = JdbcTridentStates.getJdbcSelectState(outputFields, queryParamColumns, selectSql); Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout()); TridentState state = topology.newStaticState(jdbcStateFactory); //stream.partitionPersist(jdbcStateFactory, outputFields, new JdbcUpdater(),outputFields);//這裡可以根據自己需要進行處理 Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(UUID.randomUUID().toString(), conf, topology.build()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }//這個時為了防止你忘記關閉程式,造成記憶體爆炸,但是不要設定時間太小,太小程式沒跑完就終止了,要報錯。 cluster.shutdown(); } }
主要內容大家看程式碼就清楚了,有問題大家可以在我部落格下留言。