大資料之storm(一) --- storm簡介,核心元件,工作流程,安裝和部署,電話通訊案例分析,叢集執行,單詞統計案例分析,調整併發度
阿新 • • 發佈:2018-12-12
一、storm簡介 --------------------------------------------------------- 1.開源,分散式,實時計算 2.實時可靠的處理無限資料流,可以使用任何語言開發 3.適用於實時分析,線上機器學習,分散式PRC,ETL 4.每秒可以處理上百萬條記錄(元組) 5.可拓展,容錯,並可保證資料至少處理一次 6.低延遲,秒級,分鐘級計算響應 二、核心概念 ------------------------------------------------------------ 1.toplogy: a.封裝 實時計算 的物件,類似於MR,並且不會終止 b.spout和bolt 連線在一起形成一個top, 形成有向圖[定點 -- 邊] 定點代表計算核心,邊代表資料流 2.nimbus: a.資源排程和分配 ,類似於jobTracker b.master node, 是核心元件,領導,管理指派task c.分析top,收集並執行task.分發task給supervisor d.使用內部的訊息系統,與 supervisor進行通訊 e.監控top是否失敗,無狀態,必須依靠zk來監控top的執行狀態 3.supervisor: a.接收nimbus的指令, 啟動和管轄所有的worker ,類似於 TaskTracker, b.worker node , 監理 ,分配Task給worker c.每個supervisor 有n個worker程序,supervisor管理旗下所有的worker 程序 4.Worker :本身不執行任務,只是孵化出 具體處理計算 的程序[Executors],讓executors程序執行tasks 5.Executor: Worker孵化出的一個物理執行緒,內部執行的task必須屬於同一作業[spout/bolt] 6.Task: storm 中的最小工作單元,類似於MR,執行實際的任務處理工作,或者是spout或者是bolt 7.Spout: 水龍頭,獲取資料來源資料,通過nextTuple函式傳送到bolt。資料流的源頭,可以自定義,可以kafka 8.bolt: 轉接頭,邏輯處理單元,接受spout傳送的資料,進行過濾,合併,寫入db等. 9.Tuple:元組,訊息的基本單位,主要的資料結構,有序元素的列表 10.Stream : 源源不斷的tuple構成了流,一系列元組 11.stream group: 訊息組/分割槽 -- 分組方式有shuffle,fields,all,global,none,direct,localshuffle等 三、storm和hadoop的對比 ----------------------------------------------------------- storm hadoop 實時流處理 批處理 無狀態 有狀態 使用zk協同的主從架構 無zk主從架構 每秒處理百萬記錄 MR作業 數分鐘數小時 不會主動停 會停 toplogy -- task mapreduce -- mrtask nimbus - Supervisor jobTracker -- TaskTracker spout - bolt map -- reduce 四、storm的工作流程 ----------------------------------------------------------- 1.nimbus等待toplogy的提交 2.提交toplogy 3.nimbus收到top,從中提取tasks 4.nimbus分發tasks給所有可用的supervisors 5.all supervisors 會週期性的傳送心跳資訊給nimbus證明其依然存活,如果supervosor掛掉,nimbus就不再給其傳送tasks,取而代之的是傳送給其他的supervisor 6.當nimbus掛掉,已經分配給supervisors的tasks會繼續執行,而不受其影響。 7.tasks完成之後,supervisor會等待新的tasks 8.掛掉的nimbus會通過監控工具自動重啟,從掛掉的地方繼續工作 五、安裝和部署storm ------------------------------------------------------------- 1.找4臺機器,1 nimbus , 3 supervisors 2.下載安裝包apache-storm-1.1.0.tar.gz,tar開,建立符號連結,分發 3.配置環境變數,分發 [/etc/environment] ... STORM_HOME="/soft/storm" $PATH="... 
:/soft/storm/bin" 4.驗證安裝 $> storm version 5.部署storm a.配置配置檔案並分發 [/soft/storm/conf/storm.yaml] storm.local.dir: "/home/ubuntu/storm" storm.zookeeper.servers: - "s200" - "s300" storm.zookeeper.port: 2181 ### nimbus.* configs are for the master nimbus.seeds : ["s201"] ### ui.* configs are for the master ui.host: 0.0.0.0 ui.port: 8080 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 6.啟動storm a.在s100上啟動nimbus節點 $> storm nimbus b.在s200,s300,s400上啟動 supervisor $> storm supervisor c.在s100上啟動UI程序 $> cd /soft/storm/bin $bin> ./storm ui & d.檢視webui http://s100:8080 六、電話通訊案例分析 -------------------------------------------------------------- 1.spout類 ----------------------------------------------------------
package test.storm; import org.apache.storm.spout.ISpout; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; /** * Storm Spout類 * 負責產生資料流的 */ public class CallLogSpout implements IRichSpout { //spout輸出的收集器:傳遞tuples to bolt private SpoutOutputCollector collector; //是否完成 private boolean completed = false; //top資料的封裝 private TopologyContext context; //隨機發送器 private Random randomGenerator = new Random(); //索引 private Integer idx = 0; public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.context = topologyContext; this.collector = spoutOutputCollector; } public void close() { } public void activate() { } public void deactivate() { } /** * 下一個元組,記錄 */ public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { //隨機主叫 String caller = mobileNumbers.get(randomGenerator.nextInt(4)); //隨機被叫 String callered = mobileNumbers.get(randomGenerator.nextInt(4)); while(caller == callered) { callered = mobileNumbers.get(randomGenerator.nextInt(4)); } //隨機通話時長 Integer duration = randomGenerator.nextInt(60); //收集器傳送元組資訊給bolt this.collector.emit(new Values(caller, callered, duration)); } } } public void ack(Object o) { } public void fail(Object o) { } /** * 定義輸出欄位的名稱 * @param declarer */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("caller", "callered", "time")); } public Map<String, Object> getComponentConfiguration() { return null; 
} }
2.BoltA類
-----------------------------------------------------------------------
package test.storm; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * 呼叫日誌的轉接頭類bolt -- 對通話記錄進行一次組裝 */ public class CallLogBolt implements IRichBolt { //收集器:用於接收spout傳送的元組資訊併發送給輸出者 private OutputCollector collector; public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } /** * 執行方法 -- 計算過程 * @param tuple */ public void execute(Tuple tuple) { //從元組中取出主叫 String from = tuple.getString(0); //從tuple中取出被叫 String to = tuple.getString(1); //從元組中取出通話時長 Integer duration = tuple.getInteger(2); //收集器組裝訊息[處理通話記錄],產生新的tuple,然後傳送出去 collector.emit(new Values(from + " - " + to, duration)); } public void cleanup() { } /** * 給tuple定義輸出欄位的名稱 * @param declarer */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("calllog", "duration")); } public Map<String, Object> getComponentConfiguration() { return null; } }
3.BoltB類
-------------------------------------------------------------
package test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * Terminal bolt: counts how many times each call-log key has been seen.
 * Results are only printed in cleanup(); nothing is emitted downstream
 * (the declared field "call" is declared but never used).
 */
public class CounterBolt implements IRichBolt {
// Per-task in-memory counters, keyed by call-log string.
Map<String, Integer> counterMap;
private OutputCollector collector;
/**
 * Initializes the counter map and stores the collector.
 */
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.counterMap = new HashMap<String, Integer>();
collector = outputCollector;
}
/**
 * Increments the counter for the incoming call-log key and acks the tuple.
 * @param tuple (calllog, duration) emitted by CallLogBolt
 */
public void execute(Tuple tuple) {
String calllog = tuple.getString(0);
Integer duration = tuple.getInteger(1);
// IMPROVED: single map lookup instead of containsKey() + get().
Integer count = counterMap.get(calllog);
counterMap.put(calllog, count == null ? 1 : count + 1);
// Last bolt in the chain: ack here to mark the tuple fully processed.
collector.ack(tuple);
}
/**
 * Dumps the accumulated counts when the topology is torn down.
 * NOTE(review): cleanup() is only guaranteed to run in local mode.
 */
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
4.App類
----------------------------------------------------------------------
package test.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
// BUGFIX: StormSubmitter is used below but was never imported, so the
// original class did not compile.
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Entry point: builds the call-log topology (spout -> log bolt -> counter
 * bolt) and submits it to the Storm cluster.
 */
public class App_Toplogy {
public static void main(String [] args)
{
//storm cluster configuration
Config config = new Config();
config.setDebug(true);
// Build the topology graph.
TopologyBuilder builder = new TopologyBuilder();
// Source spout producing synthetic call records.
builder.setSpout("spoutA", new CallLogSpout());
// Shuffle-group boltA onto spoutA: tuples are distributed randomly.
builder.setBolt("boltA", new CallLogBolt()).shuffleGrouping("spoutA");
// Fields-group boltB on "calllog" so equal keys always reach the same task.
builder.setBolt("boltB", new CounterBolt()).fieldsGrouping("boltA", new Fields("calllog"));
// //local mode (kept for reference)
// LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// cluster.shutdown();
// Cluster-mode submission.
try {
StormSubmitter.submitTopology("mytop",config,builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
七、釋出到storm叢集上執行
-------------------------------------------------------------------------------
1.修改app類 -- 改為叢集提交模式
//叢集模式
try {
StormSubmitter.submitTopology("mytop",config,builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
2.匯出jar包
3.在ubuntu上執行
$bin> ./storm jar /share/storm/TestStrom-1.0-SNAPSHOT.jar test.storm.App_Toplogy
4.在s200上開啟日誌檢視器
$bin> ./storm logviewer &
5.檢視結果[電話記錄統計資訊]
$s400> cat /soft/storm/logs/workers-artifacts/mytop-1-1538085858/6700/worker.log | grep 1234
八、單詞統計案例
---------------------------------------------------------------------------
1.WordSpout -- 產生單詞
---------------------------------------------
package test.storm.wc;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import util.Util;
import java.util.Map;
import java.util.Random;
/**
 * Word-count source spout: emits one synthetic text line per second.
 */
public class WordSpout implements IRichSpout {
private TopologyContext context;
private SpoutOutputCollector collector;
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
// Debug hook: reports lifecycle events to a listening socket on port 7777.
Util.sendToClient(this,"WordSpout.open()",7777);
context = topologyContext;
collector = spoutOutputCollector;
}
public void close() {
}
public void activate() {
}
public void deactivate() {
}
/**
 * Emits one line of text, then sleeps 1s to throttle the stream.
 */
public void nextTuple() {
//Util.sendToClient(this,"WordSpout.nextTuple()",7777);
String line = "how are you" + " tom" + new Random().nextInt(100);
collector.emit(new Values(line));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
// BUGFIX: restore the interrupt flag so the worker thread can still
// observe the interruption after this catch block swallows it.
Thread.currentThread().interrupt();
}
}
public void ack(Object o) {
}
public void fail(Object o) {
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
2.SplitBolt -- 進行切割
----------------------------------------------
package test.storm.wc;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import util.Util;
import java.util.Map;
/**
 * Splitter bolt: breaks each incoming line on spaces and emits one tuple
 * per word for the counting bolt.
 */
public class SpiltBolt implements IRichBolt {
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
// Debug hook: reports lifecycle events to a listening socket on port 8888.
Util.sendToClient(this,"SpiltBolt.prepare()",8888);
this.context = topologyContext;
this.collector = outputCollector;
}
/**
 * Splits the line in field 0 and emits each word as its own tuple.
 * @param tuple the (line) tuple produced by WordSpout
 */
public void execute(Tuple tuple) {
//Util.sendToClient(this,"SpiltBolt.execute()" + tuple.toString(), 8888);
String str = tuple.getString(0);
String [] strs = str.split(" ");
for(String s : strs)
{
collector.emit(new Values(s));
}
// BUGFIX: every tuple consumed by an IRichBolt must be acked (or failed);
// the original never acknowledged its input.
collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
3.CounterBolt -- 進行單詞統計
----------------------------------------------
package test.storm.wc;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import util.Util;
import java.util.HashMap;
import java.util.Map;
/**
 * Terminal word-count bolt: keeps a per-task word->count map and prints it
 * in cleanup(). Declares no output fields (end of the tuple tree).
 */
public class CounterBolt implements IRichBolt {
private TopologyContext context;
private OutputCollector collector;
private Map<String, Integer> map = new HashMap<String, Integer>();
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
// Debug hook: reports lifecycle events to a listening socket on port 9999.
Util.sendToClient(this,"CounterBolt.prepare()" , 9999);
context = topologyContext;
collector = outputCollector;
// BUGFIX: the original wrote "map = new HashMap...", which assigned to the
// shadowing *parameter* named map, not the field; it only worked because
// the field initializer had already run. Qualify with this.
this.map = new HashMap<String, Integer>();
}
/**
 * Increments the counter for the incoming word and acks the tuple.
 * @param tuple the (word) tuple produced by SpiltBolt
 */
public void execute(Tuple tuple) {
//Util.sendToClient(this,"CounterBolt.execute()" + tuple.toString(),9999);
String word = tuple.getString(0);
// IMPROVED: single map lookup instead of containsKey() + get().
Integer count = map.get(word);
map.put(word, count == null ? 1 : count + 1);
// Last bolt in the chain: ack here to mark the tuple fully processed.
collector.ack(tuple);
}
/**
 * Dumps the accumulated counts when the topology is torn down.
 * NOTE(review): cleanup() is only guaranteed to run in local mode.
 */
public void cleanup() {
for(Map.Entry<String, Integer> entry:map.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
4.App主程式
----------------------------------------------
package test.storm.wc;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Entry point: wires the word-count topology (WordSpout -> SpiltBolt ->
 * CounterBolt) and submits it to the Storm cluster.
 */
public class App {
public static void main(String [] args)
{
// Cluster-wide configuration: debug logging on, five worker processes.
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(5);
// Assemble the topology graph.
TopologyBuilder tb = new TopologyBuilder();
// Word source: parallelism hint 3, three tasks.
tb.setSpout("wordSpout", new WordSpout(),3).setNumTasks(3);
// Line splitter: shuffle-grouped on the spout, parallelism hint 4.
tb.setBolt("spiltBolt", new SpiltBolt(),4).shuffleGrouping("wordSpout");
// Counter: fields-grouped on "word" so equal words reach the same task, hint 5.
tb.setBolt("counterBolt", new CounterBolt(),5).fieldsGrouping("spiltBolt", new Fields("word"));
// Local-mode run (kept for reference):
// LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("wc", conf, tb.createTopology());
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// cluster.shutdown();
// Cluster-mode submission.
try {
StormSubmitter.submitTopology("wc",conf,tb.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
九、設定top的併發程度-- 配置任務數,worker數,執行緒數
---------------------------------------------------------------------
1.設定worker數量
conf.setNumWorkers(3); //預設為1
2.設定執行執行緒executor數量
//指定龍頭,設定併發暗示為3 [每個worker 開啟三個計算執行緒來產生單詞源]
builder.setSpout("wordSpout", new WordSpout(),3);
//採用shuffleGrouping的方式,將boltA對接到spoutA,並設定併發暗示為4 [每個worker,開啟4個執行緒數來切割]
builder.setBolt("spiltBolt", new SpiltBolt(), 4).shuffleGrouping("wordSpout");
3.設定任務數
//設定3個計算執行緒executor,6個任務Task -- 1個執行緒平均2個任務
builder.setSpout("wordSpout", new WordSpout(),3).setNumTasks(6);
4.按照上面的設定,結果為
-- 以Spout為例
-- 1個nimbus -- 3臺主機執行3個worker -- 每個worker開啟3個計算執行緒,總共6個task(均分),每個計算執行緒計算2個任務
-- 一臺主機最多可以跑4個worker[配置檔案決定],並且會平均分配資源 -- 3臺主機執行3個worker
-- 主機上開啟worker程序,worker程序上開啟executor執行緒,executor執行緒上開啟若干分執行緒Task,Task為最小單元,進行需求計算
5.併發度 == 所有Tasks數量的總和