1. 程式人生 > >Flume資料採集結合etcd作為配置中心在爬蟲資料採集處理中的架構實踐。

Flume資料採集結合etcd作為配置中心在爬蟲資料採集處理中的架構實踐。

Apache Flume是一個分散式的、可靠的、可用的系統,用於有效地收集、 聚合和將大量日誌資料從許多不同的源移動到一個集中的資料儲存,但是其本身是以本地properties作為配置的,配置無法做到動態監聽和更新。

一、Flume和ETCD的結合,使用ETCD作為flume 資料採集的配置中心。

那麼如何做出一個flume的動態配置中心呢,etcd 可以是一個很好的選擇。etcd的API版本有v2和v3兩個,這裡選擇v3版本。在flume啟動的時候,可以啟動etcd的監聽。

...  
  @Override
    public void start() {
        //初始化監聽
        EtcdUtil.initListen(etcdConfig);
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();
        super.start();
    }
...

 

   /**
     * etcd的監聽,監聽指定的key,當key 發生變化後,監聽自動感知到變化。 key發生變化後,會更新本地快取資料
     *
     * @param key 指定需要監聽的key
     */
    public static void initListen(String key) {
        try {

            //載入配置
            loadProperties(getConfig(EtcdUtil.getEtclClient().getKVClient().get(ByteSequence.fromString(key)).get().getKvs()));
            new Thread(() -> {
                Watch.Watcher watcher = EtcdUtil.getEtclClient().getWatchClient().watch(ByteSequence.fromString(key));
                try {
                    while (true) {
                        watcher.listen().getEvents().stream().forEach(watchEvent -> {
                            KeyValue kv = watchEvent.getKeyValue();
                            log.info("etcd event:{} ,change key is:{},afterChangeValue:{}", watchEvent.getEventType(), kv.getKey().toStringUtf8(), kv.getValue().toStringUtf8());
                            loadProperties(kv.getValue().toStringUtf8());
                        });
                    }
                } catch (InterruptedException e) {
                    log.error("etcd listen start cause Exception:{}", e);
                }
            }).start();
        } catch (Exception e) {
            log.error("etcd listen start cause Exception:{}", e);
        }
    }

  備註:完整的程式碼可以參考筆者部落格:https://www.cnblogs.com/laoqing/p/8967549.html

監聽完配置後,就可以在etcd 的配置中心中管理配置了

 

然後就可以通過如下程式碼獲取配置了

....
EtcdUtil.getLocalPropertie("xxxxx")
....

二、Flume 日誌採集中的流水線架構設計

flume 中資料採集是通過source->Sink的方式進行資料採集入庫的,但是有一個缺點就是資料中如果需要做一些ETL的業務處理,比如簡單的資料加工,或者增加一些業務邏輯處理等然後再入庫,無法滿足。而是我們就可以對flume原有的架構進行拓展。

