1. 程式人生 > >flink 有狀態udf 引起血案一

flink 有狀態udf 引起血案一

640

場景

近期在做一個畫像的任務,sql實現的,當中有一個udf,會做非常多事情,包含將從redis讀出歷史值加權,並將中間結果和加權後的結果更新到redis。

大家都知道,flink 是能夠支援事件處理的。也就是能夠沒有時間的概念,那麼在聚合,join等操作的時候,flink內部會維護一個狀態,假如此時你也用redis維護了歷史狀態,也即是相似 result = currentState(flink)+lastState(redis)。且此時要針對計算的結果用where進行篩選.

SQL例如以下


   

CREATE VIEW view_count AS
select


 `time`,
 gid,
 cid,
 count(feed_id) * 1 as strength
FROM
 view_cid
GROUP BY
 gid,
 cid,`time`;

CREATE VIEW view_strength AS select
 `time`,
 gid,
 cid ,
 Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result`
FROM
 view_count
;

insert
into
 hx_app_server_sink_common
SELECT
 gid,
 cid,
 `result`
FROM
 view_strength
where `result` <> '0.0'
GROUP BY
 gid,
 cid,
 `result`;

業務分析

第一個sql檢視完畢的是首先分組,然後統計某一個欄位並乘以權重;

第二個sql檢視。udf :Get_Strength_Weaken完畢當前值和歷史值疊加工作,歷史值儲存在redis。同一時候將結果返回並更新redis,返回值作為result欄位。

第三個sql在輸出的時候,result欄位作為了where的條件和group by裡的欄位。

這時候生成的flink概圖例如以下:

640

觀察中間的結構圖能夠發現。Get_Strength_Weaken被呼叫兩次:

1. where條件。這個的生成是因為第三條sql


   

where `result` <> '0.0'

產生的執行計劃,是不是看起來非常懵逼。。

2. select裡面另一次呼叫Get_Strength_Weaken。這個非常明顯。

當然。能夠列印一下flink udf裡eval函式的呼叫細節日誌,非常easy發現反覆呼叫的問題。浪院長這個也是通過分析日誌。對照輸出結果來得出的論。

綜合上面分析和udf呼叫日誌,結論就是udf被呼叫了兩次。

對於這個flink的udf被多次呼叫引起的結果偏大。整整除錯了一下午。

因為上面分析能夠得出結論,flink將where條件下推了,where 條件推斷會先執行,而select裡後執行,那麼能夠調整SQL。例如以下:


   

CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;

CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
getResult(gid,cid) as `result`
FROM
view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result` <> '0.0'
;

insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
GROUP BY
gid,
cid,
`result`;

那麼實際上。select裡的udf主要目的是取出來計算結果。那麼這個時候能夠寫個簡單的udf--getResult,僅僅讓他從redis獲取 where條件裡更新到redis裡的結果,因為該udf是無狀態的即使多次呼叫。也無所謂。

所以。總結一下,對於flink 來說,因為基於事件的處理,聚合、join等操作會有狀態快取,那麼此時再用到含有外部儲存狀態的udf,一定要謹慎,結合執行計劃,來合理放置udf的位置,避免出錯。

當然。除錯階段最好是有具體的日誌。便於分析和定位問題。

flink 狀態刪除

事實上。flink聚合等內部狀態有配置能夠使其自己主動刪除的,具體配置使用例如以下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

[完]

推薦閱讀:

推薦兩個不錯的flink專案

Spark SQL從入門到精通

重要 : 優化flink的四種方式

flink超越Spark的Checkpoint機制

640