如何使用Spark快速將資料寫入Elasticsearch
如何使用Spark快速將資料寫入Elasticsearch
說到資料寫入Elasticsearch,最先想到的肯定是Logstash。Logstash因為其簡單上手、可擴充套件、可伸縮等優點被廣大使用者接受。但是尺有所短,寸有所長,Logstash肯定也有它無法適用的應用場景,比如:
- 海量資料ETL
- 海量資料聚合
- 多源資料處理
為了滿足這些場景,很多同學都會選擇Spark,藉助Spark運算元進行資料處理,最後將處理結果寫入Elasticsearch。
我們部門之前利用Spark對Nginx日誌進行分析,統計我們的Web服務訪問情況,將Nginx日誌每分鐘聚合一次最後將結果寫入Elasticsearch,然後利用Kibana配置實時監控Dashboard。Elasticsearch和Kibana都很方便、實用,但是隨著類似需求越來越多,如何快速通過Spark將資料寫入Elasticsearch成為了我們的一大問題。
今天給大家推薦一款能夠實現資料快速寫入的黑科技——Waterdrop,一個非常易用,高效能,能夠應對海量資料的實時資料處理產品,它構建在Spark之上,簡單易用,靈活配置,無需開發。
Kafka to Elasticsearch
和Logstash一樣,Waterdrop同樣支援多種型別的資料輸入,這裡我們以最常見的Kakfa作為輸入源為例,講解如何使用Waterdrop將資料快速寫入Elasticsearch
Log Sample
原始日誌格式如下:
127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"
Elasticsearch Document
我們想要統計,一分鐘每個域名的訪問情況,聚合完的資料有以下欄位:
domain String
hostname String
status int
datetime String
count int
Waterdrop with Elasticsearch
接下來會給大家詳細介紹,我們如何通過Waterdrop讀取Kafka中的資料,對資料進行解析以及聚合,最後將處理結果寫入Elasticsearch中。
Waterdrop
Waterdrop同樣擁有著非常豐富的外掛,支援從Kafka、HDFS、Hive中讀取資料,進行各種各樣的資料處理,並將結果寫入Elasticsearch、Kudu或者Kafka中。
Prerequisites
首先我們需要安裝Waterdrop,安裝十分簡單,無需配置系統環境變數
- 準備Spark環境
- 安裝Waterdrop
- 配置Waterdrop
以下是簡易步驟,具體安裝可以參照Quick Start
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1
vim config/waterdrop-env.sh
# 指定Spark安裝路徑
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
Waterdrop Pipeline
與Logstash一樣,我們僅需要編寫一個Waterdrop Pipeline的配置檔案即可完成資料的匯入,相信瞭解Logstash的朋友可以很快入手Waterdrop配置。
配置檔案包括四個部分,分別是Spark、Input、filter和Output。
Spark
這一部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
Input
這一部分定義資料來源,如下是從Kafka中讀取資料的配置案例,
kafkaStream {
topics = "waterdrop-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_es_group"
consumer.rebalance.max.retries = 100
}
Filter
在Filter部分,這裡我們配置一系列的轉化,包括正則解析將日誌進行拆分、時間轉換將HTTPDATE轉化為Elasticsearch支援的日期格式、對Number型別的欄位進行型別轉換以及通過SQL進行資料聚合
filter {
# 使用正則解析原始日誌
# 最開始資料都在raw_message欄位中
grok {
source_field = "raw_message"
pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# 將"dd/MMM/yyyy:HH:mm:ss Z"格式的資料轉換為
# Elasticsearch中支援的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
}
## 利用SQL對資料進行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}
Output
最後我們將處理好的結構化資料寫入Elasticsearch。
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "waterdrop-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}
Running Waterdrop
我們將上述四部分配置組合成為我們的配置檔案config/batch.conf
。
vim config/batch.conf
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
input {
kafkaStream {
topics = "waterdrop-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_es_group"
consumer.rebalance.max.retries = 100
}
}
filter {
# 使用正則解析原始日誌
# 最開始資料都在raw_message欄位中
grok {
source_field = "raw_message"
pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# 將"dd/MMM/yyyy:HH:mm:ss Z"格式的資料轉換為
# Elasticsearch中支援的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
}
## 利用SQL對資料進行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
}
}
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "waterdrop-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}
執行命令,指定配置檔案,執行Waterdrop,即可將資料寫入Elasticsearch。這裡我們以本地模式為例。
./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'
最後,寫入Elasticsearch中的資料如下,再配上Kibana就可以實現Web服務的實時監控了_.
"_source": {
"domain": "elasticsearch.cn",
"hostname": "localhost",
"status": "200",
"datetime": "2018-11-26T21:54:00.000+08:00",
"count": 26
}
Conclusion
在這篇文章中,我們介紹瞭如何通過Waterdrop將Kafka中的資料寫入Elasticsearch中。僅僅通過一個配置檔案便可快速執行一個Spark Application,完成資料的處理、寫入,無需編寫任何程式碼,十分簡單。
當資料處理過程中有遇到Logstash無法支援的場景或者Logstah效能無法達到預期的情況下,都可以嘗試使用Waterdrop解決問題。
希望瞭解Waterdrop與Elasticsearch、Kafka、Hadoop結合使用的更多功能和案例,可以直接進入專案主頁https://github.com/InterestingLab/waterdrop
–Power by InterestingLab