1. 程式人生 > >Hawk原理:通過IEnumerable實現通用的ETL管道

Hawk原理:通過IEnumerable實現通用的ETL管道

工作 文本 概念 分號 回收 簡單的 思想 單表 轉換

  針對IEnumerable已經有多篇文章,本篇介紹如何使用IEnumerable實現ETL. ETL,是英文 Extract-Transform-Load 的縮寫,用來描述將數據從來源端經過萃取(extract)、轉置(transform)、加載(load)至目的端的過程。通常來說,從原始端采集的數據有很多問題,同時可能業務需求與采集的數據格式不相匹配,所以就必須實現ETL過程。

  ETL可以理解為一條清洗管線,數據從一端流入,從另一端流出。數據量可能很大,所以管線不大可能也沒有必要加載全部內容。同時,一般情況下,從管線流出來的數據會進入新的數據池,很少直接修改到原表。

  從管線的概念可以看出,ETL需要構造可組合的鏈條,首先實現一組組件,然後實現可將這些組件組裝為一條ETL管線的框架。IEnumerable一大堆的LINQ擴展,正好幫我們實現了這一思想。

1. 數據的表達

  我們先討論清楚如何表達數據,因為數據處理涉及到動態增減屬性的問題,因此一般的實體類是做不到的,我們采用字典來實現。為此我包裝了一個實現IDictionary<string, object>的類。叫做FreeDocument。它可以簡單表示如下:

  /// <summary>
    /// 自由格式文檔
    /// </summary>
    public interface IFreeDocument : IDictionarySerializable, IDictionary<string, object>, IComparable
    {
        
#region Properties IDictionary<string, object> DataItems { get; set; } IEnumerable<string> PropertyNames { get; } #endregion }

  因此數據的處理,本質就是對每一個字典對象中的鍵值對進行增刪改查。

2 .基本組件

  數據清洗組件的基接口是ICollumProcess. 定義如下:

 public interface ICollumProcess : IDictionarySerializable
    {

        
string CollumName { get; set; } //針對的列名 bool ShouldCalculated { get; set; } //是否需要重新計算 double Priority { get; set; } //優先級 void Finish(); //處理完成時的回收函數 void Init(IList<IFreeDocument> datas); //對數據進行初始化的探測行為 }

  更清晰的說,其實派生出四部分:

  (1) 生成器

  生成器即提供/產生數據的組件。這可能包括生成一個從0-1000的數,獲取某個數據表中的數據,或從網頁檢索的結果。它的接口可以表示如下:

[Interface("ICollumGenerator", "數據生成器", SearchStrategy.FolderSearch)]
    public interface ICollumGenerator : ICollumProcess
      {

          /// <summary>
          /// 當前叠代的位置
          /// </summary>
          int Position { get; set; }
          IEnumerable<FreeDocument> Generate();/// <summary>
          /// 生成器能生成的文檔數量
          /// </summary>
          /// <returns></returns>
          int? GenerateCount();
      }

  最主要的方法是Generate,它能夠枚舉出一組數據出來,同時還有可能(有時做不到)得到能夠生成文檔的總數量。

  (2)過濾器

  過濾器即能夠分析一個文檔是否滿足條件,不滿足則剔除的組件。接口也很簡單:

  

  [Interface("ICollumDataFilter", "數據列過濾器", SearchStrategy.FolderSearch)]
     public interface ICollumDataFilter :  ICollumProcess
    {
        bool FilteData(IFreeDocument data);
     
    }

  (3)排序器

  顧名思義,對數據實現排序的接口,定義如下:

  [Interface("ICollumDataSorter", "數據排序器", SearchStrategy.FolderSearch)]
    public interface ICollumDataSorter : IDictionarySerializable, ICollumProcess,IComparer<object>
    {
       
       
         SortType SortType { get; set; }

         IEnumerable<IFreeDocument> Sort(IEnumerable<IFreeDocument> data);
    }

   排序一般需要升序和降序,但排序最大的問題是破壞了管線的單向流動性和虛擬性。最少LINQ的標準實現上,排序是內存排序,因此必須把數據全部加載進來才能排序,這嚴重影響了性能。因此目前的排序最好在小數據的情況下進行。

  (4)列轉換器

  它最重要的組件。整個ETL過程,實質上就是不同的列進行變換,組成另外一些列的過程(列就是鍵值對)。 定義實現如下:

[Interface("ICollumDataTransformer", "數據轉換器", SearchStrategy.FolderSearch)]
    public interface ICollumDataTransformer : ICollumProcess
    {
        string NewCollumName { get; set; }
        SimpleDataType TargetDataType { get; set; }
        ObservableCollection<ICollumDataFilter> FilterLogics { get; set; }
        object TransformData(IFreeDocument datas);
        IEnumerable<string> AffectedCollums { get; }
    }

  看著很復雜,但其實就是將文檔中的一些列轉換為另外一些列。比如對一個字符串的列進行正則替換,或轉換其數據類型(如從string變成int)。舉個最簡單的HTML編解碼的例子:

   public override object TransformData(IFreeDocument document)
        {
            object item = document[CollumName];
            if (item == null)
                return "";
            switch (ConvertType)
            {
                case ConvertType.Decode:
                    return HttpUtility.HtmlDecode(item.ToString());
                    break;
                case ConvertType.Encode:
                    return HttpUtility.HtmlEncode(item.ToString());
                    break;
            }
            return "";
        }

  

3. ETL管線的設計

   相信你已經想到,ETL管線的核心就是動態組裝的LINQ了。

   一個最基本的ETL管理類,應當具有以下的屬性:

   public ObservableCollection<ICollumProcess> CurrentETLTools { get; set; } //當前已經加載的ETL工具

   protected List<Type> AllETLTools { get; set; } //所有能夠使用的ETL工具。當然Type只是此處為了方便理解而設定的,更合適的應該是記錄了組件元數據,名字和介紹的擴展類。

   以及一個方法:

   public IEnumerable<IFreeDocument> RefreshDatas(IEnumerable<IFreeDocument> docuts) //從原始數據轉換為新的數據

   那麽,這個函數的實現可以如下定義:

  public IEnumerable<IFreeDocument> RefreshDatas(IEnumerable<IDictionarySerializable> docuts)
        {
            if (SampleMount <= 0)
            {
                SampleMount = 10;
            }

            IEnumerable<IFreeDocument> ienumable = docuts.Where(d=>d!=null).Select(d => d.DictSerialize());
            Errorlogs = new List<ErrorLog>();
         
            List<IFreeDocument> samples = docuts.Take((int) SampleMount).Select(d => d as IFreeDocument).ToList();
            foreach (ICollumProcess tool in
                CurrentETLTools.Where(d => d.ShouldCalculated).OrderByDescending(d => d.Priority))
            {
                tool.SourceCollection = CurrentCollection;

                tool.Init(samples);

                if (tool is ICollumDataTransformer)
                {
                    var ge = tool as ICollumDataTransformer;

                    ienumable = Transform(ge, ienumable);
                }
                if (tool is ICollumGenerator)
                {
                    var ge = tool as ICollumGenerator;
                    if (!ge.CanAppend) //直接拼接
                        ienumable = ienumable.Concat(ge.Generate());
                    else
                    {
                        ienumable = ienumable.MergeAll(ge.Generate());
                    }
                }

                else if (tool is ICollumDataFilter)
                {
                    var t = tool as ICollumDataFilter;
                    ienumable = ienumable.Where(t.FilteData);
                }
                else if (tool is ICollumDataSorter)
                {
                    var s = tool as ICollumDataSorter;

                    switch (s.SortType)
                    {
                        case SortType.AscendSort:
                            ienumable = ienumable.OrderBy(d => d, s);
                            break;
                        case SortType.DescendSort:
                            ienumable = ienumable.OrderByDescending(d => d, s);
                            break;
                    }
                }

                tool.Finish();
            }
            return ienumable;
        }

   基本實現思路如上。即通過優先級排序所有加載的ETL組件,並提取一部分樣例數據,為組件進行一次初始化。然後通過組裝不同的轉換器,生成器,排序器和過濾器,最後即可組裝為一個新的ienumable對象。註意整個過程都是延遲計算的,只有在真正需要ETL結果時才會進行實質性的操作。

 4. 優化ETL管線和實現虛擬視圖

  以上就是ETL的基本思路。但是僅僅做到這些是很不夠的。以下才是這篇文章的核心。

  ETL管線破壞了原有集合的特性,原有集合可能是能夠支持索引查詢甚至能夠執行高性能查找的。但ETL將其退化為僅能夠枚舉。枚舉意味著只能從頭訪問到尾,不能回退和索引。要想使用新集合,就只能訪問其前n個元素,或者全部訪問。這顯然對一些操作是很不利的。

  先考慮索引器。如果能滿足以下條件:

  (1) 管線中不包括排序器和過濾器,因為它們使得得集合產生了亂序。

  (2) 原始集合能夠支持索引器

  (3) 使用的生成器能夠提供生成的大小,同時生成器也能夠實現索引器

  (4) 轉換器應當只實現1到1轉換,沒有額外的副作用。

   那麽原始集合和新集合元素的對應關系是可計算的。此時索引器就能發揮作用。在實際使用中,轉換器是用的最多的。條件不可謂不苛刻。

  關於高性能查找,我們先不考慮針對復雜的SQL查詢,先考慮那種最簡單的find(item[key]==value)的查詢。但這個條件更加苛刻:

  (1) key在原始集合中必須支持高性能查找

  (2) 滿足上述索引器的四個條件

  (3) 針對key這一列的操作,轉換器必須是可逆的。而且最好能實現1-1映射。

  所謂可逆的意思就是說,轉換器能從A轉換為B,同時也能通過結果B反推出結果A。 但這種條件何其苛刻!a*5=b,這樣的操作是可逆的,然而正則轉換,替換以及絕大多數的運算都是不可逆的。

  怎麽辦呢?可能的做法,就是轉換器在轉換過程中,就動態地將key的轉換結果保存下來。於是,對新集合的查找操作,最後就能一步步回退到原始集合的查找操作。還有更好的辦法麽?

  如何讓新集合應對復雜的SQL查詢?首先需要解析SQL, 這可能涉及到大量的數學推導和轉換。以至於在實現當中因為限制太多,基本上不可能實現。以篩選key為一定範圍的數據為例,每次都需要逆向推導,這種推導難度非常大。

 

5. 智能ETL和用戶體驗優化

  整個ETL過程,是人為觀察數據的特性,組合和配置不同的ETL組件,這一過程能夠實現自動化嗎?

  人是很智能的,它能夠觀察不同數據的格式和類型,發現其中的特征,比如以下數據:

高樓層/21層,南垡頭翠成馨園,2004年建,塔樓
中樓層/5層,南北豆各莊5號院,2003年建,板樓

  人通過觀察這麽兩行的數據,就可以大概的判斷出這些信息分別代表的是什麽意思,以及如何去分割和轉換。可以用正則,提取第一個出現的數字,即樓層,再使用\d{4}提取年份,而用逗號分割,即可得到小區名稱。

  但是,這個操作依舊需要最少懂得一定程序基礎的人來參與,如果用機器來做的話,又該如何做呢?自動化步驟可以分為兩個層次:

  (1) 自動分割和對齊。

  數據尤其是來自web的數據,由於本身是由程序生成的,因此在格式上有高度的統一性,同時分隔符也是類似的,包括逗號,分號,空格,斜杠等。因此,可以統計不同分割符出現的次數,以及對應的位置,通過概率模型,生成最可能的分割方案,使得每一條數據分割出來的長度和子項數量盡可能一致。

  (2) 自動識別內容

  自動識別內容可以依賴於規則或者識別器。一種比較可靠的方法是通過基於正則的文本規則,構造一組規則組。通常200x這樣的數值,很容易被理解為年份,而12:32這樣的結構,則很容易被識別為時間。通過基於結構的識別引擎,不僅能夠識別”這是什麽內容“,更能提出其元數據,比如日期中的日月年等信息,為之後的工作做準備。

  Web表格最大的好處,在於它的格式一致性。只要分析很少的具有代表性的樣例數據,就能夠掌握整個數據集的特征。因此完全可以用比較大的代價獲得一個盡可能高的識別模塊,而在執行過程中盡量提升性能。

  

Hawk原理:通過IEnumerable實現通用的ETL管道