流式大資料計算實踐(6)----Storm簡介&使用&安裝
一、前言
1、這一文開始進入Storm流式計算框架的學習
二、Storm簡介
1、Storm與Hadoop的區別就是,Hadoop是一個離線執行的作業,執行完畢就結束了,而Storm是可以源源不斷的接受資料來源,不停的對資料進行處理,而資料就行水流一樣不停的流進來,經過處理,再將結果存入資料庫或者做其他用途
2、基礎概念
(1)Tuple(元組):資料流傳遞的基本單元,相當於資料的流動通過Tuple作為物件來傳遞
(2)Spout(龍捲):相當於資料來源,通過重寫nextTuple()方法,源源不斷的將資料流入我們的處理框架
(3)Bolt(閃電):處理資料的節點,通過重寫execute()方法,接收Spout送出的資料,並進行任意的業務處理,處理完畢還可以將資料繼續流入下一個Bolt,組成一條鏈
(4)Topology(拓撲):連線以上各個元件,使其組成一個拓撲結構,比如將多個Bolt組成一條資料鏈
3、舉例說明:比如我們現在要統計一下《戰爭與和平》這本書的每個英文單詞出現的數量
(1)編寫Spout程式碼,將書的內容源源不斷的通過句子輸入到我們的體系中
(2)編寫多個Bolt來處理資料,比如第一個Bolt專門來將句子切分成單詞,第二個Bolt專門來統計每個單詞出現的數量,每個Bolt之間通過定義Bolt來流動資料,比如統計的Bolt,定義一個二元元組(單詞,數量),第一個值就是具體的單詞,第二個值就是這個單詞出現的數量
(3)通過Topology將以上元件連線成一個完整的系統
三、Storm安裝
1、下載Storm的tar.gz包,並解壓
tar zxvf /work/soft/installer/apache-storm-1.2.2
2、修改配置檔案
(1)storm.ymal,分別配置我們的Zookeeper叢集(前文中已經搭建好了)的各個節點和nimbus節點的高可用性,避免單點故障,我們的環境有兩個機器,所以都寫上
vim /work/soft/apache-storm-1.2.2/conf/storm.yaml storm.zookeeper.servers: - "storm1" - "storm2" nimbus.seeds: ["storm1", "storm2"]
3、輸入python檢查一下機器是否安裝了python,如果沒有則安裝python,安裝完畢再執行python,發現可以進入,然後ctrl+D退出即可
apt-get install python-minimal
4、啟動Storm叢集,通過以下命令分別啟動nimbus、supervisor和控制檯UI,nohup可以當SSH客戶端關閉時,不會將程序殺死,字尾加一個&,可以理解為讓程序在後臺執行
nohup /work/soft/apache-storm-1.2.2/bin/storm nimbus & nohup /work/soft/apache-storm-1.2.2/bin/storm supervisor & nohup /work/soft/apache-storm-1.2.2/bin/storm ui &
4、通過jps命令檢視程序是否正常啟動,如果看到config_value,說明還沒啟動完畢,稍等一下就好了
5、開啟8080埠,可以看到控制檯,正常執行
四、Storm程式碼編寫
我們根據以上的思路寫一個簡單的單詞統計任務,我們先放在開發環境上面跑程式碼,是不需要Storm叢集環境的,等我們寫好程式碼並在本地跑通後,就可以搭建Storm叢集,在叢集上面跑了,關於單詞統計的程式碼網上很容易找到,下面闡述一下實現的思路,可以對照著以下文字來看程式碼,更好理解
1、建立一個maven工程,引入以下依賴,由於我這裡的思路是:通過Rabbitmq獲取訊息資料,Storm進行資料流處理,將結果儲存為Json格式並存入HBase。所以我需要引入如下依賴
<!-- HBase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.1</version> </dependency> <!-- Storm --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.4</version> <scope>provided</scope> </dependency> <!-- Json --> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20140107</version> </dependency> <!-- RabbitMQ --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> </dependency>
2、因為要使用HBase,所以參照上文的操作,還要將Hadoop的配置檔案拷貝到專案中。環境搭建好後,開始編寫程式碼
3、首先編寫Spout,也就是資料的來源,建立一個類實現IRichSpout介面,並重寫nextTuple()方法,在這個方法裡實現資料的生產,比如讀取資料庫(RDS或NoSQL),從訊息佇列獲取資料(Kafka、RabbitMQ),將輸出的資料定義成Tuple(元組),通過重寫declareOutputFields()方法,定義元組的key和數量,然後在nextTuple()方法中將元組的內容通過emit()方法傳遞到下一個元件
4、編寫Bolt,也就是資料的處理者,建立一個類實現IRichBolt介面,並重寫execute(Tuple tuple)方法,這個方法就是處理資料的邏輯了,在這裡可以編寫各種程式碼對接收到的Tuple進行處理,處理完畢後,和Spout一樣,可以將資料通過定義Tuple的方式傳遞到下一個元件,每個Bolt會對資料進行特定的處理,然後傳遞給下一個Bolt,這樣就可以組成一條資料流的處理鏈
5、編寫Topology,拓撲將上面編寫的元件連線起來,組成一個拓撲圖,資料就在這個拓撲圖裡面持續的“流動”,永不停歇,拓撲也是程式的入口,所以建立一個主函式,在主函式裡面建立一個TopologyBuilder物件,通過setSpout()、setBolt()方法將上面的元件連線起來,連線的方式涉及到Storm的八種Grouping策略
(1)Shuffle Grouping(隨機分組):最常用的分組方式,將Tuple平均隨機分配到各個Bolt裡面
(2)Fields Grouping(欄位分組):根據指定欄位進行分組,比如我們按照word欄位進行分組,相同word值的Tuple會被分配到同一個Bolt裡面
(3)All Grouping(廣播分組):所有的Bolt都可以收到Tuple
(4)None Grouping(無分組):將Tuple隨機分配到各個Bolt裡面
(5)Global Grouping(全域性分組):將Tuple分配到task id值最低的task裡面
(6)Direct Grouping(直接分組):生產者Bolt決定消費者Bolt可以接受的Tuple
(7)Local or Shuffle Grouping(本地或者隨機分組):Bolt在同一程序或存在多個task,元組會隨機分配這些task
(8)Custom Grouping (自定義分組):通過實現CustomStreamGrouping介面來定義自定義分組
6、通過TopologyBuilder連線好各個元件後,就可以提交任務了,提交任務分兩種方式:本地提交和叢集提交
(1)本地提交:提交到開發環境中,不需要安裝Storm環境,只需要引入Storm的依賴包即可,使用LocalCluster類的submitTopology方法提交任務
(2)叢集提交:提交到Storm叢集中,使用StormSubmitter類的submitTopology方法提交任務
五、提交jar包到叢集
1、首先我們要修改一下pom檔案,將之前引入的storm-core依賴裡面加<scope>provided</scope>,目的是storm-core這個依賴排除掉,因為這個依賴只是本地測試除錯依賴的,叢集中不需要這個依賴,如果不加會報錯,還要記得修改拓撲的程式碼,使用StormSubmitter類來提交
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.4</version> <scope>provided</scope> </dependency>
2、通過編譯器將我們的maven專案打包成jar
mvn clean install
3、將jar包拷貝到叢集的叢集裡面,因為我們的程式碼使用到了HBase,所以要記得把專案中的配置資料夾也拷貝過來(core-site.xml、hbase-site.xml、hdfs-site.xml),jar是掃描不到jar包裡的配置檔案的,把配置檔案放到與jar包同級目錄下即可
4、執行命令將jar包提交到叢集中執行,命令後面要記得指定主函式的全包名
nohup /work/soft/apache-storm-1.2.2/bin/storm jar /work/jar/mytest.jar com.orange.heatmap.Main &
5、進入8080控制檯,可以看到我們剛才提交的拓撲,點選進去可以檢視執行的狀態