1. 程式人生 > >一個一站式流式處理雲平臺解決方案

一個一站式流式處理雲平臺解決方案

隨著網際網路、IT、大資料等技術的爆發式發展,企業系統產生的大量爆發資料。對於儲存在資料庫中的業務資料,可以通過DBus資料匯流排+Wormhole流式處理平臺的日誌方式實時地無侵入同步和落地到任意sink端,提供下游系統分析使用;對於業務系統產生的日誌資料,這些包含了業務高低峰、使用者軌跡、系統異常/錯誤資訊、呼叫鏈等諸多資訊,也蘊含著無價的寶藏。一些公司通過埋點等方式和手段,往日誌資料裡輸出他們想要監控和跟蹤的資訊,以便提供客觀的資料支撐,做出更高效、更準確的決策。

DBus-https://github.com/bridata/dbus

Wormhole-https://github.com/edp963/wormhole

在這種背景下,各種各樣的日誌收集、結構化、分析工具如雨後春筍般出現,業界目前已有不少發展得比較成熟的方案,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,更有整合了日誌採集、轉換分析和展示的ELK整合方案。這些方案各有所長。在結構化日誌這個方面,大多采用配置正則表達的方式:用於提取日誌中模式比較固定、通用的部分,例如日誌時間、日誌型別、行號等。對於真正的和業務比較相關的資訊,我們權且稱之為message部分,原因主要是這部分千變萬化,很難有一個特定、通用的模式來一勞永逸地描述及囊括所有情形。然而,這部分內容對於企業來說,比前面相對固定的部分更有價值。

本文就是介紹一種通過簡單、視覺化、所見即所得即席驗證的方式,為使用者提供高效、靈活地採集和加工利用資料日誌的工具。

常見的日誌處理方案

目前業界常見的日誌分析代表為ELK:

  1. 通過Logstash實時收集和轉換資料,儲存到Eleaticsearch中,通過Kibana進行展示。

  2. 或者通過Filebeat 作為前端抓取資料,Logstash進行資料轉換,儲存到Eleaticsearch中,通過Kibana進行展示。

                                             

 

 

 

結構化日誌的原理

DBUS設計的資料日誌同步方案如下:

 

  1. 日誌抓取端採用業界流行的元件(例如Logstash、Flume、Filebeat等)。一方面便於使用者和業界統一標準,方便使用者的整合;另一方面也避免無謂的重造輪子。抓取資料稱為原始資料日誌(raw data log)放進Kafka中,等待處理。

  2. 提供視覺化介面,配置規則來結構化日誌。使用者可配置日誌來源和目標。同一個日誌來源可以輸出到多個目標。每一條“日誌源-目標”線,中間資料經過的規則處理使用者根據自己的需求來自由定義。最終輸出的資料是結構化的,即:有schema約束,可以理解為類似資料庫中的表。

  3. 所謂規則,在DBUS中,即“規則運算元”。DBUS設計了豐富易用的過濾、拆分、合併、替換等運算元供使用者使用。使用者對資料的處理可分多個步驟進行,每個步驟的資料處理結果可即時檢視、驗證;可重複使用不同運算元,直到轉換、裁剪得到自己需要的資料。

  4. 將配置好的規則運算元組運用到執行引擎中,對目標日誌資料進行預處理,形成結構化資料,輸出到Kafka,供下游資料使用方使用。

 

系統流程圖如下所示:

 

 

 

根據配置,我們支援同一條原始日誌,能提取為一個表資料,或者可以提取為多個表資料。

 

每個表是結構化的,滿足相同的schema。

  • 每個表是一個規則 運算元組的合集,可以配置1個到多個規則運算元組

  • 每個規則運算元組,由一組規則運算元組合而成

 

拿到一條原始資料日誌, 它最終應該屬於哪張表呢?

 

每條日誌需要與規則運算元組進行匹配:

  • 符合條件的進入規則運算元組的,最終被規則組轉換為結構化的表資料。

  • 不符合的嘗試下一個規則運算元組。

  • 都不符合的,進入unknown_table表。

 

 

 

規則運算元

 

規則運算元是對資料進行過濾、加工、轉換的基本單元。常見的規則運算元如下:

 

運算元之間是獨立的,通過組合不同的運算元達到更復雜的功能,對運算元進行迭代使用最終達到對任意資料進行加工的目的。

我們試圖使得運算元儘量滿足正交性或易用性(雖然正則表示式很強大,但我們仍然開發一些簡單運算元例如trim運算元來完成簡單功能,以滿足易用性),運算元的開發也可以隨意擴充,比如可以開發提取JSON節點值或XML節點值的運算元。運算元的開發也很容易,只要遵循基本介面原則,就可以開發任意自定義運算元。

 

