1. 程式人生 > >InfluxDB:如何進行連線,數學測量

InfluxDB:如何進行連線,數學測量

如果您是InfluxData社群的成員,那麼您可能希望在某些時候跨測量執行數學運算。你做了一些谷歌搜尋並偶然發現了這個GitHub問題3552並且流下了一滴小小的淚水。好吧,今天我成了好訊息的承載者。InfluxData釋出了Flux技術預覽Flux是一種新的查詢語言和時間序列資料引擎,它具有跨測量執行數學運算的能力。

在本文中,我將分享兩個如何跨測量執行數學運算的示例:

  1. 如何計算每個請求寫入資料庫的行的批量大小。以下示例是探索測量數學的最快方法。您可以簡單地啟動並執行沙箱,然後複製並貼上程式碼以自行嘗試。
  2. 如何“監控”熱交換器的效率隨著時間的推移。您可以找到此部件的資料集和Flux查詢位於此
    倉庫中

要了解Flux的所有功能,請檢視規範和配套文件

如何計算每個請求寫入資料庫的行的批量大小

克隆沙並執行後./sandbox up,您將使整個TICK堆疊以容器化方式執行。“telegraf”資料庫包含從本地計算機收集的多個指標。要計算要收集和寫入InfluxDB的度量的批量大小,我們需要找到隨時間寫入資料庫的行數,並將該值除以同一時間段內的寫入請求數。

首先,過濾資料以隔離所寫的寫請求數和寫入的行數。將該資料分別儲存在兩個表中,“httpd”和“write”。

httpd = from(bucket:"telegraf/autogen") |> range(start: dashboardTime) |> filter(fn:(r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")

write = from(bucket:"telegraf/autogen") |> range(start: dashboardTime) |> filter(fn:(r) => r._measurement == "influxdb_write" and r._field == "pointReq")

接下來,加入兩個表。在加入預設為左連線。最後,我們使用Map函式劃分這兩個值,並計算儀表板時間(-5m)的平均批量大小。

avg_batch_size = join(tables:{httpd:httpd, write:write}, on:["_time"]) |> map(fn:(r) => ({ _value: r._value_write / r._value_httpd})) |> mean()

我將視覺化型別更改為“Table”,因為我的Flux指令碼只返回一個值。我們可以看到過去5分鐘的平均批量大小是〜62行/寫。

旁註:雖然這個查詢很簡單,但效率很低。它僅用於演示目的。如果要檢視較長時間範圍內的平均批量大小,可能需要1)視窗httpd表和寫表,2)分別計算平均值和最大值。做這樣的事情將允許您在跨測量執行數學之前聚合資料,這將更快,更有效。

如何“監測”熱交換器的效率隨著時間的推移

在這個例子中,我決定想象我是化工廠的操作員,我需要監控逆流熱交換器的溫度。我從四個不同的溫度感測器收集冷(TC)和熱(TH)流的溫度。有兩個入口(Tc2Th1)感測器和兩個出口(Tc1Th2)感測器在位置x1x2分別。

做了一些假設後,我可以用這個公式計算傳熱效率:

我在兩個不同的時間從每個感測器收集溫度讀數,總共8個點。此資料集很小,僅用於演示目的。我的資料庫結構如下:

資料庫 測量 標籤鍵 標記值 欄位鍵 欄位值 時間戳
感測器 Tc1,Tc2,Th1,Th2 位置 x1,x2 溫度 總共8個 t1,t2

由於溫度讀數儲存在不同的測量中,我再次應用JoinMap來計算效率。我在Chronograf中使用Flux編輯器和Table檢視來顯示所有結果。

首先,我想收集每個感測器的溫度讀數。我從Th1開始。我需要準備資料。我刪除了“_start”和“_stop”列,因為我沒有執行任何組或視窗。我可以刪除“_measurement”和“_field”,因為它們對我的所有資料都是相同的。最後,我對基於“位置”的任何分析不感興趣,所以我也可以放棄它。我將只對相同時間戳的值執行數學運算,因此我保留“_time”列。

Th1 = from(bucket: "sensors") |> range(start: dashboardTime) |> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature") |> drop(columns:["_start", "_stop", "_measurement", "position", “_field”])

現在我可以應用相同的查詢Th2

Th2 = from(bucket: "sensors") |> range(start: dashboardTime) |> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature") |> drop(columns:["_start", "_stop", "_measurement", "position", “_field”])

接下來,我加入了兩個表。

TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"])

Join預設為左連線。tables:{Th1: Th1, Th2: Th2}允許您指定字尾的命名(相當於Pandas中的“rsuffix / lsuffix”或SQL中的“table.id”語法)。

我也將這個邏輯應用於冷流:

TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"])


接下來,我和TH一起加入TC。

join(tables: {TC: TC, TH: TH}, on: ["_time"])

最後,我可以使用Map來計算所有測量的效率。這就是程式碼一起看起來的樣子:

Th1 = from(bucket: "sensors") |> range(start: dashboardTime) |> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature") |> drop(columns:["_start", "_stop", "_measurement", "position", “_field”]) Th2 = from(bucket: "sensors") |> range(start: dashboardTime) |> filter(fn: (r) => r._measurement == "Th2" and r. _field == "temperature") |> drop(columns:["_start", "_stop", "_measurement", "position", “_field”]) TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time"]) Tc1 = from(bucket: "sensors") |> range(start: dashboardTime) |> filter(fn: (r) => r._measurement == "Tc1" and r._field == "temperature") |> drop(columns:["_start", "_stop", "_measurement", "position", “_field”]) Tc2 = from(bucket: "sensors") |> range(start: dashboardTime) |> filter(fn: (r) => r._measurement == "Tc2" and r._field == "temperature") |> drop(columns:["_start", "_stop", "_measurement", "position", “_field”]) TCTH = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time"]]) join(tables: {TC: TC, TH: TH}, on: ["_time"]) |> map(fn: (r) => (r._value_Tc2 - r._value_Tc1)/(r._value_Th1 - r._value_Th2)) |> yield(name: "efficiency")

我可以看到傳熱效率隨著時間的推移而降低。這是一個非常簡單的Flux功能的例子,但它讓我的想象力瘋狂。我是否可以構建一個類似於僅使用OSS的DeltaV報警管理解決方案的監控和警報工具?可能不是,但我可以夢想有人可能。

如果您像我一樣並且發現情境化和比較有用,我建議您閱讀我即將進行的使用者體驗評論。在該評論中,我將Flux Joins與Pandas Joins進行了比較。Flux有一些特點。對我來說最明顯的是|>管道前進。起初,我不喜歡它。我幾乎從不使用菸斗和我的小拇指想到必須學習新的中風。現在,我發現它們大大提高了可讀性。每個管道前進都返回一個結果。閱讀Flux查詢感覺就像閱讀要點。