1. 程式人生 > >Druid學習之路 (五)Druid的資料攝取任務型別

Druid學習之路 (五)Druid的資料攝取任務型別

作者:Syn良子 出處:https://www.cnblogs.com/cssdongl/p/9885534.html 轉載請註明出處

Druid的資料攝取任務型別


Druid支援很多種型別的資料攝取任務.任務通過CURL POST的方式提交到Overlord節點然後分配給middle manager執行.

Segment建立任務型別


本地批處理索引任務

本地批處理攝取任務

{
"type" : "index",
"spec" : {
"dataSchema" : {
  "dataSource" : "wikipedia",
  "parser" : {
    "type" : "string",
    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "auto"
      },
      "dimensionsSpec" : {
        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
        "dimensionExclusions" : [],
        "spatialDimensions" : []
      }
    }
  },
  "metricsSpec" : [
    {
      "type" : "count",
      "name" : "count"
    },
    {
      "type" : "doubleSum",
      "name" : "added",
      "fieldName" : "added"
    },
    {
      "type" : "doubleSum",
      "name" : "deleted",
      "fieldName" : "deleted"
    },
    {
      "type" : "doubleSum",
      "name" : "delta",
      "fieldName" : "delta"
    }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "DAY",
    "queryGranularity" : "NONE",
    "intervals" : [ "2013-08-31/2013-09-01" ]
  }
},
"ioConfig" : {
  "type" : "index",
  "firehose" : {
    "type" : "local",
    "baseDir" : "examples/indexing/",
    "filter" : "wikipedia_data.json"
   }
},
"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "maxRowsInMemory" : 75000
}
}
}

以上為本地索引任務的語法格式,注意type必須為"index",這個任務將本地examples/indexing/下的wikipedia_data.json檔案攝取到druid的segment中去,可以通過CURL POST的方式提交到Overlord,並不需要額外的hadoop配置


Hadoop索引任務

{
"type" : "index_hadoop",
"spec" : {
"dataSchema" : {
  "dataSource" : "wikipedia",
  "parser" : {
    "type" : "hadoopyString",
    "parseSpec" : {
      "format" : "json",
      "timestampSpec" : {
        "column" : "timestamp",
        "format" : "auto"
      },
      "dimensionsSpec" : {
        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
        "dimensionExclusions" : [],
        "spatialDimensions" : []
      }
    }
  },
  "metricsSpec" : [
    {
      "type" : "count",
      "name" : "count"
    },
    {
      "type" : "doubleSum",
      "name" : "added",
      "fieldName" : "added"
    },
    {
      "type" : "doubleSum",
      "name" : "deleted",
      "fieldName" : "deleted"
    },
    {
      "type" : "doubleSum",
      "name" : "delta",
      "fieldName" : "delta"
    }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "DAY",
    "queryGranularity" : "NONE",
    "intervals" : [ "2013-08-31/2013-09-01" ]
  }
},
"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "static",
    "paths" : "/MyDirectory/example/wikipedia_data.json"
  }
},
"tuningConfig" : {
  "type": "hadoop"
}
},
"hadoopDependencyCoordinates": <my_hadoop_version>
}

以上為Hadoop索引任務的語法格式,注意type必須為"index_hadoop",這個任務將/MyDirectory/example/wikipedia_data.json檔案攝取到druid的segment中去,注意這個路徑是基於HDFS的,任務可以通過CURL POST的方式提交到Overlord,需要額外的hadoop已經配置好,因為最終會轉化為MapReduce的方式來攝取


Kafka索引任務

{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"parser": {
  "type": "string",
  "parseSpec": {
    "format": "json",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "timestamp",
        "value"
      ]
    }
  }
},
"metricsSpec": [
  {
    "name": "count",
    "type": "count"
  },
  {
    "name": "value_sum",
    "fieldName": "value",
    "type": "doubleSum"
  },
  {
    "name": "value_min",
    "fieldName": "value",
    "type": "doubleMin"
  },
  {
    "name": "value_max",
    "fieldName": "value",
    "type": "doubleMax"
  }
],
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "metrics",
"consumerProperties": {
  "bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
}
}

以上為Kafka索引任務的語法格式,注意type必須為"kafka",這個任務通過localhost:9092埠來消費kafka中的資料並攝取到druid的segment中去,注意這個kafka攝取的任務型別還在實驗階段並且需要kafka0.10的支援


流式Streaming push任務型別

這種任務型別是通過Tranquility來自動化的建立realtime任務型別提交到overlord來執行.Tranquility是什麼?如下為其github地址

https://github.com/druid-io/tranquility

我們可以利用Tranquility消費實時資料並向Druid傳送實時事件流,並無縫地處理分割槽,複製,提供服務發現等功能.我舉個栗子,比如你可以通過storm或者sparkstreaming或者flink來整合Tranquility實時的消費kafka資料流並載入到druid的segments中去並且可以同時進行實時的資料查詢.這種方案要寫大量程式碼的但是相對來說比較成熟自由度較高.隨後我會找時間單獨詳細講解.


壓縮任務型別

{
"type": "compact",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval": <interval to specify segments to be merged>,
"dimensions" <custom dimensionsSpec>,
"tuningConfig" <index task tuningConfig>,
"context": <task context>
}

注意任務型別必須為compact,這個任務型別可以壓縮指定時間段內的segments到一個新的segments並同時指定分割槽數和維度組合

參考資料:Druid的任務型別總覽