導讀:資料匯流排DBus的總體架構中主要包括六大模組,分別是:日誌抓取模組、增量轉換模組、全量抽取程式、日誌運算元處理模組、心跳監控模組、Web管理模組。六大模組各自的功能相互連線,構成DBus的工作原理:通過讀取RDBMS增量日誌的方式來實時獲取增量資料日誌(支援全量拉取);基於Logstash,flume,filebeat等抓取工具來實時獲得資料,以視覺化的方式對資料進行結構化輸出。本文主要介紹的是DBus中基於視覺化配置的日誌結構化轉換實現的部分。

一、結構化日誌的原理

1.1 源端日誌抓取

DBus可以對接多種log資料來源,例如:Logstash、Flume、Filebeat等。上述元件都是業界比較流行的日誌抓取工具,一方面便於使用者和業界統一標準,方便使用者技術方案的整合;另一方面也避免了無謂的重複造輪子。抓取的資料我們稱為原始資料日誌(raw data log),由抓取元件將其寫入Kafka中,等待DBus後續處理。

1.2 視覺化配置規則,使日誌結構化

使用者可自定義配置日誌源和目標端。同一個日誌源的資料可以輸出到多個目標端。每一條“日誌源-目標端”線,使用者可以根據自己的需要來配置相應的過濾規則。經過規則運算元處理後的日誌是結構化的,即:有schema約束,類似於資料庫中的表。

1.3 規則運算元

DBus設計了豐富易用的運算元,用於對資料進行定製化操作。使用者對資料的處理可分為多個步驟進行,每個步驟的資料處理結果可即時檢視、驗證;並且可重複使用不同運算元,直到轉換、裁剪出自己需要的資料。

1.4 執行引擎

將配置好的規則運算元組應用到執行引擎中,對目標日誌資料進行預處理,形成結構化資料,輸出到Kafka,供下游資料使用方使用。系統流程圖如下所示:

根據DBus log設計原則,同一條原始日誌,可以被提取到一個或多個表中。每個表是結構化的,滿足相同的schema約束。

  • 每個表是一個規則運算元組的集合,每個表可以擁有1個或多個規則運算元組;
  • 每個規則運算元組,由一組規則運算元組合而成,每個運算元具有獨立性;

對於任意一條原始資料日誌(raw data log),它應該屬於哪張表呢?

假如使用者定義了若干張邏輯表(T1,T2…),用於抽取不同型別的日誌,那麼,每條日誌需要與規則運算元組進行匹配:

  • 進入某張表T1的所有規則運算元組的執行過程
  • 符合條件的進入規則運算元組,並且被執行引擎轉換為結構化的表資料
  • 不符合提取條件的日誌嘗試下一個規則運算元組
  • 對於T1的所有規則運算元組,如果都不滿足要求,則進入下一張表T2的執行過程,以此類推
  • 如果該條日誌不符合任何一張表的過濾規則,則進入_unknown_table_表

例如,對於同一條應用日誌,其可能屬於不止一個規則組或Table,而在我們定義的規則組或Table中,只要其滿足過濾條件,該應用日誌就可以被規則組提取,即保證了同一條應用日誌可以同屬於不同的規則組或Table。

規則運算元是對資料進行過濾、加工、轉換的基本單元。常見的規則運算元如上圖所示。

運算元之間具有獨立性,運算元之間可以任意組合使用,從而可以實現許多複雜的、高階的功能,通過對運算元進行迭代使用,最終可以實現對任意資料進行加工的目的。使用者可以開發自定義運算元,運算元的開發非常容易,使用者只要遵循基本介面原則,就可以開發任意的運算元。

二、DBus日誌處理例項

以DBus叢集環境為例,DBus叢集中有兩臺機器(即master-slave)部署了心跳程式,用於監控、統計、預警等,心跳程式會產生一些應用日誌,這些應用日誌中包含各類事件資訊,假如我們想要對這些日誌進行分類處理並結構化到資料庫中,我們就可以採用DBus log程式對日誌進行處理。

DBus可以接入多種資料來源(Logstash、Flume、Filebeat等),此處以Logstash為例來說明如何接入DBus的監控和報警日誌資料。

由於在dbus-n2和dbus-n3兩臺機器上分別存在監控和預警日誌,為此我們分別在兩臺機器上部署了Logstash程式。心跳資料由Logstash自帶的心跳外掛產生,其作用是便於DBus對資料進行統計和輸出,以及對源端日誌抽取端(此處為Logstash)進行預警(對於Flume和Filebeat來說,因為它們沒有心跳外掛,所以需要額外為其定時產生心跳資料)。Logstash程式寫入到Kafka中的資料中既有普通格式的資料,同時也有心跳資料。

這裡不只是侷限於2臺部署有Logstash程式的機器,DBus對Logstash數量不做限制,比如應用日誌分佈在幾十上百臺機器上,只需要在每臺機器上部署Logstash程式,並將資料統一抽取到同一個Kafka Topic中,DBus就能夠對所有主機的資料進行資料處理、監控、預警、統計等。

