1. 程式人生 > >基於Storm構建分散式實時處理應用初探

基於Storm構建分散式實時處理應用初探

Storm對比Hadoop,前者更擅長的是實時流式資料處理,後者更擅長的是基於HDFS,通過MapReduce方式的離線資料分析計算。對於Hadoop,本身不擅長實時的資料分析處理。兩者的共同點都是分散式架構,而且都類似有主/從關係的概念。

本文不會具體闡述Storm叢集和Zookeeper叢集如何部署的問題,這裡想通過一個實際的案例切入,分析一下如何利用Storm完成實時分析處理資料。

Storm本身是Apache託管的開源的分散式實時計算系統,它的前身是Twitter Storm。在Storm問世以前,處理海量的實時資料資訊,大部分是類似於使用訊息佇列,加上工作程序/執行緒的方式。這使得構建這類的應用程式,變得異常的複雜。很多的業務邏輯中,你不得不考慮訊息的傳送和接收,執行緒之間的併發控制等等問題。而其中的業務邏輯可能只是佔據整個應用的一小部分,而且很難做到業務邏輯的解耦。但是Storm的出現改變了這種局面,它首先抽象出資料流Stream的抽象概念,一個Stream指的是tuples組成的無邊界的序列。後面又繼續提出Spouts、Bolts的概念。Spouts在Storm裡面是資料來源,專門負責生成流。而Bolts則是以流作為輸入,並重新生成流作為輸出,並且Bolts還會繼續指定它輸入的流應該如何劃分。最後Storm是通過拓撲(Topology)這種抽象概念,組織起若干個Spouts、Bolts構成的分散式資料處理網路。Storm設計的時候,就有意的把Spouts、Bolts組成的拓撲(Topology)網路通過Thrift服務方式進行封裝,這個做法,使得Storm的Spouts、Bolts元件可以通過目前主流的任意語言實現,使得整個框架的相容性和擴充套件性更加優秀。

在Storm裡面拓撲(Topology)的概念,非常類似Hadoop裡面MapReduce的Job的概念。不同的是Storm的拓撲(Topology)只要你啟動了,它就會一直執行下去,除非你kill掉;而MapReduce的Job最終它是會結束的。基於這樣的模式,使得Storm非常適合處理實時性的資料分析、持續計算、DRPC(分散式RPC)等。

下面就結合實際的案例,設計分析一下,如何利用Storm改善應用的處理效能。

某通訊公司的垃圾簡訊監控平臺,實時地上傳每個省的疑似垃圾簡訊使用者的垃圾簡訊內容檔案,每個省則根據檔案中垃圾簡訊的內容,解析過濾出,包含指定敏感關鍵字的垃圾簡訊進行入庫。被入庫的垃圾簡訊使用者被列為敏感使用者,是重點監控物件,畢竟亂髮這些垃圾簡訊是非常不對的。垃圾簡訊監控平臺生成的檔案速度非常驚人,原來的傳統做法是,根據每個省的每一個地市,對應一個獨立應用,序列化地解析、過濾敏感關鍵字,來進行入庫處理。但是,從現狀來看,程式處理的效能並不高效,常常造成檔案積壓,沒有及時處理入庫。

現在,我們就通過Storm來重新梳理、組織一下上述的應用場景。

首先,我先說明一下,該案例中Storm叢集和Zookeeper叢集的部署情況,如下圖所示:


 

Nimbus對應的主機是192.168.95.134是Storm主節點,其餘兩臺從節點Supervisor對應的主機分別是192.168.95.135(主機名:slave1)、192.168.95.136(主機名:slave2)。同樣的,Zookeeper叢集也是部署在上述節點上。

Storm叢集和Zookeeper叢集會互相通訊,因為Storm就是基於Zookeeper的。然後先啟動每個節點的Zookeeper服務,其次分別啟動Storm的Nimbus、Supervisor服務。具體可以到Storm安裝的bin目錄下面啟動服務,啟動命令分別為storm nimbus > /dev/null 2 > &1 &和storm supervisor > /dev/null 2 > &1 &。然後用jps觀察啟動的效果。沒有問題的話,在Nimbus服務對應的主機上啟動Storm UI監控對應的服務,在Storm安裝目錄的bin目錄輸入命令:storm ui >/dev/null 2>&1 &。然後開啟瀏覽器輸入:http://{Nimbus服務對應的主機ip}:8080,這裡就是輸入:http://192.168.95.134:8080/。觀察Storm叢集的部署情況,如下圖所示:


 

