1. 程式人生 > >大資料入門(22)storm的第一個例項

大資料入門(22)storm的第一個例項

/**
 * Spout that emits one randomly chosen product name per {@link #nextTuple()} call
 * and then sleeps for 500 ms.
 *
 * <p>Each emitted tuple has a single field, declared as {@code "orignname"}.
 */
public class RandomWordSpout extends BaseRichSpout{
	private static final long serialVersionUID = 1L;

	/** Collector handed to this spout in {@link #open}; used to emit tuples. */
	private SpoutOutputCollector collector;

	/**
	 * Random source reused across {@link #nextTuple()} calls instead of allocating a
	 * new generator per tuple. It is transient and created in {@link #open} so that
	 * every spout task gets its own independently seeded generator rather than a
	 * deserialized copy of one shared state.
	 */
	private transient Random random;

	// Sample data to pick from
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};

	/**
	 * Picks one product name at random from {@code words}, emits it as a
	 * single-value tuple, then sleeps for 500 ms.
	 */
	@Override
	public void nextTuple() {
		// Choose a random index into the sample data
		int index = random.nextInt(words.length);

		// Wrap the product name in a tuple and send it to the next component
		this.collector.emit(new Values(words[index]));

		// Pause 500 ms after every emitted message
		Utils.sleep(500);
	}

	/**
	 * Initialization hook; stores the collector and creates the random generator.
	 *
	 * @param arg0 configuration map (unused here)
	 * @param arg1 topology context (unused here)
	 * @param collector collector used later by {@link #nextTuple()} to emit tuples
	 */
	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		this.collector = collector;
		this.random = new Random();
	}

	/**
	 * Declares the field name of the tuples emitted by this spout.
	 *
	 * @param declarer declarer that receives the output field list
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("orignname"));
	}
}

##########################################################################
/**
 * Bolt that converts the first field of each incoming tuple to upper case and
 * emits the result as a single-field tuple named {@code "upperName"}.
 */
public class UpperBolt extends BaseBasicBolt{
	private static final long serialVersionUID = 1L;

	/**
	 * Reads field 0 of the tuple as a string, upper-cases it and emits it.
	 *
	 * @param tuple incoming tuple; field 0 is read as a string
	 * @param collector collector used to emit the upper-cased value
	 */
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String value = tuple.getString(0);
		// Use a fixed locale: the no-arg toUpperCase() follows the JVM default
		// locale, which maps 'i' to a dotted capital I under e.g. Turkish settings.
		String value_upper = value.toUpperCase(java.util.Locale.ROOT);
		collector.emit(new Values(value_upper));
	}

	/**
	 * Declares the single output field of this bolt.
	 *
	 * @param declarer declarer that receives the output field list
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("upperName"));
	}
}
##############################################################################
/**
 * Terminal bolt that appends a fixed date suffix to the first field of each
 * incoming tuple and writes the result, one per line, to a file under
 * {@code /home/admin/storm_output/}. It emits no tuples.
 */
public class SuffixBolt extends BaseBasicBolt{

	private static final long serialVersionUID = 1L;

	/**
	 * Output writer, opened in {@link #prepare} and closed in {@link #cleanup()}.
	 * Transient because an open file handle must not be part of the serialized bolt.
	 */
	transient FileWriter fileWriter = null;

	/**
	 * Opens a uniquely named output file for this bolt instance.
	 *
	 * @param stormConf configuration map (unused here)
	 * @param context topology context (unused here)
	 * @throws IllegalStateException if the output file cannot be opened; failing here
	 *         avoids a later NullPointerException on every {@link #execute} call
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		String path = "/home/admin/storm_output/"+UUID.randomUUID();
		try {
			fileWriter = new FileWriter(path);
		} catch (IOException e) {
			throw new IllegalStateException("Cannot open output file " + path, e);
		}
	}

	/**
	 * Core processing logic, invoked once for every received tuple: reads field 0,
	 * appends the suffix, and writes the line to the output file.
	 *
	 * @param tuple incoming tuple; field 0 is read as the product name
	 * @param collector unused, this bolt emits nothing
	 */
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		// Product name sent by the previous component
		String upper_name =tuple.getString(0);
		// Append the suffix to the product name
		String suffix_name = upper_name + "2018-11-06";
		try {
			fileWriter.write(suffix_name);
			fileWriter.write("\n");
			// Flush per tuple so the line is visible in the file immediately
			fileWriter.flush();
		} catch (IOException e) {
			// Best effort: a failed write is reported but does not stop processing
			e.printStackTrace();
		}
	}

	/**
	 * Closes the output file if it was opened.
	 * NOTE(review): whether the runtime always invokes this on shutdown should be
	 * confirmed against the Storm version in use.
	 */
	@Override
	public void cleanup() {
		if (fileWriter == null) {
			return;
		}
		try {
			fileWriter.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			fileWriter = null;
		}
	}

	/**
	 * Declares no output fields: this bolt does not send tuples to any
	 * downstream component.
	 *
	 * @param arg0 declarer (unused)
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {
	}
}
########################################################################
/**
 * Entry point that wires RandomWordSpout -> UpperBolt -> SuffixBolt into a
 * topology and submits it to the Storm cluster under the name {@code "demotopo"}.
 */
public class TopologyMain {
	/**
	 * Builds the topology and its configuration, then submits both to the cluster.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if building or submitting the topology fails
	 */
	public static void main(String[] args) throws Exception{
		StormTopology topology = buildTopology();
		Config clusterConf = buildConfig();

		// Hand the topology over to the Storm cluster for execution
		StormSubmitter.submitTopology("demotopo", clusterConf, topology);
	}

	/** Assembles the spout and the two bolts into a topology. */
	private static StormTopology buildTopology() {
		TopologyBuilder topologyBuilder = new TopologyBuilder();

		// Register the spout.
		// Parallelism hint 4: four executors run this component.
		// setNumTasks(8): eight concurrent tasks, i.e. each executor runs two tasks.
		topologyBuilder
				.setSpout("randomspout", new RandomWordSpout(), 4)
				.setNumTasks(8);

		// Register the upper-casing bolt and subscribe it to the spout.
		// shuffleGrouping("randomspout") means two things:
		//  1. upperbolt only receives tuples coming from randomspout;
		//  2. tuples are distributed randomly between the concurrent task
		//     instances of randomspout and upperbolt.
		topologyBuilder
				.setBolt("upperbolt", new UpperBolt(), 4)
				.shuffleGrouping("randomspout");

		// Register the suffix-appending bolt and subscribe it to upperbolt
		topologyBuilder
				.setBolt("suffixbolt", new SuffixBolt(), 4)
				.shuffleGrouping("upperbolt");

		return topologyBuilder.createTopology();
	}

	/** Creates the runtime settings the topology is submitted with. */
	private static Config buildConfig() {
		Config settings = new Config();
		// Number of slots, i.e. workers, occupied by the whole topology
		settings.setNumWorkers(4);
		settings.setDebug(true);
		settings.setNumAckers(0);
		return settings;
	}
}

##################################################
# 提交 topology 到叢集(以下指令皆在 storm 的 bin 目錄下執行)
./storm jar /home/admin/storm.jar com.storm.demo1.TopologyMain

# 檢視叢集中正在執行的 topology
./storm list

# 停止名為 demotopo 的 topology
./storm kill demotopo