1. 程式人生 > >Druid 0.17 入門(3)—— 資料接入指南

Druid 0.17 入門(3)—— 資料接入指南

![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085651906-571819466.jpg) 在快速開始中,我們演示了接入本地示例資料方式,但Druid其實支援非常豐富的資料接入方式。比如批處理資料的接入和實時流資料的接入。本文我們將介紹這幾種資料接入方式。 - **檔案資料接入**:從檔案中載入批處理資料 - **從Kafka中接入流資料**:從Kafka中載入流資料 - **Hadoop資料接入**:從Hadoop中載入批處理資料 - **編寫自己的資料接入規範**:自定義新的接入規範 本文主要介紹前兩種最常用的資料接入方式。 ## 1、Loading a file——載入檔案 Druid提供以下幾種方式載入資料: - 通過頁面資料載入器 - 通過控制檯 - 通過命令列 - 通過Curl命令呼叫 ### 1.1、資料載入器 Druid提供了一個示例資料檔案,其中包含2015年9月12日發生的Wiki的示例資料。 此樣本資料位於`quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz` 示例資料大概是這樣: ``` { "timestamp":"2015-09-12T20:03:45.018Z", "channel":"#en.wikipedia", "namespace":"Main", "page":"Spider-Man's powers and equipment", "user":"foobar", "comment":"/* Artificial web-shooters */", "cityName":"New York", "regionName":"New York", "regionIsoCode":"NY", "countryName":"United States", "countryIsoCode":"US", "isAnonymous":false, "isNew":false, "isMinor":false, "isRobot":false, "isUnpatrolled":false, "added":99, "delta":99, "deleted":0, } ``` Druid載入資料分為以下幾種: - 載入檔案 - 從kafka中載入資料 - 從hadoop中載入資料 - 自定義載入方式 我們這樣演示一下載入示例檔案資料 ##### 1.1.1、進入localhost:8888 點選load data ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085652292-1477787964.jpg) ##### 1.1.2、選擇local disk ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085652704-1762187705.jpg) ##### 1.1.3、選擇Connect data ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085653061-634765944.jpg) ##### 1.1.4、預覽資料 Base directory輸入quickstart/tutorial/ File filter輸入 wikiticker-2015-09-12-sampled.json.gz 然後點選apply預覽 就可以看見資料了 點選Next:parse data解析資料 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085653652-590886946.jpg) ##### 1.1.5、解析資料 可以看到json資料已經被解析了 繼續解析時間 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654068-1185738456.jpg) ##### 1.1.6、解析時間 解析時間成功 之後兩步是transform和filter 這裡不做演示了 直接next ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654460-1302068690.jpg) ##### 1.1.7、確認Schema 這一步會讓我們確認Schema 可以做一些修改 由於資料量較小 我們直接關掉Rollup 直接下一步 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654856-423400439.jpg) ##### 1.1.8、設定分段 這裡可以設定資料分段 我們選擇hour next ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655207-1910689495.jpg) ##### 1.1.9、確認釋出 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655533-1019324676.jpg) ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655856-446442045.jpg) ##### 1.1.10、釋出成功 開始解析資料 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656178-822464076.jpg) 等待任務成功 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656509-1753060725.jpg) ##### 1.1.11、檢視資料 選擇datasources 可以看到我們載入的資料 可以看到資料來源名稱 Fully是完全可用 還有大小等各種資訊 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656848-1762524416.jpg) ##### 1.1.12、查詢資料 點選query按鈕 我們可以寫sql查詢資料了 還可以將資料下載 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085657184-897146130.jpg) ### 1.2 控制檯 在任務檢視中,單擊Submit JSON task ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085657825-842617736.jpg) 這將開啟規格提交對話方塊,貼上規範 ``` { "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "wikipedia", "dimensionsSpec" : { "dimensions" : [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "timestampSpec": { "column": "time", "format": "iso" }, "metricsSpec" : [], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "day", "queryGranularity" : "none", "intervals" : ["2015-09-12/2015-09-13"], "rollup" : false } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/tutorial/", "filter" : "wikiticker-2015-09-12-sampled.json.gz" }, "inputFormat" : { "type": "json" }, "appendToExisting" : false }, "tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000 } } } ``` ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085658126-438187606.jpg) 檢視載入任務即可。 ### 1.3 命令列 為了方便起見,Druid提供了一個載入資料的指令碼 ``` bin/post-index-task ``` 我們可以執行命令 ``` bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081 ``` 看到如下輸出: ``` Beginning indexing data for wikipedia Task started: index_wikipedia_2018-07-27T06:37:44.323Z Task log: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/log Task status: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/status Task index_wikipedia_2018-07-27T06:37:44.323Z still running... Task index_wikipedia_2018-07-27T06:37:44.323Z still running... Task finished with status: SUCCESS Completed indexing data for wikipedia. Now loading indexed data onto the cluster... wikipedia loading complete! You may now query your data ``` 檢視載入任務即可。 ### 1.4 CURL 我們可以通過直接呼叫CURL來載入資料 ``` curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task ``` 提交成功 ``` {"task":"index_wikipedia_2018-06-09T21:30:32.802Z"} ``` ## 2、Load from Apache Kafka——從Apache Kafka載入流資料 Apache Kafka是一個高效能的訊息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源訊息系統專案。 Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該專案的目標是為處理實時資料提供一個統一、高通量、低等待(低延時)的平臺。 更多kafka相關請檢視[Kafka入門寶典(詳細截圖版)](https://mp.weixin.qq.com/s/oFEv5c5zO7NAMA3YYB3CrQ) ### 2.1 安裝kafka 我們安裝一個最新的kafka ``` curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz tar -xzf kafka_2.12-2.1.0.tgz cd kafka_2.12-2.1.0 ``` 啟動kafka ``` ./bin/kafka-server-start.sh config/server.properties ``` 建立一個topic ``` ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia ``` ### 2.2 將資料寫入Kafka 向kafka的topic為wikipedia寫入資料 ``` cd quickstart/tutorial gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json ``` 在kafka目錄中執行命令 {PATH_TO_DRUID}替換為druid目錄 ``` export KAFKA_OPTS="-Dfile.encoding=UTF-8" ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json ``` ### 2.3 載入kafka資料到Druid druid載入kafka的資料也有多種方式 - 資料載入器 - 控制檯 - CURL #### 2.3.1 資料載入器 ##### 2.3.1.1 進入localhost:8888 點選load data 選擇`Apache Kafka`並單擊`Connect data` ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085658508-13515230.jpg) ##### 2.3.1.2 輸入kafka伺服器`localhost:9092` ##### 輸入topic wikipedia 可以預覽資料 然後下一步 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659086-1877246991.jpg) ##### 2.3.1.3 解析資料 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659516-1969246996.jpg) 2.3.1.4 解析時間戳 設定轉換 設定過濾 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659953-1741556967.jpg) ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085700451-371940198.jpg) ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085700822-141714061.jpg) ##### 2.3.1.4 這步比較重要 確定統計的範圍 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085701138-1897062580.jpg) ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085701530-1741799766.jpg) ##### 2.3.1.5 釋出 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702014-626016517.jpg) ##### 2.3.1.6 等待任務完成 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702366-1043860547.jpg) ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702760-1551772401.jpg) ##### 2.3.1.7 去查詢頁面檢視 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085703163-391760406.jpg) #### 2.3.2 控制檯 在任務檢視中,單擊`Submit JSON supervisor`以開啟對話方塊。 ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085703477-940123858.jpg) 貼上進去如下指令 ``` { "type": "kafka", "spec" : { "dataSchema": { "dataSource": "wikipedia", "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "metricsSpec" : [], "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": "NONE", "rollup": false } }, "tuningConfig": { "type": "kafka", "reportParseExceptions": false }, "ioConfig": { "topic": "wikipedia", "inputFormat": { "type": "json" }, "replicas": 2, "taskDuration": "PT10M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "localhost:9092" } } } } ``` #### 2.3.3 CURL 我們也可以通過直接呼叫CURL來載入kafka資料 ``` curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor ``` **靜下心來,努力的提升自己,永遠都沒有錯。更多實時計算相關博文,歡迎關注實時流式計算** ![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085704153-227283