可以發現,我們的Storm的版本是0.9.5,它的從節點(Supervisor)有2個,分別是slave1、slave2。一共的woker的數量是8個(Total slots)。Storm叢集我們已經部署完畢,也啟動成功了。現在就利用Storm的方式,重新改寫一下這種敏感資訊實時監控過濾的應用。先看下Storm方式的拓撲結構圖:


 

其中的SensitiveFileReader-591、SensitiveFileReader-592(使用者簡訊採集器,分地市)代表的是Storm中的Spouts元件,表示一個數據的源頭,這裡是表示從伺服器的指定目錄下,讀取疑似垃圾簡訊使用者的垃圾簡訊內容檔案。當然Spouts的元件你可以根據實際的需求,擴展出許多Spouts。

然後讀取出檔案中每一行的內容之後,就是分析檔案的內容元件了,這裡是指:SensitiveFileAnalyzer(監控簡訊內容拆解分析),它負責分析出檔案的格式內容。

為了簡單演示起見,我這裡定義檔案的格式為如下內容(隨便寫一個例子):home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1。每個列之間用&進行連線。其中home_city=591表示疑似垃圾簡訊的使用者歸屬地市編碼,591表示福州、592表示廈門;user_id=5911000表示疑似垃圾簡訊的使用者標識;msisdn=10000表示疑似垃圾簡訊的使用者手機號碼;sms_content=abc-slave1代表的就是垃圾簡訊的內容了。SensitiveFileAnalyzer代表的就是Storm中的Bolt元件,用來處理Spouts“流”出的資料。

最後,就是我們根據解析好的資料,匹配業務規定的敏感關鍵字,進行過濾入庫了。這裡我們是把過濾好的資料存入MySQL資料庫中。負責這項任務的元件是:SensitiveBatchBolt(敏感資訊採集處理),當然它也是Storm中的Bolt元件。好了,以上就是完整的Storm拓撲(Topology)結構了。

現在,我們對於整個敏感資訊採集過濾監控的拓撲結構,有了一個整體的瞭解之後,我們再來看下如何具體編碼實現!先來看下整個工程的程式碼層次結構,它如下圖所示:


 

首先來看下,我們定義的敏感使用者的資料結構RubbishUsers,假設我們要過濾的敏感使用者的簡訊內容中,要包含“racketeer”、“Bad”等敏感關鍵字。具體程式碼如下:


 

現在,我們看下敏感資訊資料來源元件SensitiveFileReader的具體實現,它負責從伺服器的指定目錄下面,讀取疑似垃圾簡訊使用者的垃圾簡訊內容檔案,然後把每一行的資料,傳送給下一個處理的Bolt(SensitiveFileAnalyzer),每個檔案全部發送結束之後,在當前目錄中,把原檔案重新命名成字尾bak的檔案(當然,你可以重新建立一個備份目錄,專門用來儲存這種處理結束的檔案),SensitiveFileReader的具體實現如下:


 

監控簡訊內容拆解分析器SensitiveFileAnalyzer,這個Bolt元件,接收到資料來源SensitiveFileReader的資料之後,就按照上面定義的格式,對檔案中每一行的內容進行解析,然後把解析完畢的內容,繼續傳送給下一個Bolt元件:SensitiveBatchBolt(敏感資訊採集處理)。現在,我們來看下SensitiveFileAnalyzer這個Bolt元件的實現:

 


 

最後一個Bolt元件SensitiveBatchBolt(敏感資訊採集處理)根據上游Bolt元件SensitiveFileAnalyzer傳送過來的資料,然後跟業務規定的敏感關鍵字進行匹配,如果匹配成功,說明這個使用者,就是我們要重點監控的使用者,我們把它通過hibernate採集到MySQL資料庫,統一管理。最後要說明的是,SensitiveBatchBolt元件還實現了一個監控的功能,就是定期打印出,我們已經採集到的敏感資訊使用者資料。現在給出SensitiveBatchBolt的實現:


 

 

