1. 程式人生 > >如何使用Spark快速將資料寫入Elasticsearch

如何使用Spark快速將資料寫入Elasticsearch

如何使用Spark快速將資料寫入Elasticsearch

說到資料寫入Elasticsearch,最先想到的肯定是Logstash。Logstash因為其簡單上手、可擴充套件、可伸縮等優點被廣大使用者接受。但是尺有所短,寸有所長,Logstash肯定也有它無法適用的應用場景,比如:

  • 海量資料ETL
  • 海量資料聚合
  • 多源資料處理

為了滿足這些場景,很多同學都會選擇Spark,藉助Spark運算元進行資料處理,最後將處理結果寫入Elasticsearch。

我們部門之前利用Spark對Nginx日誌進行分析,統計我們的Web服務訪問情況,將Nginx日誌每分鐘聚合一次最後將結果寫入Elasticsearch,然後利用Kibana配置實時監控Dashboard。Elasticsearch和Kibana都很方便、實用,但是隨著類似需求越來越多,如何快速通過Spark將資料寫入Elasticsearch成為了我們的一大問題。

今天給大家推薦一款能夠實現資料快速寫入的黑科技——Waterdrop,一個非常易用,高效能,能夠應對海量資料的實時資料處理產品,它構建在Spark之上,簡單易用,靈活配置,無需開發。

Kafka to Elasticsearch

和Logstash一樣,Waterdrop同樣支援多種型別的資料輸入,這裡我們以最常見的Kakfa作為輸入源為例,講解如何使用Waterdrop將資料快速寫入Elasticsearch

Log Sample

原始日誌格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我們想要統計,一分鐘每個域名的訪問情況,聚合完的資料有以下欄位:

domain String
hostname String
status int
datetime String
count int

Waterdrop with Elasticsearch

接下來會給大家詳細介紹,我們如何通過Waterdrop讀取Kafka中的資料,對資料進行解析以及聚合,最後將處理結果寫入Elasticsearch中。

Waterdrop

Waterdrop同樣擁有著非常豐富的外掛,支援從Kafka、HDFS、Hive中讀取資料,進行各種各樣的資料處理,並將結果寫入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我們需要安裝Waterdrop,安裝十分簡單,無需配置系統環境變數

  1. 準備Spark環境
  2. 安裝Waterdrop
  3. 配置Waterdrop

以下是簡易步驟,具體安裝可以參照Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1

vim config/waterdrop-env.sh
# 指定Spark安裝路徑
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

與Logstash一樣,我們僅需要編寫一個Waterdrop Pipeline的配置檔案即可完成資料的匯入,相信瞭解Logstash的朋友可以很快入手Waterdrop配置。

配置檔案包括四個部分,分別是Spark、Input、filter和Output。

Spark

這一部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

這一部分定義資料來源,如下是從Kafka中讀取資料的配置案例,

kafkaStream {
    topics = "waterdrop-es"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "waterdrop_es_group"
    consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,這裡我們配置一系列的轉化,包括正則解析將日誌進行拆分、時間轉換將HTTPDATE轉化為Elasticsearch支援的日期格式、對Number型別的欄位進行型別轉換以及通過SQL進行資料聚合

filter {
    # 使用正則解析原始日誌
    # 最開始資料都在raw_message欄位中
    grok {
        source_field = "raw_message"
        pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 將"dd/MMM/yyyy:HH:mm:ss Z"格式的資料轉換為
    # Elasticsearch中支援的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
    }
    ## 利用SQL對資料進行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
    }
 }

Output

最後我們將處理好的結構化資料寫入Elasticsearch。

output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

Running Waterdrop

我們將上述四部分配置組合成為我們的配置檔案config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    kafkaStream {
        topics = "waterdrop-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "waterdrop_es_group"
        consumer.rebalance.max.retries = 100
    }
}
filter {
    # 使用正則解析原始日誌
    # 最開始資料都在raw_message欄位中
    grok {
        source_field = "raw_message"
        pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 將"dd/MMM/yyyy:HH:mm:ss Z"格式的資料轉換為
    # Elasticsearch中支援的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
    }
    ## 利用SQL對資料進行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
    }
 }
output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "waterdrop-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}

執行命令,指定配置檔案,執行Waterdrop,即可將資料寫入Elasticsearch。這裡我們以本地模式為例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

最後,寫入Elasticsearch中的資料如下,再配上Kibana就可以實現Web服務的實時監控了_.

"_source": {
    "domain": "elasticsearch.cn",
    "hostname": "localhost",
    "status": "200",
    "datetime": "2018-11-26T21:54:00.000+08:00",
    "count": 26
  }

Conclusion

在這篇文章中,我們介紹瞭如何通過Waterdrop將Kafka中的資料寫入Elasticsearch中。僅僅通過一個配置檔案便可快速執行一個Spark Application,完成資料的處理、寫入,無需編寫任何程式碼,十分簡單。

當資料處理過程中有遇到Logstash無法支援的場景或者Logstah效能無法達到預期的情況下,都可以嘗試使用Waterdrop解決問題。

希望瞭解Waterdrop與Elasticsearch、Kafka、Hadoop結合使用的更多功能和案例,可以直接進入專案主頁https://github.com/InterestingLab/waterdrop

–Power by InterestingLab