如何快速地將Hive中的資料匯入ClickHouse
如何快速地將Hive中的資料匯入ClickHouse
ClickHouse是面向OLAP的分散式列式DBMS。我們部門目前已經把所有資料分析相關的日誌資料儲存至ClickHouse這個優秀的資料倉庫之中,當前日資料量達到了300億。
在之前的文章如何快速地把HDFS中的資料匯入ClickHouse中我們提到過使用Waterdrop——https://github.com/InterestingLab/waterdrop對HDFS中的資料經過很簡單的操作就可以將資料寫入ClickHouse。HDFS中的資料一般是非結構化的資料,那麼針對儲存在Hive中的結構化資料,我們應該怎麼操作呢?
Hive to ClickHouse
假定我們的資料已經儲存在Hive中,我們需要讀取Hive表中的資料並篩選出我們關心的欄位,或者對欄位進行轉換,最後將對應的欄位寫入ClickHouse的表中。
Hive Schema
我們在Hive中儲存的資料表結構如下,儲存的是很常見的Nginx日誌
CREATE TABLE `nginx_msg_detail`( `hostname` string, `domain` string, `remote_addr` string, `request_time` float, `datetime` string, `url` string, `status` int, `data_size` int, `referer` string, `cookie_info` string, `user_agent` string, `minute` string) PARTITIONED BY ( `date` string, `hour` string)
ClickHouse Schema
我們的ClickHouse建表語句如下,我們的表按日進行分割槽
CREATE TABLE cms.cms_msg ( date Date, datetime DateTime, url String, request_time Float32, status String, hostname String, domain String, remote_addr String, data_size Int32, ) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostnmae) SETTINGS index_granularity = 16384
Waterdrop with ClickHouse
接下來會給大家介紹,我們如何通過Waterdrop將Hive中的資料寫入ClickHouse中。
Waterdrop
Waterdrop是一個非常易用,高效能,能夠應對海量資料的實時資料處理產品,它構建在Spark之上。Waterdrop擁有著非常豐富的外掛,支援從Kafka、HDFS、Kudu中讀取資料,進行各種各樣的資料處理,並將結果寫入ClickHouse、Elasticsearch或者Kafka中。
Waterdrop的環境準備以及安裝步驟這裡就不一一贅述了,具體安裝步驟可以參考上一篇文章或者訪問Waterdrop Docs
Waterdrop Pipeline
我們僅需要編寫一個Waterdrop Pipeline的配置檔案即可完成資料的匯入。
配置檔案包括四個部分,分別是Spark、Input、filter和Output。
Spark
這一部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
Input
這一部分定義資料來源,如下是從Hive檔案中讀取text格式資料的配置案例。
input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}
看,很簡單的一個配置就可以從Hive中讀取資料了。其中pre_sql
是從Hive中讀取資料SQL,table_name
是將讀取後的資料,註冊成為Spark中臨時表的表名,可為任意欄位。
需要注意的是,必須保證hive的metastore是在服務狀態。
在Cluster、Client、Local模式下執行時,必須把hive-site.xml
檔案置於提交任務節點的$HADOOP_CONF目錄下
Filter
在Filter部分,這裡我們配置一系列的轉化,我們這裡把不需要的minute和hour欄位丟棄。當然我們也可以在讀取Hive的時候通過pre_sql
不讀取這些欄位
filter {
remove {
source_field = ["minute", "hour"]
}
}
Output
最後我們將處理好的結構化資料寫入ClickHouse
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "waterdrop"
table = "nginx_log"
fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}
Running Waterdrop
我們將上述四部分配置組合成為我們的配置檔案config/batch.conf
。
vim config/batch.conf
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}
filter {
remove {
source_field = ["minute", "hour"]
}
}
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "waterdrop"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}
執行命令,指定配置檔案,執行Waterdrop,即可將資料寫入ClickHouse。這裡我們以本地模式為例。
./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'
Conclusion
在這篇文章中,我們介紹瞭如何使用Waterdrop將Hive中的資料匯入ClickHouse中。僅僅通過一個配置檔案便可快速完成資料的匯入,無需編寫任何程式碼,十分簡單。
希望瞭解Waterdrop與ClickHouse、Elasticsearch、Kafka、Hadoop結合使用的更多功能和案例,可以直接進入專案主頁https://github.com/InterestingLab/waterdrop
– Power by InterestingLab