拓展後的架構圖如下所示。

 

 

  • 1、使用者可以自定義process,繼承統一的process介面,使用者的process自己打成jar包。放到flume的lib目錄中。
    • public interface Processor<T> {
          T process(T log);
      }
  • 2、etcd動態配置中,配置需要使用哪些process,在多個process的時候,在etcd動態配置中配置順序。
    • processors=[{"processor":"com.xxx.flume.tax.processor.TaxCrawlerDataCommonProcessor","logType":"5"}] # logType代表日誌型別
    • public class ProcessorBean {
          private String processor;
          private String logType;
          private  Processor processorInstance;
      
          public Processor getProcessorInstance() {
              return processorInstance;
          }
      
          public void setProcessorInstance(Processor processorInstance) {
              this.processorInstance = processorInstance;
          }
      
          public String getProcessor() {
              return processor;
          }
      
          public void setProcessor(String processor) {
              this.processor = processor;
          }
      
          public String getLogType() {
              return logType;
          }
      
          public void setLogType(String logType) {
              this.logType = logType;
          }
      
          @Override
          public String toString() {
              return "ProcessorBean{" +
                      "processor='" + processor + '\'' +
                      ", logType='" + logType + '\'' +
                      ", processorInstance=" + processorInstance +
                      '}';
          }
      }
      ...               
      processorBeanList = GsonUtil.gson.fromJson(EtcdUtil.getLocalPropertie("processors"), new TypeToken<List<ProcessorBean>>() { }.getType()); processorBeanList.forEach(processorBean -> { try { Processor<?> processor = (Processor<?>) Class.forName(processorBean.getProcessor()).newInstance(); processorBean.setProcessorInstance(processor); } catch (Throwable e) { e.printStackTrace(); } });
      ...
  • 3、process 為動態裝載形式,可以隨時開啟和關閉。Process中業務自己處理自己的業務邏輯。
  • 4、source負責資料採集
  • 5、sink負責資料入庫到目標端,並且負責通知(可以在動態配置中配置是否開啟通知功能)
    • isNotice=1#1代表開啟通知
    • public interface Notice {
          void noticePostLog(String logType);
          void noticePostLog(List<Map<String,Object>> noticeMsg);
      }
       public void noticePostLog(String logType) {
              if (null != EtcdUtil.getLocalPropertie("isNotice") && "1".equals(EtcdUtil.getLocalPropertie("isNotice"))) {
                  List<Map<String, Object>> callList = new ArrayList<>();
                  ................
                  if (null != callList && callList.size() > 0) {
                      noticePostLog(callList);
                  }
              }
          }
               if (null != processorBeanList && processorBeanList.size() > 0) {
                          for (ProcessorBean processorBean : processorBeanList) {
                              try {
                                  if (logType.equals(processorBean.getLogType())) {
                                      if ("2".equals(logType)) {
                                          log = (BusinessLog) processorBean.getProcessorInstance().process(log);
                                      } else if ("5".equals(logType)) {
                                          log = (CrawlerLog) processorBean.getProcessorInstance().process(log);
                                      }
                                  }
                              } catch (Throwable e) {
                                  logger.error("exec process cause Exception", e);
                              }
                          }
                      }
  • 6、通知為一個通用的json欄位。
  • 7、後續所有的應用伺服器都在裝機時,統一預先把flume包放入進去。使用者在使用flume時,只需要做配置以及上傳自己的process包。
  • 8、除了process不能複用外,其他的部分都通用元件複用。
  • 9、process就類似流水線作業的一樣。

 本文作者:張永清 連線:https://www.cnblogs.com/laoqing/p/12620747.html

 三、Flume 日誌採集中的流水線架構設計在爬蟲中的架構實踐

這裡以稅務資料爬蟲為例,仔細看如下的架構設計

 

 

  • 1、稅務的爬蟲資料採用flume進行採集入庫
  • 2、由於各個省的稅務網站欠差萬別,資料在爬蟲下來後,需要按照不同的省份進行進行(html頁面資料解析,由於每個省的稅務網站不同,html不一樣)。解析時,就採用了process處理。
  • 3、每個省份有一套解析的程式碼,每個省份實現同一個底層的解析介面,解析時,通過http介面從業務系統中獲取配置的解析規則。
public interface TaxCrawlerAnalysis {

    TaxTable analysisTaxTable(TaxHtmlTable taxHtmlTable,String taxTableType);
}
  • 4、每個省份的解析類同樣採用動態載入的方式,在解析處理時通過省份編碼的形式進行匹配。
taxCrawlerAnalysises=[{"taxCrawlerAnalysis":"com.xxx.bigdata.crawler.tax.common.analysis.TaxCrawlerPdfTableAnalysis","provinceCity":"320000"}]

 四、總結

作者的原創文章,轉載須註明出處。原創文章歸作者所有,歡迎轉載,但是保留版權。對於轉載了博主的原創文章,不標註出處的,作者將依法追究版權,請尊重作者的成果。本文作者:張永清 連線:https://www.cnblogs.com/laoqing/p/12620747.html

1、流水線的處理,讓flume可以動態的擴充套件,可以支援自定義的業務處理。業務處理的程式碼可以作為單獨的專案即插即用的整合到flume中。

2、etcd作為動態配置中心後,配置可以做到動態的更新,而不需要配置變更後,對jvm程序進行重啟。

3、對flume進行改造和擴充套件的程式碼,後續都會放入個人github中。