結構化日誌的例子

下面以提取heart_beat_status表為例子進行配置說明,整個資料日誌結構化過程如下:

使用通用抓取端抓取原始日誌

 

使用Logstash 讀取log4j檔案作為資料輸入源,輸出到Kafka中(這裡具體配置就不說了,可參考Logstash配置)

這裡抓取端不限制,你可以使用Flume、Filebeat,甚至你自己寫的端,只要輸出到Kafka中就可以。

使用視覺化方式進行規則運算元的配置

 

首先配置一個輸出表的規則組, 檢視heartbeat_log_new這個topic中,我們直接實時的視覺化操作配置。

 

 

1、讀取原始資料日誌

 

可以看到由Logstash 預先提取已經包含了log4j的基本資訊,例如path、@timestamp、level等。但是資料日誌的詳細資訊在欄位log中。由於不同的資料日誌輸出是不一樣的,因此可以看到log列資料是同的。

 

需要指出的是:使用logstash預先提取其它列是可選的,其實對於Flume這樣直接抓取到的就是raw data log,這對我們後面的提取沒有影響

 

 

2、提取感興趣列

例如我們提取timestamp、log 原始資訊等,可以新增一個toindex運算元,提取感興趣的欄位,如下:

 

這裡需要指出,我們考慮使用陣列下標方式,是有原因的:

  • 並不是所有列天生就有列名(例如flume抽取的原始資料,或者split運算元處理後的資料列);

  • 下表方式對可以使用陣列方式指定列(類似python方式, 例如:1:3 表示1,2,3列);

因此後續操作全部基於陣列下標方式訪問。

特別說明一下:如下?號所在的地方,滑鼠移上去就會顯示一個線上幫助,告訴你這個運算元怎麼使用,每個運算元怎麼用不需要記。

 

執行一下,就可以看到被提取後的欄位情況:

3、過濾不相關資料

在這個例子中,我們只對插入心跳包的資料感興趣。因此新增一個filter運算元,對第2列進行過濾”插入心跳包”:

 

執行後,不符合條件的日誌行就被過濾了。

 

4、以切分方式進行提取

新增一個split運算元,我們對“資料來源”,“插入心跳包”,還有後面的“node”進行切分。

 

可以看到切分後,原來的第1列變新的1,2,3,4,5列了。

特別需要說明的是:提取的方式非常多,Split只是一種常見方式。我們可以substring提取,replace掉不需要的資料等,我們還也可以配置正則表示式提取。我們不積極推薦用正則表示的原因是正則表示式很容易寫錯,並不是最簡單視覺化的選擇。

 

5、以trim方式出來資料

我們想提取4列的值,使用trim運算元進行過濾掉不需要的資料。

 

 

執行後,這樣新的5列就拿到乾淨的值。

6、選擇輸出列

最後我們把感興趣的列輸出,使用saveAs運算元進行輸出, 指定列名和型別。

 

 

執行後,這就是處理好的最終輸出資料樣本。

檢視結構化輸出結果

