在大資料處理和人工智慧時代,資料工廠(Data Factory)無疑是一個非常重要的大資料處理平臺。市面上也有成熟的相關產品,比如Azure Data Factory,不僅功能強大,而且依託微軟的雲端計算平臺Azure,為大資料處理提供了強大的計算能力,讓大資料處理變得更為穩定高效。由於工作中我的專案也與大資料處理相關,於是我就在思考,是否自己也可以設計打造一個數據工廠,以便尋求一些技術痛點的解決方案,並且引入一些有趣的新功能。 因此,我利用業餘時間,逐步打造了一個基於Spark的資料工廠,並取名為Abacuza(Abacus是中國的“算盤”的意思,隱喻它是一個專門做資料計算的平臺,使用“算盤”一詞的變體,也算是體現一點中國元素吧)。說是基於Spark,其實從整個架構來看,Abacuza並不一定非要基於Spark,只需要為其定製某種資料處理引擎的外掛即可,所以,Spark其實僅僅是Abacuza的一個外掛,當然,Spark是目前主流的資料處理引擎,Abacuza將其作為預設的資料處理外掛。 Abacuza是開源的,專案地址是:https://github.com/daxnet/abacuza。徒手打造?是的,沒錯,從前端介面都後端開發,從程式碼到持續整合,再到部署指令碼和SDK與容器映象的釋出,都是自己一步步弄出來的。專案主頁上有一個簡單的教程,後面我會詳細介紹一下。在介紹如何使用Abacuza之前,我們先了解一下它的整體架構和設計思想。雖然目前Abacuza還有很多功能沒有完成,但並不影響整個資料處理流程的執行。
整體架構
Abacuza和其它的資料工廠平臺一樣,它的業務流程就是分三步走:資料讀入、資料處理、結果輸出。Abacuza的整體架構圖就很清楚地體現了這個業務流程:
(點選檢視大圖)
資料輸入部分
資料的輸入是由輸入端點(Input Endpoints)來定義的。Abacuza支援多種資料型別的輸入:CSV檔案、JSON檔案、TXT文字檔案、Microsoft SQL Server(暫未完全實現)以及S3的物件儲存路徑,今後還可以繼續擴充套件輸入端點,以支援基於管道(Pipeline)的資料處理流程,這樣一來,使用者就不需要自己使用C#或者Scala來編寫資料處理的邏輯程式碼,只需要一套JSON檔案進行Pipeline定義就可以了。
資料處理部分
當資料輸入已經定義好以後,Abacuza會根據Input Endpoint的設定,將資料讀入,然後轉交給後端的資料處理叢集(Cluster)進行處理。Abacuza可以以外掛的形式支援不同型別的叢集,如上文所說,Apache Spark是Abacuza所支援的一種資料處理叢集,在上面的架構圖中可以看到,Abacuza Cluster Service管理這些叢集,工作任務排程器(Job Scheduler)會通過Abacuza Cluster Service將資料處理任務分配到指定型別的叢集上進行處理。 對於Spark而言,具體的資料處理邏輯是由使用者自己編寫程式碼實現的。Spark原生支援Scala,也可以使用PySpark,Abacuza使用Microsoft .NET for Spark專案實現從.NET到Spark的繫結(Binding),使用者可以使用C#來編寫Spark的資料處理邏輯,後面的演練部分我會詳細介紹。 那麼與Scala相比,通過.NET for Spark使用C#編寫的資料處理程式會不會有效能問題?嗯,會有點效能問題,請看下圖(圖片來源:微軟.NET for Spark官方網站):
在這個Benchmark中,處理相同總量的資料,Scala使用了375秒,.NET花了406秒,Python使用433秒,雖然與Scala相比有些差距,但是比Python要好一些。但是不用擔心,如果在你的應用場景中,效能是放在第一位的,那麼Abacuza的Job Runner機制允許你使用Scala編寫資料處理程式,然後上傳到Spark叢集執行(也就是你不需要依賴於.NET和C#)。
資料輸出部分
與資料輸入部分類似,處理之後的資料輸出方式是由輸出端點(Output Endpoints)來定義的。Abacuza也支援多種資料輸出方式:將結果列印到日誌、將結果輸出到外部檔案系統以及將結果輸出到當前專案所在的S3物件儲存路徑。無論是資料輸入部分還是輸出部分,這些端點都是可以定製的,並且可以通過ASP.NET Core的外掛系統以及docker-compose或者Kubernetes的volume/Block Storage來實現動態載入。
相關概念和運作機理
Abacuza有以下這些概念:
- 叢集(Cluster):一個叢集是一個完整的大資料處理平臺,比如Apache Spark
- 叢集型別(Cluster Type):定義叢集的型別,例如,執行在localhost的Spark叢集和執行在雲端的Spark叢集都是Spark叢集,那麼它們的叢集型別就是spark。
- 叢集連線(Cluster Connection):定義了Abacuza資料工廠訪問叢集的方式,類似於資料庫系統的連線字串
- 任務執行器(Job Runner):定義了資料處理任務應該如何被提交到叢集上執行。它可以包含具體的資料處理業務邏輯
- 輸入端點(Input Endpoint):定義了原始資料(需要被處理的資料)的來源
- 輸出端點(Output Endpoint):定義了處理完成後的資料的輸出方式
- 專案(Project):一種型別資料處理任務的邏輯定義,它包括多個輸入端點、一個輸出端點以及多個數據處理版本(Revision)的資訊,同時它還定義了應該使用哪個任務執行器來執行資料處理任務
- 資料處理版本(Revision):它歸屬於一個特定的專案,表示不同批次的資料處理結果
當一個使用者準備使用Abacuza完成一次大資料處理的任務時,一般會按照下面的步驟進行:
- 使用使用者名稱/密碼(暫時只支援使用者名稱密碼登入)登入Abacuza的管理介面
- 基於一個已經安裝好的叢集(比如Apache Spark),配置它的叢集型別和叢集連線,用來定義Abacuza與該叢集的通訊方式(叢集和叢集連線定義了資料應該在哪裡被處理(where))
- 定義任務執行器,在任務執行器中,設定執行資料處理任務的叢集型別,當資料處理任務被提交時,Abacuza Cluster Service會基於所選的叢集型別,根據一定的演算法來選擇一個叢集進行資料處理。任務執行器中也定義了資料處理的邏輯,(比如,由Scala、C#或者Python編寫的應用程式,可以上傳到spark型別的叢集上執行)。簡單地說,任務執行器定義了資料應該如何被處理(how)
- 建立一個新的專案,在這個專案中,通過輸入端點來設定所需處理的資料來源,通過輸出端點來設定處理後的資料的存放地點,並設定該專案所用到的任務執行器。之後,使用者點選Submit按鈕,將資料提交到叢集上進行處理。處理完成後,在資料處理版本列表中檢視結果
技術選型
Abacuza採用微服務架構風格,每個單獨的微服務都在容器中執行,目前實驗階段採用docker-compose進行容器編排,今後會加入Kubernetes支援。現將Abacuza所使用的框架與相關技術簡單羅列一下:
- Spark執行程式選擇Microsoft .NET for Spark,一方面自己對.NET技術棧比較熟悉,另一方面,.NET for Spark有著很好的流式資料處理的SDK API,並且可以很方便地整合ML.NET實現機器學習的業務場景
- 所有的微服務都是使用執行在.NET 5下的ASP.NET Core Web API實現,每個微服務的後端資料庫採用MongoDB
- 用於任務排程的Abacuza Job Service微服務使用Quartz.NET實現定期任務排程,用來提交資料處理任務以及更新任務狀態。後端同時採用了PostgreSQL資料庫
- 儲存層與服務層之間引入Redis做資料快取,減少MongoDB的查詢負載
- 預設支援的Spark叢集使用Apache Livy為其提供RESTful API介面
- 檔案物件儲存採用MinIO S3
- API閘道器採用Ocelot框架
- 微服務的瞬態故障處理:Polly框架
- 身份認證與授權採用ASP.NET Core Identity整合的IdentityServer4解決方案
- 反向代理:nginx
- 前端頁面:Angular 12、Angular powered Bootstrap、Bootstrap、AdminLTE
弱弱補一句:本人前端技術沒有後端技術精湛,所以前端頁面會有不少問題,樣式也不是那麼的專業美觀,前端高手請忽略這些細節。;) Abacuza採用了外掛化的設計,使用者可以根據需要擴充套件下面這些元件:
- 實現自己的資料處理叢集以及叢集連線:因此你不必拘泥於使用Apache Spark
- 實現自己的輸入端點和輸出端點:因此你可以自定義資料的輸入部分和輸出部分
- 實現自己的任務執行器:因此你可以選擇不採用基於.NET for Spark的解決方案,你可以自己用Scala或者Python來編寫資料處理程式
在Abacuza的管理介面中,可以很方便地看到目前系統中已經被載入的外掛: 因此,Abacuza資料工廠應該可以滿足絕大部分大資料處理的業務場景。本身整個平臺都是基於.NET開發,並且通過NuGet分發了Abacuza SDK,因此擴充套件這些元件是非常簡單的,後面的演練部分可以看到詳細介紹。
部署拓撲
以下是Abacuza的部署拓撲:
(點選檢視大圖)
整個部署結構還是比較簡單的:5個主要的微服務由基於Ocelot實現的API Gateway負責代理,Ocelot可以整合IdentityServer4,在Gateway的層面完成使用者的認證(Gateway層面的授權暫未實現)。基於IdentityServer4實現的Identity Service並沒有部署在API Gateway的後端,因為在這個架構中,它的認證授權策略與一般的微服務不同。API Gateway、Identity Service以及基於Angular實現的web app都由nginx反向代理,向外界(客戶端瀏覽器)提供統一的訪問端點。所有的後端服務都執行在docker裡,並可以部署在Kubernetes中。
演練:在Abacuza上執行Word Count程式
Word Count是Spark官方推薦的第一個案例程式,它的任務是統計輸入檔案中每個單詞的出現次數。.NET for Spark也有一個相同的Word Count案例。在此,我仍然使用Word Count案例,介紹如何在Abacuza上執行資料處理程式。
先決條件
你需要一臺Windows、MacOS或者Linux的計算機,上面裝有.NET 5 SDK、docker以及docker-compose(如果是Windows或者MacOS,則安裝docker的桌面版),同時確保安裝了git客戶端命令列。
建立Word Count資料處理程式
首先使用dotnet命令列建立一個控制檯應用程式,然後新增相關的引用:
$ dotnet new console -f net5.0 -n WordCountApp
$ cd WordCountApp
$ dotnet add package Microsoft.Spark --version 1.0.0
$ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease
然後在專案中新加入一個class檔案,實現一個WordCountRunner類:
using Abacuza.JobRunners.Spark.SDK;
using Microsoft.Spark.Sql; namespace WordCountApp
{
public class WordCountRunner : SparkRunnerBase
{
public WordCountRunner(string[] args) : base(args)
{
} protected override DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame)
=> dataFrame
.Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))
.Select(Functions.Explode(Functions.Col("words"))
.Alias("word"))
.GroupBy("word")
.Count()
.OrderBy(Functions.Col("count").Desc());
}
}
接下來修改Program.cs檔案,在Main函式中呼叫WordCountRunner:
static void Main(string[] args)
{
new WordCountRunner(args).Run();
}
然後,在命令列中,WordCountApp.csproj所在的目錄下,使用下面的命令來生成基於Linux x64平臺的編譯輸出:
$ dotnet publish -c Release -f net5.0 -r linux-x64 -o published
最後,使用ZIP工具,將published下的所有檔案(不包括published目錄本身)全部打包成一個ZIP壓縮包。例如,在Linux下,可以使用下面的命令將published目錄下的所有檔案打成一個ZIP包:
$ zip -rj WordCountApp.zip published/.
Word Count程式已經寫好了,接下來我們就啟動Abacuza,並在其中執行這個WordCountApp。
執行Word Count程式
你可以使用git clone https://github.com/daxnet/abacuza.git命令,將Abacuza原始碼下載到本地,然後在Abacuza的根目錄下,使用下面的命令進行編譯:
$ docker-compose -f docker-compose.build.yaml build
編譯成功之後,用文字編輯器編輯template.env檔案,在裡面設定好本機的IP地址(不能使用localhost或者127.0.0.1,因為在容器環境中,localhost和127.0.0.1表示當前容器本身,而不是執行容器的主機),埠號可以預設:
然後,使用下面的命令啟動Abacuza:
$ docker-compose --env-file template.env up
啟動成功後,可以使用docker ps命令檢視正在執行的容器:
用瀏覽器訪問http://<你的IP地址>:9320,即可開啟Abacuza登入介面,輸入使用者名稱super,密碼P@ssw0rd完成登入,進入Dashboard(目前Dashboard還未完成)。然後在左側選單中,點選Cluster Connections,然後點選右上角的Add Connection按鈕:
在彈出的對話方塊中,輸入叢集連線的名稱和描述,叢集型別選擇spark,在設定欄中,輸入用於連線Spark叢集的JSON配置資訊。由於我們本地啟動的Spark在容器中,直接使用本機的IP地址即可,如果你的Spark叢集部署在其它機器上,也可以使用其它的IP地址。在配置完這些資訊後,點選Save按鈕儲存:
接下來就是建立任務執行器。在Abacuza管理介面,點選左邊的Job Runners選單,然後點選右上角的Add Job Runner按鈕:
在彈出的對話方塊中,輸入任務執行器的名稱和描述資訊,叢集型別選擇spark,之後當該任務執行器開始執行時,會挑選任意一個型別為spark的叢集來處理資料。
填入這些基本資訊後,點選Save按鈕,此時會進入任務執行器的詳細頁面,用來進行進一步的設定。在Payload template中,輸入以下JSON文字:
{
"file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}",
"className": "org.apache.spark.deploy.dotnet.DotnetRunner",
"args": [
"${jr:binaries:WordCountApp.zip}",
"WordCountApp",
"${proj:input-defs}",
"${proj:output-defs}",
"${proj:context}"
]
}
大概介紹一下每個引數:
- file:指定了在Spark叢集上需要執行的程式所在的JAR包,這裡直接使用微軟的Spark JAR
- className:指定了需要執行的程式在JAR包中的名稱,這裡固定使用org.apache.spark.deploy.dotnet.DotnetRunner
- ${jr:binaries:WordCountApp.zip} 表示由className指定的DotnetRunner會呼叫當前任務執行器中的二進位制檔案WordCountApp.zip中的程式來執行資料處理任務
- WordCountApp 為ZIP包中可執行程式的名稱
- ${proj:input-defs} 表示輸入檔案及其配置將引用當前執行資料處理的專案中的輸入端點的定義
- ${proj:output-defs} 表示輸出檔案及其配置將引用當前執行資料處理的專案中的輸出端點的定義
- ${proj:context} 表示Spark會從當前專案讀入相關資訊並將其傳遞給任務執行器
在上面的配置中,引用了兩個binary檔案:microsoft-spark-3-0_2.12-1.0.0.jar和WordCountApp.zip。於是,我們需要將這兩個檔案上傳到任務執行器中。仍然在任務執行器的編輯介面,在Binaries列表中,點選加號按鈕,將這兩個檔案附加到任務執行器上。注意:microsoft-spark-3-0_2.12-1.0.0.jar檔案位於上文用到的published目錄中,而WordCountApp.zip則是在上文中生成的ZIP壓縮包。
配置完成後,點選Save & Close按鈕,儲存任務執行器。 接下來,建立一個數據處理專案,在左邊的選單中,點選Projects,然後在右上角點選Add Project按鈕:
在彈出的Add Project對話方塊中,輸入專案的名稱、描述,然後選擇輸入端點和輸出端點,以及負責處理該專案資料的任務執行器:
在此,我們將輸入端點設定為文字檔案(Text Files),輸出端點設定為控制檯(Console),也就是直接輸出到日誌中。這些配置在後續的專案編輯頁面中也是可以更改的。一個專案可以包含多個輸入端點,但是隻能有一個輸出端點。點選Save按鈕儲存設定,此時Abacuza會開啟專案的詳細頁,在INPUT選項卡下,新增需要統計單詞出現次數的文字檔案:
在OUTPUT選項卡下,確認輸出端點設定為Console:
然後點選右上角或者右下角的Submit按鈕,提交資料處理任務,此時,選項卡會自動切換到REVISIONS,並且更新Job的狀態:
稍等片刻,如果資料處理成功,Job Status會從RUNNING變為COMPLETED:
點選Actions欄中的檔案按鈕,即可檢視資料處理的日誌輸出:
從日誌檔案中可以看到,Abacuza已經根據我們寫的資料處理程式,統計出輸入檔案input.txt中每個單詞的出現次數。通過容器的日誌輸出也能看到同樣的資訊:
總結
本文介紹了自己純手工打造的資料工廠(Data Factory)的設計與實現,並開發了一個案例來演示該資料工廠完成資料處理的整個過程。之後還有很多功能可以完善:Dashboard、認證授權的優化、使用者與組的管理、第三方IdP的整合、Pipeline的實現等等,今後有空再慢慢弄吧。
歡迎訪問本人的個人站點https://sunnycoding.cn,獲得更好的閱讀體驗。