1. 程式人生 > >Flink流計算與時序資料庫Influxdb+grafana

Flink流計算與時序資料庫Influxdb+grafana

1、簡介

關於Influxdb和grafana,可以參考:介紹或者influxdb官方文件grafana官方文件。這裡預設已經將influxdb和grafana安裝完成。

influxdb不屬於Flink內建的第三方connector,因此需要自定義addSink()方法:

txStream.addSink(new InfluxDBSink("transaction"))

而InfluxDBSink需要實現SinkFunction或RichSinkFunction,例如:

class InfluxDBSink(measurement : String)
extends RichSinkFunction[TX]{
private val dataBaseName = "influxDemo" var influxDB : InfluxDB = null override def open(parameters : Configuration) : Unit = { super.open(parameters) influxDB = InfluxDBFactory.connect("http://data1:8086", "admin", "admin") influxDB.createDatabase(dataBaseName) influxDB.enableBatch(2000
, 100, TimeUnit.MILLISECONDS) } override def close() : Unit = { super.close() } override def invoke(in: TX): Unit = { val builder = Point.measurement(measurement) .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag("code", in.code) .addField("nIndex", in.nindex) .addField("price"
, in.price) .addField("volume", in.volume) .addField("value", in.value) val p = builder.build() influxDB.write(dataBaseName,"autogen",p) }

說明:
influxdb中,measurement相當於一個“表”;
一個記錄被當做一個point,time()方法表示一個point的時間,這也是時間序列資料庫必須的欄位,可以用系統時間作為插入的時間,也可以用event time作為point的時間;
field是必須的欄位,即一個列;
而tag是可選的,是要被索引的欄位,當需要對某個欄位進行索引時,需要設定為一個tag;
“autogen”代表預設的保留策略。
寫入時,可以一條寫入,也可以批量寫入。此例中代表逐條寫入。

結果可以通過influxdb提供的webUI以及grafana實時展示:
這裡寫圖片描述

這裡寫圖片描述

詳細的程式碼見:這裡

參考