儲存上一步配置好的規則組,運用到DBus執行運算元引擎,就可以生成相應的結構化資料了。目前根據專案實際,DBus輸出的資料是UMS格式。UMS是DBus開源專案(https://github.com/bridata/dbus)定義並使用的,通用的資料交換格式,是標準的JSON。其中同時包含了schema和資料資訊。更多UMS介紹請參考DBus開源專案主頁的介紹。

輸出結果的資料格式和結構,不想使用UMS的,可經過簡單的開發,實現定製化。

 

以下是測試案例,輸出的結構化UMS資料的例子:

{

    "payload": [

        {

            "tuple": [

                "127046516736228867",

                "2017-12-17 13:57:30.000",

                "i",

                "320171788",

                "2017/12/17 13:57:30.877",

                "edpdb",

                "成功",

                "/DBus/HeartBeat/Monitor/edpdb/TEST1/T1000"

            ]

        },

        {

            "tuple": [

                "127046516736228869",

                "2017-12-17 13:57:30.000",

                "i",

                "320171790",

                "2017/12/17 13:57:30.946",

                "edpdb",

                "成功",

                "/DBus/HeartBeat/Monitor/edpdb/TEST4/ONEYI"

            ]

        },

        {

            "tuple": [

                "127046520930532871",

                "2017-12-17 13:57:31.000",

                "i",

                "320171792",

                "2017/12/17 13:57:31.026",

                "edpdb",

                "成功",

                "/DBus/HeartBeat/Monitor/edpdb/TEST3/USER_REGISTER"

            ]

        }

    ],

    "protocol": {

        "type": "data_increment_data",

        "version": "1.3"

    },

    "schema": {

        "batchId": 0,

        "fields": [

            {

                "encoded": false,

                "name": "ums_id_",

                "nullable": false,

                "type": "long"

            },

            {

                "encoded": false,

                "name": "ums_ts_",

                "nullable": false,

                "type": "datetime"

            },

            {

                "encoded": false,

                "name": "ums_op_",

                "nullable": false,

                "type": "string"

            },

            {

                "encoded": false,

                "name": "ums_uid_",

                "nullable": false,

                "type": "string"

            },

            {

                "encoded": false,

                "name": "event_time",

                "nullable": false,

                "type": "datetime"

            },

            {

                "encoded": false,

                "name": "datasource",

                "nullable": false,

                "type": "string"

            },

            {

                "encoded": false,

                "name": "heartbeat_state",

                "nullable": false,

                "type": "string"

            },

            {

                "encoded": false,

                "name": "heartbeat_node",

                "nullable": false,

                "type": "string"

            }

        ],

        "namespace": "heartbeat_log.heartbeat_log_schema.heartbeat_table.3.host1.0"

    }

}

檢視監控

 

為了便於掌握資料抽取及規則匹配等情況,我們提供了日誌資料提取的視覺化實時監控介面,如下圖所示,可隨時瞭解:

  • 實時資料條數

  • 錯誤條數情況(錯誤條數是指:執行運算元時出現錯誤的情況,幫助發現運算元與資料是否匹配,用於修改運算元)

  • 資料延時情況

 

監控中還有一張表叫做__unkown_table__ 表明所有沒有被匹配上的資料條數。例如:logstash抓取的日誌中有5種不同模式的資料,我們只捕獲了其中3種模式,其它沒有被匹配上的資料,全部在_unkown_table_計數中。

總結

DBus日誌同步方案總結如下:

1、整合整合現有日誌抓取工具(Flume、filebeat、logstash等),方便使用者接入及整合;

2、DBUS提供了豐富的運算元及視覺化配置介面,供使用者結構化資料日誌使用。

  • 通過視覺化的,自由地使用各種規則運算元對日誌進行處理,生成結構化的資料日誌;

  • 每個運算元配置過程都可以看到原始資料和加工後的資料;

  • 運算元可以隨意增減,處理順序可以隨意調換,每個步驟的運算元相互疊加使用;

  • 使用者也可以擴充套件開發自己需要的運算元;

  • 提供源到目標一對多的配置,讓使用者想怎麼玩就怎麼玩,從而可從任意多個角度挖掘資料價值;

  • 監控讓使用者更直觀地瞭解到資料結構化實時轉換的情況,一切盡在掌控;

3、將原始資料日誌轉換為結構化資料,輸出到kafka中提供給下游資料使用費進行使用。

 

最終使得日誌業務資料結構化的過程,變得簡單、視覺化、配置集中化, 使得大家都能輕鬆地玩轉日誌資料。

 

本文提到的日誌結構化方案計劃於2018年1月釋出到開源專案DBus 0.4版本中,專案地址https://github.com/bridata/dbus

系統架構和工作原理

DBUS主要分為兩個部分:貼源資料採集和多租戶資料分發。兩個部分之間以Kafka為媒介進行銜接。無多租戶資源、資料隔離需求的使用者,可以直接消費源端資料採集這一級輸出到kafka的資料,無需再配置多租戶資料分發。

GlobalOverview

1 DBUS源端資料採集

DBUS源端資料採集大體來說分為2部分:

  • 讀取RDBMS增量日誌的方式來 實時獲取增量資料日誌,並支援全量拉取;
  • 基於logtash,flume,filebeat等抓取工具來實時獲得資料,以視覺化的方式對資料進行結構化輸出;

以下為具體實現原理 system arch

主要模組如下:

  • 日誌抓取模組:從RDBMS的備庫中讀取增量日誌,並實時同步到kafka中;
  • 增量轉換模組:將增量資料實時轉換為UMS資料,處理schema變更,脫敏等;
  • 全量抽取程式:將全量資料從RDBMS備庫拉取並轉換為UMS資料;
  • 日誌運算元處理模組:將來自不同抓取端的日誌資料按照運算元規則進行結構化處理;
  • 心跳監控模組:對於RDMS類源,定時向源端傳送心跳資料,並在末端進行監控,傳送預警通知;對於日誌類,直接在末端監控預警。
  • web管理模組:管理所有相關模組。

2 多租戶資料分發

對於不同租戶對不同源端資料有不同訪問許可權、脫敏需求的情形,需要引入Router分發模組,將源端貼源資料,根據配置好的許可權、使用者有權獲取的源端表、不同脫敏規則等,分發到分配給租戶的Topic。這一級的引入,在DBUS管理系統中,涉及到使用者管理、Sink管理、資源分配、脫敏配置等。不同專案消費分配給他的topic。

route2Project

主要功能:

  • 無侵入方式接入多種資料來源: 業務系統無需任何修改,以無侵入性讀取資料庫系統的日誌獲得增量資料實時變化。目前RDBMS支援mysql,oracle資料來源(Oracle資料來源請參考Oracle相關協議), 日誌方面支援基於logstash,flume和filebeat的多種資料日誌抽取方案。
  • 海量資料實時傳輸: 使用基於Storm的流式計算框架,秒級延時,整體無單點保證高可用性。
  • 多租戶支援: 提供使用者管理、資源分配、Topology管理、租戶表管理等豐富的功能,可根據需求,為不同租戶分配不同的源端表資料訪問許可權,應用不同的脫敏規則,從而實現多租戶資源隔離、差異化資料安全。

grafana

2intr_proj_table

2intr_router_topo

  • 感知源端schema變更: 當源端發生schema變更時,能自動感知schema變化,調整UMS版本號,並通過Kafka訊息和郵件通知下游diff

  • 資料實時脫敏: 可根據需求對指定列資料進行實時脫敏。脫敏策略包括:直接替換、MD5、murmur等脫敏演算法,脫敏加鹽,正則表示式替換等。支援使用者開發jar包實現DBUS未覆蓋的個性化脫敏策略。 docs/encode

  • 初始化載入: 支援高效的初始化載入和重新載入,支援任意指定輸出topic,靈活應對客戶需求。 docs/fuller

  • 統一標準化訊息傳輸協議: 使用統一的UMS(JSON格式)訊息schema格式輸出便於消費,提供資料線級ums_id保證資料順序性,輸出insert,Update(before/after),Delete event資料。 ums

  • 可靠多路訊息訂閱分發: 使用Kafka儲存和傳遞訊息保證可靠性和便捷的多使用者訂閱

  • 支援分割槽表/系列表資料彙集: 支援分割槽表的資料彙集到一個“邏輯表” 。也可將使用者自定義的系列表資料彙集到一個“邏輯表“。例:

    grafana

  • 實時監控&預警: 視覺化監控系統能隨時檢視各資料線實時流量和延時狀況;當資料線發生異常時,根據配置策略自動發郵件或簡訊通知相關負責人

    grafana

Wormhole  Architecture

設計理念

  • 統一 DAG 高階分形抽象
    • 構建由 Source DataSys,Kafka Topic,Spark Stream(Flink Stream),Sink DataSys 組成的物理 DAG
    • 每個物理 DAG 裡可以並行處理多個由 Source Namespace,Flow,Sink Namespace 組成的邏輯 DAG
    • 每個 Flow 本身是典型的 Spark RDD DAG
  • 統一通用流訊息 UMS 協議抽象
    • UMS 是 Wormhole 定義的流訊息協議規範
    • UMS 試圖抽象統一所有結構化訊息
    • UMS 自身攜帶結構化資料 Schema 資訊
    • Wh4 支援使用者自定義半結構化 JSON 格式
  • 統一資料邏輯表名稱空間 Namespace 抽象
    • Namespace 唯一定位所有資料儲存所有結構化邏輯表
    • [Data System].[Instance].[Database].[Table].[Table Version].[Database Partition].[Table Partition]

主要特性

  • 支援視覺化,配置化,SQL 化開發實施流式專案
  • 支援指令式動態流式處理的管理,運維,診斷和監控
  • 支援統一結構化 UMS 訊息和自定義半結構化 JSON 訊息
  • 支援處理增刪改三態事件訊息流
  • 支援單個物理流同時並行處理多個邏輯業務流
  • 支援流上 Lookup Anywhere,Pushdown Anywhere
  • 支援基於業務策略的事件時間戳流式處理
  • 支援 UDF 的註冊管理和動態載入
  • 支援多目標資料系統的併發冪等入庫
  • 支援多級基於增量訊息的資料質量管理
  • 支援基於增量訊息的流式處理和批量處理
  • 支援 Lambda 架構和 Kappa 架構
  • 支援與三方系統無縫整合,可作為三方系統的流控引擎
  • 支援私有云部署,安全許可權管控和多租戶資源管理