由於是通過hibernate入庫到MySQL,所以給出hibernate配置,首先是:hibernate.cfg.xml

 


 

對應的ORM對映配置檔案rubbish-users.hbm.xml內容如下:

 


 

最後,還是通過Spring把hibernate整合起來,資料庫連線池用的是:DBCP。對應的Spring配置檔案jdbc-hibernate-bean.xml的內容如下:

 


 

到此為止,我們已經完成了敏感資訊實時監控的所有的Storm元件的開發。現在,我們來完成Storm的拓撲(Topology),由於拓撲(Topology)又分為本地拓撲和分散式拓撲,因此封裝了一個工具類StormRunner(拓撲執行器),對應的程式碼如下:

 


 

好了,現在我們把上面所有的Spouts/Bolts拼接成“拓撲”(Topology)結構,我們這裡用的是分散式拓撲,來進行部署執行。具體的SensitiveTopology(敏感使用者監控Storm拓撲)程式碼如下:


 

到此為止,所有的Storm元件已經開發完畢!現在,我們把上述工程打成jar包,放到Storm叢集中執行,具體可以到Nimbus對應的Storm安裝目錄下面的bin目錄,輸入:storm jar + {jar路徑}。

比如我這裡是輸入:storm jar /home/tj/install/SensitiveTopology.jar newlandframework.storm.topology.SensitiveTopology,然後,把疑似垃圾簡訊使用者的垃圾簡訊內容檔案放到指定的伺服器下面的目錄(/home/tj/data/591、/home/tj/data/592),最後開啟剛才的Storm UI,觀察任務的啟動執行情況,這裡如下圖所示:


 

可以看到我們剛才提交的拓撲:SensitiveTopology已經成功提交到Storm叢集裡面了。這個時候,你可以滑鼠點選SensitiveTopology,然後會開啟如下的一個Spouts/Bolts的監控介面,如下圖所示:


 

我們可以很清楚地看到:Spouts元件(使用者簡訊採集器):SensitiveFileReader591、SensitiveFileReader592的執行緒數executors、任務提交emitted情況。以及Bolts元件:監控簡訊內容拆解分析器(SensitiveFileAnalyzer)、敏感資訊採集處理(SensitiveBatchBolt)的執行情況,這樣監控起來就非常方便。

此外,我們還可以到對應的Supervisor伺服器對應的Storm安裝目錄下面的logs目錄,檢視一下worker的工作日誌,我們來看下敏感資訊監控過濾的處理情況,截圖如下:

 


 

通過SensitiveBatchBolt模組的監控執行緒,可以看到,我們目前已經採集到了9個敏感資訊使用者了,再來看下,這些包含敏感關鍵字的使用者有沒有入庫MySQL成功?


 

發現入庫的結果也是9個,和日誌列印的數量上是一致的。而且垃圾簡訊內容sms_content果然都包含了“racketeer”、“Bad”這些敏感關鍵字!完全符合我們的預期。而且,以後檔案處理量上來了,我們可以通過調整設定Spouts/Bolts的並行度,和Worker的數量進行化解。當然,你還可以通過水平擴充套件叢集的數量來解決這個問題。

目前在國內,就我個人看法,對Storm分析應用,做得最好的應該算是阿里巴巴,它在原來Storm的基礎上加以改良,開源出JStorm,有興趣的朋友,可以多關注一下。

藉助Storm,我們可以很輕鬆地開發分散式實時處理應用,而上述場景的設計,只是Storm應用的一個案例。相比傳統的單機伺服器應用而言,叢集化地並行協同計算處理,是雲端計算、大資料時代的一個趨勢,也是我今後努力學習的方向。

結語

感謝您的觀看,如有不足之處,歡迎批評指正。

如果有對大資料感興趣的小夥伴或者是從事大資料的老司機可以加群:

658558542    

歡迎大家進群交流討論,學習交流,共同進步。(裡面還有大量的免費資料,幫助大家在成為大資料工程師,乃至架構師的路上披荊斬棘!)

最後祝福所有遇到瓶疾且不知道怎麼辦的大資料程式設計師們,祝福大家在往後的工作與面試中一切順利。