1. 程式人生 > >Storm部分:程式碼模板【Java版純程式碼】

Storm部分:程式碼模板【Java版純程式碼】

總結:構成部分:

Spout部分:繼承BaseRichSpout類,實現裡邊的三個方法:nextTuple,open,declareOutPutFields.主要的方法在nexttuple中寫,打包成集合的形式,在這個方法中用emit傳送,同時在declareOutPutFields也有傳送

Bolt方法:繼承BaseRichBolt類,實現內部的三個方法:execute,open,declareOutputDeclare。主要是在execute中寫,包括切分集合等。如果還有一個步驟的話,那還需要再發送給下一步。

Main方法:建立:TopologyBuilder tb=new TopologyBuilder();

用tb分別去調動其他的執行緒和程序,裡邊設定Grouping的方法和形式

LocalCluster lc=new LocalCluster();

呼叫。

1.Test測試程式碼

package com.bjsxt.storm.test;

import com.bjsxt.storm.bolt.WsBolt;
import com.bjsxt.storm.spout.WsSpout;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * 
 * 構建拓撲結構 Topology ----- 》job
 * 
 * @author Administrator
 *
 */
public class Test {

	public static void main(String[] args) {
		
		TopologyBuilder tb = new TopologyBuilder();
		
		tb.setSpout("wsspout", new WsSpout());
		tb.setBolt("wsbolt", new WsBolt()).shuffleGrouping("wsspout");
		

		//建立本地服務叢集
		LocalCluster lc = new LocalCluster();
		lc.submitTopology("ws", new Config(), tb.createTopology());
		
		
	}

}

2.Spout:傳送端

package com.bjsxt.storm.spout;

import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * 
 * 資料累加: 1+2+3+4......
 * 
 * @author Administrator
 *
 */
public class WsSpout extends BaseRichSpout {

	Map map;
	TopologyContext context;
	SpoutOutputCollector collector;// 傳送器
	int i = 0;

	/**
	 * 傳送資料,不斷被執行緒呼叫
	 */
	@Override
	public void nextTuple() {

		i++;
		List tuple = new Values(i);
		this.collector.emit(tuple);
		System.err.println("spout -----------------------" + i);

		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {

			e.printStackTrace();
		}

	}

	@Override
	public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
		this.map = map;
		this.context = context;
		this.collector = collector;

	}

	/**
	 * 聲明發送資料的資訊
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("num"));

	}

}

3.Bolt:計算與分析

package com.bjsxt.storm.bolt;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class WsBolt extends BaseRichBolt {

	Map stormConf;
	TopologyContext context;
	OutputCollector collector;
	int sum = 0;

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

		this.stormConf = stormConf;
		this.context = context;
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {

		// 1.接收tuple內的資料
		int i = input.getIntegerByField("num");

		// 2.累加
		sum += i;
		
		//3.輸出效果
		System.out.println("sum-------------------------------"+sum);
		

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

}