2.1 啟動Logstash

在啟動Logstash程式後,我們就可以從topic : heartbeat_log_logstash中讀取資料,資料樣例如下:

1)心跳資料

2)普通日誌資料

2.2 配置規則

接下來,我們只需要在DBus Web中配置相應的規則就可以對資料進行處理了。

首先新建一個邏輯表sink_info_table,該表用來抽取sink事件的日誌資訊,然後配置該表的規則組(一個或多個,但所有的規則組過濾後的資料需要滿足相同schema特性),heartbeat_log_logstash作為原始資料topic,我們可以實時的對資料進行視覺化操作配置(所見即所得,即席驗證)。

1)讀取原始資料日誌

可以看到由Logstash預先提取已經包含了log4j的基本資訊,例如path、@timestamp、level等。但是資料日誌的詳細資訊在欄位log中。由於不同的資料日誌輸出是不一樣的,因此可以看到log列資料是不同的。

2)提取感興趣的列

假如我們對timestamp、log 等原始資訊感興趣,那麼可以新增一個toIndex運算元,來提取這些欄位:

這裡需要指出,我們考慮使用陣列下標方式,是有原因的: - 並不是所有列本身自帶列名(例如flume抽取的原始資料,或者split運算元處理後的資料列); - 下標方式可以使用陣列方式指定列(類似python方式, 例如:1:3表示1,2列); 因此後續操作全部基於陣列下標方式訪問。

執行規則,就可以看到被提取後的欄位情況:

3)過濾需要的資料

在這個例子中,我們只對含有“Sink to influxdb OK!”的資料感興趣。因此新增一個filter運算元,提取第7列中包含”Sink to influxdb OK!”內容的行資料:

執行後,只有符合條件的日誌行資料才會存在。

4)對特定列進行提取

新增一個select運算元,我們對第1和3列的內容感興趣,所以對這兩列進行提取。

執行select運算元,資料中就會只含有第1和3列了。

5)以正則表示式的方式處理資料

我們想從第1列的資料中提取符合特定正則表示式的值,使用regexExtract運算元對資料進行過濾。正則表示式如下:http_code=(\d*).*type=(.*),ds=(.*),schema=(.*),table=(.*)\s.*errorCount=(\d*),使用者可以寫自定義的正則表示式。

執行後,就會獲取正則表示式執行後的資料。

6)選擇輸出列

最後我們把感興趣的列進行輸出,使用saveAs運算元, 指定列名和型別,方便於儲存在關係型資料庫中。

執行saveAs運算元後,這就是處理好的最終輸出資料樣本。

2.3 檢視結構化輸出結果

儲存上一步配置好的規則組,日誌資料經過DBus執行運算元引擎,就可以生成相應的結構化資料了。目前根據專案實際,DBus輸出的資料是UMS格式,如果不想使用UMS,可以經過簡單的開發,實現定製化。

注:UMS是DBus定義並使用的、通用的資料交換格式,是標準的JSON。其中同時包含了schema和資料資訊。更多UMS介紹請參考DBus開源專案主頁的介紹。開源地址:https://github.com/bridata/dbus

以下是測試案例,輸出的結構化UMS資料的樣例:

2.4 日誌監控

為了便於掌握資料抽取、規則匹配、監控預警等情況,我們提供了日誌資料抽取的視覺化實時監控介面,如下圖所示,可隨時瞭解以下資訊:

  • 實時資料條數
  • 錯誤條數情況(錯誤條數是指:執行運算元時出現錯誤的情況,幫助發現運算元與資料是否匹配,用於修改運算元,DBus同時也提供了日誌回讀的功能,以免丟失部分資料)
  • 資料延時情況
  • 日誌抽取端是否正常

監控資訊中包含了來自叢集內各臺主機的監控資訊,以主機IP(或域名)對資料分別進行監控、統計和預警等。

監控中還有一張表叫做_unkown_table_ 表明所有沒有被匹配上的資料條數。例如:Logstash抓取的日誌中有5種不同事件的日誌資料,我們只捕獲了其中3種事件,其它沒有被匹配上的資料,全部在_unkown_table_計數中。

DBus同樣可以接入Flume、Filebeat、UMS等資料來源,只需要稍作配置,就可以實現類似於對Logstash資料來源同樣的處理效果,更多關於DBus對log的處理說明,請參考:

  • https://bridata.github.io/DBus/install-logstash-source.html

  • https://bridata.github.io/DBus/install-flume-source.html

  • https://bridata.github.io/DBus/install-filebeat-source.html

應用日誌經過DBus處理後,將原始資料日誌轉換為了結構化資料,輸出到Kafka中提供給下游資料使用方進行使用,比如通過Wormhole將資料落入資料庫等。具體如何將DBus與Wormhole結合起來使用,請參考:如何設計實時資料平臺(技術篇)。

作者:仲