1. 程式人生 > >如何快速地將Hive中的資料匯入ClickHouse

如何快速地將Hive中的資料匯入ClickHouse

如何快速地將Hive中的資料匯入ClickHouse

ClickHouse是面向OLAP的分散式列式DBMS。我們部門目前已經把所有資料分析相關的日誌資料儲存至ClickHouse這個優秀的資料倉庫之中,當前日資料量達到了300億。

在之前的文章如何快速地把HDFS中的資料匯入ClickHouse中我們提到過使用Waterdrop——https://github.com/InterestingLab/waterdrop對HDFS中的資料經過很簡單的操作就可以將資料寫入ClickHouse。HDFS中的資料一般是非結構化的資料,那麼針對儲存在Hive中的結構化資料,我們應該怎麼操作呢?

Hive to ClickHouse

假定我們的資料已經儲存在Hive中,我們需要讀取Hive表中的資料並篩選出我們關心的欄位,或者對欄位進行轉換,最後將對應的欄位寫入ClickHouse的表中。

Hive Schema

我們在Hive中儲存的資料表結構如下,儲存的是很常見的Nginx日誌

CREATE TABLE `nginx_msg_detail`(
   `hostname` string,
   `domain` string,
   `remote_addr` string,
   `request_time` float,
   `datetime` string,
   `url` string,
   `status` int,
   `data_size` int,
   `referer` string,
   `cookie_info` string,
   `user_agent` string,
   `minute` string)
 PARTITIONED BY (
   `date` string,
   `hour` string)

ClickHouse Schema

我們的ClickHouse建表語句如下,我們的表按日進行分割槽

CREATE TABLE cms.cms_msg
(
    date Date, 
    datetime DateTime, 
    url String, 
    request_time Float32, 
    status String, 
    hostname String, 
    domain String, 
    remote_addr String, 
    data_size Int32, 
) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostnmae) SETTINGS index_granularity = 16384

Waterdrop with ClickHouse

接下來會給大家介紹,我們如何通過Waterdrop將Hive中的資料寫入ClickHouse中。

Waterdrop

Waterdrop是一個非常易用,高效能,能夠應對海量資料的實時資料處理產品,它構建在Spark之上。Waterdrop擁有著非常豐富的外掛,支援從Kafka、HDFS、Kudu中讀取資料,進行各種各樣的資料處理,並將結果寫入ClickHouse、Elasticsearch或者Kafka中。

Waterdrop的環境準備以及安裝步驟這裡就不一一贅述了,具體安裝步驟可以參考上一篇文章或者訪問Waterdrop Docs

Waterdrop Pipeline

我們僅需要編寫一個Waterdrop Pipeline的配置檔案即可完成資料的匯入。

配置檔案包括四個部分,分別是Spark、Input、filter和Output。

Spark

這一部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

這一部分定義資料來源,如下是從Hive檔案中讀取text格式資料的配置案例。

input {
    hive {
        pre_sql = "select * from access.nginx_msg_detail"
        table_name = "access_log"
    }
}

看,很簡單的一個配置就可以從Hive中讀取資料了。其中pre_sql是從Hive中讀取資料SQL,table_name是將讀取後的資料,註冊成為Spark中臨時表的表名,可為任意欄位。

需要注意的是,必須保證hive的metastore是在服務狀態。

在Cluster、Client、Local模式下執行時,必須把hive-site.xml檔案置於提交任務節點的$HADOOP_CONF目錄下

Filter

在Filter部分,這裡我們配置一系列的轉化,我們這裡把不需要的minute和hour欄位丟棄。當然我們也可以在讀取Hive的時候通過pre_sql不讀取這些欄位

filter {
    remove {
        source_field = ["minute", "hour"]
    }
}

Output

最後我們將處理好的結構化資料寫入ClickHouse

output {
    clickhouse {
        host = "your.clickhouse.host:8123"
        database = "waterdrop"
        table = "nginx_log"
        fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
        username = "username"
        password = "password"
    }
}

Running Waterdrop

我們將上述四部分配置組合成為我們的配置檔案config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    hive {
        pre_sql = "select * from access.nginx_msg_detail"
        table_name = "access_log"
    }
}
filter {
    remove {
        source_field = ["minute", "hour"]
    }
}
output {
    clickhouse {
        host = "your.clickhouse.host:8123"
        database = "waterdrop"
        table = "access_log"
        fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
        username = "username"
        password = "password"
    }
}

執行命令,指定配置檔案,執行Waterdrop,即可將資料寫入ClickHouse。這裡我們以本地模式為例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

Conclusion

在這篇文章中,我們介紹瞭如何使用Waterdrop將Hive中的資料匯入ClickHouse中。僅僅通過一個配置檔案便可快速完成資料的匯入,無需編寫任何程式碼,十分簡單。

希望瞭解Waterdrop與ClickHouse、Elasticsearch、Kafka、Hadoop結合使用的更多功能和案例,可以直接進入專案主頁https://github.com/InterestingLab/waterdrop

– Power by InterestingLab