1. 程式人生 > >hadoop(十三)storm流式計算(實時處理)

hadoop(十三)storm流式計算(實時處理)

storm介紹

說明+安裝文件

            Storm是一個開源的分散式實時計算系統,可以簡單、可靠的處理大量的資料流。被稱作“實時的hadoop”。Storm有很多使用場景:如實時分析,線上機器學習,持續計算, 分散式RPCETL等等。Storm支援水平擴充套件,具有高容錯性,保證每個訊息都會得到處理,而且處理速度很快(在一個小叢集中,每個結點每秒可以處理 數以百萬計的訊息)。Storm的部署和運維都很便捷,而且更為重要的是可以使用任意程式語言來開發應用。

高可靠性

高容錯性

Storm叢集和Hadoop叢集表面上看很類似。Hadoop上執行的是MapReduce jobs,而在Storm上執行的是拓撲(topology);

Hadoop擅長於分散式離線批處理,而Storm設計為支援分散式實時計算;

Hadoop新的spark元件提供了在hadoop平臺上執行storm的可能性

 

 

在深入理解Storm之前,需要了解一些概念:

Topologies : 拓撲,也俗稱一個任務

Spouts : 拓撲的訊息源

Bolts : 拓撲的處理邏輯單元

tuple:訊息元組

Streams : 流

Stream groupings :流的分組策略

Tasks : 任務處理單元

Executor :工作執行緒

Workers :工作程序

Configuration topology的配置

 

Topology Mapreduce

一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會執行(除非你手動kill掉)

 

Nimbus 與 ResourManager

Storm的叢集裡面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面執行一個叫

Nimbus後臺程式,它的作用類似Hadoop裡面的JobTrackerNimbus負責在叢集裡面分發程式碼,分配計算任務給機器, 並且監控狀態。

 

Supervisor (worker程序)與NodeManager(YarnChild)

每一個工作節點上面執行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作程序。每一個工作程序執行一個topology的一個子集;一個執行的topology由執行在很多機器上的很多工作程序組成。 

 

 

storm安裝

1、安裝一個zookeeper叢集

2、上傳storm的安裝包,解壓

3、修改配置檔案storm.yaml

#所使用的zookeeper叢集主機
storm.zookeeper.servers:
     - "weekend05"
     - "weekend06"
     - "weekend07"

#nimbus所在的主機名
nimbus.host: "weekend05"

supervisor.slots.ports
-6701
-6702
-6703
-6704
-6705


啟動storm
在nimbus主機上
nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &

在supervisor主機上
nohup ./storm supervisor 1>/dev/null 2>&1 &


storm的深入學習:
            分散式共享鎖的實現
            事務topology的實現機制及開發模式
            在具體場景中的跟其他框架的整合(flume/activeMQ/kafka(分散式的訊息佇列系統)       /redis/hbase/mysql cluster)

 

 

demo

完成

 

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

	private SpoutOutputCollector collector;
	
	//模擬一些資料
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
	
	//不斷地往下一個元件傳送tuple訊息
	//這裡面是該spout元件的核心邏輯
	@Override
	public void nextTuple() {

		//可以從kafka訊息佇列中拿到資料,簡便起見,我們從words陣列中隨機挑選一個商品名傳送出去
		Random random = new Random();
		int index = random.nextInt(words.length);
		
		//通過隨機數拿到一個商品名
		String godName = words[index];
		
		
		//將商品名封裝成tuple,傳送訊息給下一個元件
		collector.emit(new Values(godName));
		
		//每傳送一個訊息,休眠500ms
		Utils.sleep(500);
		
		
	}

	//初始化方法,在spout元件例項化時呼叫一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;
		
		
	}

	//宣告本spout元件傳送出去的tuple中的資料的欄位名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("orignname"));
		
	}

}
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UpperBolt extends BaseBasicBolt{

	
	//業務處理邏輯
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		
		//先獲取到上一個元件傳遞過來的資料,資料在tuple裡面
		String godName = tuple.getString(0);
		
		//將商品名轉換成大寫
		String godName_upper = godName.toUpperCase();
		
		//將轉換完成的商品名傳送出去
		collector.emit(new Values(godName_upper));
		
	}

	
	
	//宣告該bolt元件要發出去的tuple的欄位
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
		declarer.declare(new Fields("uppername"));
	}

}
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class SuffixBolt extends BaseBasicBolt{
	
	FileWriter fileWriter = null;
	
	
	//在bolt元件執行過程中只會被呼叫一次
	@Override
	public void prepare(Map stormConf, TopologyContext context) {

		try {
			fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
	}
	
	
	
	//該bolt元件的核心處理邏輯
	//每收到一個tuple訊息,就會被呼叫一次
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {

		//先拿到上一個元件傳送過來的商品名稱
		String upper_name = tuple.getString(0);
		String suffix_name = upper_name + "_itisok";
		
		
		//為上一個元件傳送過來的商品名稱新增字尾
		
		try {
			fileWriter.write(suffix_name);
			fileWriter.write("\n");
			fileWriter.flush();
			
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		
		
		
	}

	
	
	
	//本bolt已經不需要傳送tuple訊息到下一個元件,所以不需要再宣告tuple的欄位
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {

		
	}

}
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
 * 組織各個處理元件形成一個完整的處理流程,就是所謂的topology(類似於mapreduce程式中的job)
 * 並且將該topology提交給storm叢集去執行,topology提交到集群后就將永無休止地執行,除非人為或者異常退出
 * @author [email protected]
 *
 */
public class TopoMain {

	
	public static void main(String[] args) throws Exception {
		
		TopologyBuilder builder = new TopologyBuilder();
		
		//將我們的spout元件設定到topology中去 
		//parallelism_hint :4  表示用4個excutor來執行這個元件
		//setNumTasks(8) 設定的是該元件執行時的併發task數量,也就意味著1個excutor會執行2個task
		builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
		
		//將大寫轉換bolt元件設定到topology,並且指定它接收randomspout元件的訊息
		//.shuffleGrouping("randomspout")包含兩層含義:
		//1、upperbolt元件接收的tuple訊息一定來自於randomspout元件
		//2、randomspout元件和upperbolt元件的大量併發task例項之間收發訊息時採用的分組策略是隨機分組shuffleGrouping
		builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
		
		//將新增字尾的bolt元件設定到topology,並且指定它接收upperbolt元件的訊息
		builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
		
		//用builder來建立一個topology
		StormTopology demotop = builder.createTopology();
		
		
		//配置一些topology在叢集中執行時的引數
		Config conf = new Config();
		//這裡設定的是整個demotop所佔用的槽位數,也就是worker的數量
		conf.setNumWorkers(4);
		conf.setDebug(true);
		conf.setNumAckers(0);
		
		
		//將這個topology提交給storm叢集執行
		StormSubmitter.submitTopology("demotopo", conf, demotop);
		
	}
}