1. 程式人生 > >Trident中使用HBase進行狀態管理

Trident中使用HBase進行狀態管理

ans args rgs idt 訂單 bat clu test pac

1.使用的類

  技術分享圖片

2.使用HBaseMapState

  技術分享圖片

3.使用狀態管理

  要使用哪一種狀態管理(opaque、transactional 或 non-transactional)取決於 Spout 的類型;這裡使用 opaque:

  StateFactory factory1 = HBaseMapState.opaque(opts1);

4.服務

  需要 Kafka、ZooKeeper

  接著是 HBase,其前提是需要 Hadoop

  技術分享圖片

5.主驅動類

 1 package com.jun.tridentWithHbase;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import
backtype.storm.StormSubmitter; 6 import backtype.storm.generated.AlreadyAliveException; 7 import backtype.storm.generated.InvalidTopologyException; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Values; 10 import org.apache.storm.hbase.trident.state.HBaseMapState; 11 import storm.trident.Stream;
12 import storm.trident.TridentState; 13 import storm.trident.TridentTopology; 14 import storm.trident.operation.builtin.Count; 15 import storm.trident.operation.builtin.Sum; 16 import storm.trident.state.OpaqueValue; 17 import storm.trident.state.StateFactory; 18 import storm.trident.testing.FixedBatchSpout;
19 import storm.trident.testing.MemoryMapState; 20 21 public class TridentDemo { 22 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 23 TridentTopology tridentTopology=new TridentTopology(); 24 //模擬數據 25 Fields field=new Fields("log","flag"); 26 FixedBatchSpout spout=new FixedBatchSpout(field,5, 27 new Values("168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0 \"http://cn.bing.com/search?q=spark mllib\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","A"), 28 new Values("168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)\" \"-\"","A"), 29 new Values("61.30.167.187 - - [1481953539039] \"GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","A"), 30 new Values("30.29.132.190 - - [1481953544042] \"GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53\" \"-\"","B"), 31 new Values("222.190.187.201 - - [1481953578068] \"GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"","B"), 32 
new Values("72.202.43.53 - - [1481953579069] \"GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"","B") 33 ); 34 //多次循環 35 spout.setCycle(true); 36 //流處理 37 Stream stream=tridentTopology.newStream("orderAnalyse",spout) 38 //過濾 39 .each(new Fields("log"),new ValidLogFilter()) 40 //解析 41 .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId")) 42 //投影 43 .project(new Fields("orderId","orderTime","orderAmtStr","memberId")) 44 //時間解析 45 .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter")) 46 ; 47 //分流 48 //1.基於minter統計訂單數量,分組統計 49 TridentState state=stream.groupBy(new Fields("minter")) 50 //全局聚合,使用內存存儲狀態信息 51 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("orderNumByMinter")); 52 // state.newValuesStream().each(new Fields("minter","orderNumByMinter"),new PrintFilter()); 53 54 //2.另一個流,基於分鐘的訂單金額,局部聚合 55 Stream partitionStream=stream.each(new Fields("orderAmtStr"),new TransforAmtToDoubleFunction(),new Fields("orderAmt")) 56 .groupBy(new Fields("minter")) 57 //局部聚合 58 .chainedAgg() //聚合鏈 59 .partitionAggregate(new Fields("orderAmt"),new LocalSum(),new Fields("orderAmtSumOfLocal")) 60 .chainEnd(); //聚合鏈 61 62 //*******************************使用Hbase存儲*********************** 63 //產生Factory,而不是內存MemoryMapState.Factory() 64 HBaseMapState.Options<OpaqueValue> opts1 = new HBaseMapState.Options<>(); 65 opts1.tableName ="orderAnalyse"; //表名稱 66 opts1.columnFamily = "cf"; // 列簇名稱 67 opts1.qualifier = "totalOrderAmt"; 68 StateFactory factory1 = HBaseMapState.opaque(opts1); 69 70 //******************************************************************* 71 72 //做一次全局聚合 73 TridentState 
partitionState=partitionStream.groupBy(new Fields("minter")) 74 //全局聚合 75 .persistentAggregate(factory1,new Fields("orderAmtSumOfLocal"),new Sum(),new Fields("totalOrderAmt")); 76 partitionState.newValuesStream().each(new Fields("minter","totalOrderAmt"),new PrintFilter()); 77 78 //提交 79 Config config=new Config(); 80 if(args==null || args.length<=0){ 81 LocalCluster localCluster=new LocalCluster(); 82 localCluster.submitTopology("tridentDemo",config,tridentTopology.build()); 83 }else { 84 config.setNumWorkers(2); 85 StormSubmitter.submitTopology(args[0],config,tridentTopology.build()); 86 } 87 } 88 }

6.HBase中效果

  技術分享圖片

Trident中使用HBase進行狀態管理