Learning Druid (Part 5): Druid's Data Ingestion Task Types
Author: Syn良子. Source: https://www.cnblogs.com/cssdongl/p/9885534.html. Please credit the source when reposting.
Druid's Data Ingestion Task Types
Druid supports many types of data ingestion tasks. A task is submitted to the Overlord node via an HTTP POST (e.g. with curl) and is then assigned to a MiddleManager for execution.
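Tasks are plain JSON documents, so submission is a single HTTP call. A minimal sketch, assuming the spec is saved as task.json and the Overlord listens on localhost at its default port 8090 (both assumptions, adjust for your cluster):

curl -X POST -H 'Content-Type: application/json' -d @task.json http://localhost:8090/druid/indexer/v1/task

On success the Overlord responds with the generated task id, which can be used to track the task.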
Segment Creation Task Types
Local Batch Index Task
The spec for a local batch ingestion task looks like the following:
{ "type" : "index", "spec" : { "dataSchema" : { "dataSource" : "wikipedia", "parser" : { "type" : "string", "parseSpec" : { "format" : "json", "timestampSpec" : { "column" : "timestamp", "format" : "auto" }, "dimensionsSpec" : { "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"], "dimensionExclusions" : [], "spatialDimensions" : [] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "doubleSum", "name" : "added", "fieldName" : "added" }, { "type" : "doubleSum", "name" : "deleted", "fieldName" : "deleted" }, { "type" : "doubleSum", "name" : "delta", "fieldName" : "delta" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "DAY", "queryGranularity" : "NONE", "intervals" : [ "2013-08-31/2013-09-01" ] } }, "ioConfig" : { "type" : "index", "firehose" : { "type" : "local", "baseDir" : "examples/indexing/", "filter" : "wikipedia_data.json" } }, "tuningConfig" : { "type" : "index", "targetPartitionSize" : 5000000, "maxRowsInMemory" : 75000 } } }
The above is the syntax of a local (native batch) index task. Note that the type must be "index". This task ingests the file wikipedia_data.json under the local examples/indexing/ directory into Druid segments. It can be submitted to the Overlord via an HTTP POST and requires no additional Hadoop configuration.
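After submission you can poll the Overlord for the task's state. A minimal sketch, assuming the same localhost:8090 Overlord and a made-up task id index_wikipedia_example (both assumptions, substitute the id returned at submission):

curl http://localhost:8090/druid/indexer/v1/task/index_wikipedia_example/status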
Hadoop Index Task
{ "type" : "index_hadoop", "spec" : { "dataSchema" : { "dataSource" : "wikipedia", "parser" : { "type" : "hadoopyString", "parseSpec" : { "format" : "json", "timestampSpec" : { "column" : "timestamp", "format" : "auto" }, "dimensionsSpec" : { "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"], "dimensionExclusions" : [], "spatialDimensions" : [] } } }, "metricsSpec" : [ { "type" : "count", "name" : "count" }, { "type" : "doubleSum", "name" : "added", "fieldName" : "added" }, { "type" : "doubleSum", "name" : "deleted", "fieldName" : "deleted" }, { "type" : "doubleSum", "name" : "delta", "fieldName" : "delta" } ], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "DAY", "queryGranularity" : "NONE", "intervals" : [ "2013-08-31/2013-09-01" ] } }, "ioConfig" : { "type" : "hadoop", "inputSpec" : { "type" : "static", "paths" : "/MyDirectory/example/wikipedia_data.json" } }, "tuningConfig" : { "type": "hadoop" } }, "hadoopDependencyCoordinates": <my_hadoop_version> }
The above is the syntax of a Hadoop index task. Note that the type must be "index_hadoop". This task ingests the file /MyDirectory/example/wikipedia_data.json into Druid segments; note that this path is on HDFS. The task can be submitted to the Overlord via an HTTP POST, but it requires an already configured Hadoop cluster, because the ingestion ultimately runs as a MapReduce job.
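The <my_hadoop_version> placeholder above takes an array of Maven-style coordinates matching your cluster's Hadoop version; a hedged example (the version string below is only an illustration, match it to your cluster):

"hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3"]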
Kafka Index Task
{ "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "timestamp", "format": "auto" }, "dimensionsSpec": { "dimensions": [], "dimensionExclusions": [ "timestamp", "value" ] } } }, "metricsSpec": [ { "name": "count", "type": "count" }, { "name": "value_sum", "fieldName": "value", "type": "doubleSum" }, { "name": "value_min", "fieldName": "value", "type": "doubleMin" }, { "name": "value_max", "fieldName": "value", "type": "doubleMax" } ], "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "NONE" } }, "tuningConfig": { "type": "kafka", "maxRowsPerSegment": 5000000 }, "ioConfig": { "topic": "metrics", "consumerProperties": { "bootstrap.servers": "localhost:9092" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" } }
The above is the syntax of a Kafka indexing task (supervisor spec). Note that the type must be "kafka". This task consumes data from the Kafka brokers at localhost:9092 and ingests it into Druid segments. Note that the Kafka indexing service was still experimental at the time of writing and requires Kafka 0.10 or later.
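Kafka specs are managed by a supervisor, so they go to the Overlord's supervisor endpoint rather than the task endpoint, and the druid-kafka-indexing-service extension must be loaded on the cluster. A minimal sketch, assuming the spec is saved as kafka-supervisor.json and the Overlord is at localhost:8090 (both assumptions):

curl -X POST -H 'Content-Type: application/json' -d @kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor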
Streaming Push Task Types
This task type uses Tranquility to automatically create realtime tasks and submit them to the Overlord for execution. What is Tranquility? Its GitHub repository is https://github.com/druid-io/tranquility.
With Tranquility we can consume real-time data and send event streams to Druid while it seamlessly handles partitioning, replication, service discovery, and so on. For example, you can integrate Tranquility with Storm, Spark Streaming, or Flink to consume a Kafka stream, load it into Druid segments, and query the data in real time as it arrives. This approach requires writing a fair amount of code, but it is relatively mature and offers a high degree of freedom. I will cover it in detail in a separate post; a small taste of the push model follows below.
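As a quick taste of the push model, Tranquility also ships a standalone server mode that accepts events over plain HTTP and forwards them to Druid realtime tasks. A minimal sketch, assuming a Tranquility Server running locally on its default port 8200 with a dataSource named metrics whose schema matches the fields below (all of these are assumptions to adjust for your setup):

curl -X POST -H 'Content-Type: application/json' \
  -d '{"timestamp": "2013-08-31T01:02:33Z", "metric": "query/time", "value": 42}' \
  http://localhost:8200/v1/post/metrics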
Compaction Task Type
{
  "type": "compact",
  "id": <task_id>,
  "dataSource": <task_datasource>,
  "interval": <interval to specify segments to be merged>,
  "dimensions": <custom dimensionsSpec>,
  "tuningConfig": <index task tuningConfig>,
  "context": <task context>
}
Note that the task type must be "compact". This task type compacts the segments within the specified interval into a new segment, and lets you specify the number of partitions and the dimension combination at the same time.
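A minimal concrete spec following the template above; the optional fields (id, dimensions, tuningConfig, context) are simply omitted, and the wikipedia dataSource and interval are illustrative:

{
  "type": "compact",
  "dataSource": "wikipedia",
  "interval": "2013-08-31/2013-09-01"
}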
Reference: the overview of Druid task types in the official Druid documentation.