大資料入門(22)storm的第一個例項
阿新 • • 發佈:2018-12-10
public class RandomWordSpout extends BaseRichSpout{ private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; //模擬一些資料 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; @Override public void nextTuple() { //words陣列中隨機挑選一個商品名傳送出去 Random r = new Random(); int i = r.nextInt(words.length); //通過隨機數拿到一個商品名 String value = words[i]; //將商品名封裝成tuple,傳送訊息給下一個元件 this.collector.emit(new Values(value)); //每傳送一個訊息,休眠500ms Utils.sleep(500); } //初始化方法,在spout元件例項化時呼叫一次 @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { // TODO Auto-generated method stub this.collector = collector; } //宣告本spout元件傳送出去的tuple中的資料的欄位名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("orignname")); } } ########################################################################## public class UpperBolt extends BaseBasicBolt{ private static final long serialVersionUID = 1L; @Override public void execute(Tuple tuple, BasicOutputCollector collector) { // TODO Auto-generated method stub String value = tuple.getString(0); String value_upper = value.toUpperCase(); collector.emit(new Values(value_upper)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("upperName")); } } ############################################################################## public class SuffixBolt extends BaseBasicBolt{ private static final long serialVersionUID = 1L; FileWriter fileWriter = null; //在bolt元件執行過程中只會被呼叫一次 @Override public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub try { fileWriter = new FileWriter("/home/admin/storm_output/"+UUID.randomUUID()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //該bolt元件的核心處理邏輯 //每收到一個tuple訊息,就會被呼叫一次 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { // TODO Auto-generated method stub //先拿到上一個元件傳送過來的商品名稱 String upper_name =tuple.getString(0); //為上一個元件傳送過來的商品名稱新增字尾 String suffix_name = upper_name + "2018-11-06"; try { fileWriter.write(suffix_name); fileWriter.write("\n"); fileWriter.flush(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //本bolt已經不需要傳送tuple訊息到下一個元件,所以不需要再宣告tuple的欄位 @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } } ######################################################################## public class TopologyMain { public static void main(String[] args) throws Exception{ TopologyBuilder builder = new TopologyBuilder(); //將我們的spout元件設定到topology中去 //parallelism_hint :4 表示用4個excutor來執行這個元件 //setNumTasks(8) 設定的是該元件執行時的併發task數量,也就意味著1個excutor會執行2個task builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8); //將大寫轉換bolt元件設定到topology,並且指定它接收randomspout元件的訊息 //.shuffleGrouping("randomspout")包含兩層含義: //1、upperbolt元件接收的tuple訊息一定來自於randomspout元件 //2、randomspout元件和upperbolt元件的大量併發task例項之間收發訊息時採用的分組策略是隨機分組shuffleGrouping builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); //將新增字尾的bolt元件設定到topology,並且指定它接收upperbolt元件的訊息 builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //用builder來建立一個topology StormTopology to = builder.createTopology(); //配置一些topology在叢集中執行時的引數 Config conf = new Config(); //這裡設定的是整個demotop所佔用的槽位數,也就是worker的數量 conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); //將這個topology提交給storm叢集執行 StormSubmitter.submitTopology("demotopo", conf, to); } } ################################################## ./storm jar /home/admin/storm.jar com.storm.demo1.TopologyMain; ./storm list storm kill demotopo