Storm和Zookeeper叢集搭建及在java專案中的使用
上一篇:
介紹了分散式Zookeeper叢集的搭建和Kafka叢集的搭建,接下來學習一下Storm叢集的搭建。
實驗環境
- Cent OS 6.9
- apache-storm-1.1.0
- 三臺伺服器虛擬機器:192.168.1.129、192.168.1.214、192.168.1.241
準備工作
1、修改伺服器hostname和hosts檔案使得可以直接通過hostname訪問
使用hostname name
修改hostname,例如:
#hostname xuliugen129
(或者直接修改/etc/hostname
檔案,輸入要設定的hostname)
將三臺伺服器分別修改為:xuliugen129、xuliugen214、xuliugen241。
2、然後,修改/etc/hosts
使之在內網環境下可以ping
通:
每臺機子都設定為上邊的hostname,只需要新增上邊的三行即可。
3、關閉3臺伺服器的防火牆:
service firewalld stop
Zookeeper安裝與配置
在上一篇中介紹了Zookeeper叢集的安裝方式,這裡不再介紹,請檢視:Kafka 單機和分散式環境搭建與案例使用
Storm安裝與配置
1、選擇合適的版本進行下載Storm
使用wget進行下載:
>wget http://mirrors.tuna .tsinghua.edu.cn/apache/storm/apache-storm-1.1.0/apache-storm-1.1.0.tar.gz
分別下載到3臺伺服器上,然後解壓到自己合適的位置,我這裡都是:/home/xuliugen/server
2、配置Storm
配置檔案在:/home/xuliugen/server/apache-storm-1.1.0/conf
目錄下的storm.yaml
預設只需要修改如下部分:
3臺伺服器的Storm進行上述相同的配置即可。
Storm啟動與應用
1、Storm命令
啟動命令在:/home/xuliugen/server/apache-storm-1.1.0/bin
以上列出了所有可以執行的命令模式。
2、啟動nimbus
nimbus主機為:xuliugen129
另外開啟一個控制介面,使用jps檢視是否正常啟動:
nimbus表示正常啟動。
3、啟動Storm UI
Storm UI這可以在啟動nimbus的機器上執行,這裡是:xuliugen129,啟動如下:
啟動之後,jps如下:
Storm UI 程序為core
,訪問xuliugen129:8080(這裡我的PC級訪問的話,就不再是上述設定的hostname,應該是對應的內網IP,預設埠為8080)
3、啟動另外兩臺supervisor
主機分別為:xuliugen214、xuliugen241
>./storm supervisor
檢視Storm UI如下:
4、最後檢視任一臺Zookeeper伺服器,檢視節點情況:
專案程式碼使用
1、專案結構
核心jar為:storm-core-1.1.0.jar
2、執行原理:
一個Topology是Spouts和Bolts組成的圖, 通過Stream Groupings將圖中的Spouts和Bolts連線起來,如下圖:
3、Spout程式碼
public class RandomNameSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
// 模擬一些資料
String[] names = { "zhangsan", "lisi", "wangwu", "zhaoliu", "sunqi", "wangba" };
/**
* 不斷地往下一個元件傳送tuple訊息 這裡面是該spout元件的核心邏輯
*/
@Override
public void nextTuple() {
Random random = new Random();
int index = random.nextInt(names.length);
// 通過隨機數拿到一個姓名
String lowerName = names[index];
// 將姓名封裝成tuple,傳送訊息給下一個元件
collector.emit(new Values(lowerName));
// 每傳送一個訊息,休眠500ms
Utils.sleep(500);
}
/**
* 初始化方法,在spout元件例項化時呼叫一次c
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* 宣告本spout元件傳送出去的tuple中的資料的欄位名
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("lowerName"));
}
}
3、UpperBolt程式碼
public class UpperBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
@Override
public void execute(Tuple tuple) {
// 先獲取到上一個元件傳遞過來的資料,資料在tuple裡面
String lowerName = tuple.getString(0);
// 將姓名轉換成大寫
String upperName = lowerName.toUpperCase();
// 將轉換完成的商品名傳送出去
collector.emit(new Values(upperName));
}
@Override
public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
}
/**
* 宣告該bolt元件要發出去的tuple的欄位
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("upperName"));
}
}
4、AppendBolt程式碼
public class AppendBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
FileWriter fileWriter = null;
@Override
public void execute(Tuple tuple) {
// 先拿到上一個元件傳送過來的姓名
String upperName = tuple.getString(0);
String suffix_name = upperName + "_csdn";
try {
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 在bolt元件執行過程中只會被呼叫一次
*/
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
try {
fileWriter = new FileWriter("/home/xuliugen/server/stormdata/" + UUID.randomUUID());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 本bolt已經不需要傳送tuple訊息到下一個元件,所以不需要再宣告tuple的欄位
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
}
5、TopologyMain程式碼
public class TopologyMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// 將我們的spout元件設定到topology中去
// parallelism_hint :4 表示用4個excutor來執行這個元件
// setNumTasks(8) 設定的是該元件執行時的併發task數量,也就意味著1個excutor會執行2個task
builder.setSpout("randomspout", new RandomNameSpout(), 4).setNumTasks(8);
// 將大寫轉換bolt元件設定到topology,並且指定它接收randomspout元件的訊息
// .shuffleGrouping("randomspout")包含兩層含義:
// 1、upperbolt元件接收的tuple訊息一定來自於randomspout元件
// 2、randomspout元件和upperbolt元件的大量併發
// task例項之間收發訊息時採用的分組策略是隨機分組shuffleGrouping
builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
// 將新增字尾的bolt元件設定到topology,並且指定它接收upperbolt元件的訊息
builder.setBolt("suffixbolt", new AppendBolt(), 4).shuffleGrouping("upperbolt");
// 用builder來建立一個topology
StormTopology stormTopologyDemo = builder.createTopology();
// 配置一些topology在叢集中執行時的引數
Config conf = new Config();
// 這裡設定的是整個StormTopologyDemo所佔用的槽位數,也就是worker的數量
conf.setNumWorkers(4);
conf.setDebug(true);
conf.setNumAckers(0);
// 將這個topology提交給storm叢集執行
StormSubmitter.submitTopology("StormTopologyDemo", conf, stormTopologyDemo);
}
}
6、提交到Storm叢集執行:
1、將專案匯出jar,步驟如下:
2、如果有警告的話,直接忽略即可。
3、將匯出的jar上傳到伺服器上,nimbus主機即可,我這裡上傳到:/home/xuliugen/temp
目錄下,
4、在2臺supervisor
主機上建立專案中需要的目錄:
/home/xuliugen/server/stormdata/
5、提交到Storm叢集的命令為:
>./storm jar /home/xuliugen/temp/stormdemo.jar com.xuliugen.demo.topology.TopologyMain
(1)/home/xuliugen/temp/stormdemo.jar為打包的jar在伺服器上的位置;
(2)com.xuliugen.demo.topology.TopologyMain位主函式全路徑;
6、最後檢視Storm UI可以看到:
7、執行效果