A Look at Over Windows in the Flink Table API
Preface
This article looks at Over Windows in the Flink Table API.
Example
    Table table = input
      .window([OverWindow w].as("w"))           // define over window with alias w
      .select("a, b.sum over w, c.min over w"); // aggregate over the over window w
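- Over Windows are similar to the SQL OVER clause. The window interval can be based on event time, processing time, or row count. A window is constructed with the Over class: orderBy, preceding, and as must be called, while partitionBy and following are optional. There are two broad kinds, Unbounded and Bounded.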
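To make the semantics concrete, here is a minimal plain-Java sketch of what `b.sum over w` computes for an unbounded window partitioned by `a`. It does not use any Flink API; the class name `OverWindowRunningSum` and the `{a, b}` row layout are assumptions made for illustration. The point to notice is that an over window emits one result per input row rather than one result per group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "b.sum over w" for an unbounded over window
// partitioned by "a"; it does not use any Flink API.
class OverWindowRunningSum {

    // Rows are {a, b} pairs, assumed to be already sorted by the time attribute.
    static List<Long> sumOver(List<long[]> rows) {
        Map<Long, Long> sumPerKey = new HashMap<>();
        List<Long> out = new ArrayList<>();
        for (long[] row : rows) {
            long key = row[0];
            long value = row[1];
            // Accumulate every row seen so far for this key, including the current one.
            long sum = sumPerKey.merge(key, value, Long::sum);
            // One output value per input row.
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> rows = new ArrayList<>();
        rows.add(new long[] {1, 10});
        rows.add(new long[] {2, 5});
        rows.add(new long[] {1, 7});
        rows.add(new long[] {2, 1});
        System.out.println(sumOver(rows)); // prints [10, 5, 17, 6]
    }
}
```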
Unbounded Over Windows Example
    // Unbounded Event-time over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));

    // Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));

    // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));

    // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
- For event-time and processing-time windows, unbounded_range denotes Unbounded; for row-count windows, unbounded_row denotes Unbounded
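The two constants differ only when several rows share the same time value. The sketch below models that difference in plain Java, assuming standard SQL frame semantics (a RANGE frame includes all peers of the current row, a ROW frame stops at the current row). The class `UnboundedRangeVsRow` and its inputs are hypothetical and no Flink API is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model contrasting unbounded ROW and RANGE frames.
class UnboundedRangeVsRow {

    // ROW frame: every row up to and including the current row position.
    static List<Long> unboundedRow(long[] ts, long[] values) {
        List<Long> out = new ArrayList<>();
        long sum = 0;
        for (int i = 0; i < ts.length; i++) {
            sum += values[i];
            out.add(sum);
        }
        return out;
    }

    // RANGE frame: every row whose timestamp is <= the current row's timestamp,
    // so rows that share a timestamp see the same result.
    static List<Long> unboundedRange(long[] ts, long[] values) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < ts.length; i++) {
            long sum = 0;
            for (int j = 0; j < ts.length; j++) {
                if (ts[j] <= ts[i]) {
                    sum += values[j];
                }
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 2000, 2000, 3000}; // two rows share timestamp 2000
        long[] values = {1, 2, 3, 4};
        System.out.println(unboundedRow(ts, values));   // prints [1, 3, 6, 10]
        System.out.println(unboundedRange(ts, values)); // prints [1, 6, 6, 10]
    }
}
```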
Bounded Over Windows Example
    // Bounded Event-time over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

    // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

    // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

    // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
- For event-time and processing-time windows, a time interval such as 1.minutes denotes Bounded; for row-count windows, a row interval such as 10.rows denotes Bounded
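As a rough illustration of the two bounded forms, the following plain-Java sketch sums over a row-count frame and over a time frame. It assumes standard SQL frame semantics (the frame runs from the preceding offset up to and including the current row, with both ends inclusive) and input already sorted by timestamp. `BoundedOverWindow` and its method names are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of bounded over-window frames.
class BoundedOverWindow {

    // Row-count frame: the current row plus at most `precedingRows` rows before it.
    static List<Long> sumPrecedingRows(long[] values, int precedingRows) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            int from = Math.max(0, i - precedingRows);
            long sum = 0;
            for (int j = from; j <= i; j++) {
                sum += values[j];
            }
            out.add(sum);
        }
        return out;
    }

    // Time frame: rows whose timestamp lies in [ts[i] - precedingMillis, ts[i]].
    static List<Long> sumPrecedingRange(long[] ts, long[] values, long precedingMillis) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < ts.length; i++) {
            long lower = ts[i] - precedingMillis;
            long sum = 0;
            for (int j = 0; j < ts.length; j++) {
                if (ts[j] >= lower && ts[j] <= ts[i]) {
                    sum += values[j];
                }
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0, 30_000, 60_000, 90_000, 150_000};
        long[] values = {1, 2, 3, 4, 5};
        // Analogous to preceding("2.rows")
        System.out.println(sumPrecedingRows(values, 2));           // prints [1, 3, 6, 9, 12]
        // Analogous to preceding("1.minutes")
        System.out.println(sumPrecedingRange(ts, values, 60_000)); // prints [1, 3, 6, 9, 9]
    }
}
```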
Table.window
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {

      //......

      @varargs
      def window(overWindows: OverWindow*): OverWindowedTable = {

        if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
          throw new TableException("Over-windows for batch tables are currently not supported.")
        }

        if (overWindows.size != 1) {
          throw new TableException("Over-Windows are currently only supported single window.")
        }

        new OverWindowedTable(this, overWindows.toArray)
      }

      //......
    }
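- Table provides a window method that takes OverWindow arguments and performs the Over Windows operation; it returns an OverWindowedTable. In this version the method throws a TableException for a BatchTableEnvironment and also when the number of over windows is not exactly one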
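The two guards in that method can be modeled in a few lines of plain Java. This is only a sketch of the checks: `WindowArgumentCheck`, its `validate` method, and the use of `IllegalStateException` in place of Flink's `TableException` are all assumptions made for illustration.

```java
// Hypothetical plain-Java model of the two guards in Table.window;
// IllegalStateException stands in for Flink's TableException.
class WindowArgumentCheck {

    static void validate(boolean isBatchEnvironment, int overWindowCount) {
        if (isBatchEnvironment) {
            throw new IllegalStateException(
                "Over-windows for batch tables are currently not supported.");
        }
        if (overWindowCount != 1) {
            throw new IllegalStateException(
                "Over-Windows are currently only supported single window.");
        }
    }

    public static void main(String[] args) {
        validate(false, 1); // streaming table with a single over window passes
        try {
            validate(false, 2); // two over windows are rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```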
OverWindow
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/windows.scala
    /**
      * Over window is similar to the traditional OVER SQL.
      */
    case class OverWindow(
        private[flink] val alias: Expression,
        private[flink] val partitionBy: Seq[Expression],
        private[flink] val orderBy: Expression,
        private[flink] val preceding: Expression,
        private[flink] val following: Expression)
- OverWindow defines the alias, partitionBy, orderBy, preceding, and following properties
Over
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/java/windows.scala
    object Over {

      /**
        * Specifies the time attribute on which rows are grouped.
        *
        * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
        *
        * For batch tables, refer to a timestamp or long attribute.
        */
      def orderBy(orderBy: String): OverWindowWithOrderBy = {
        val orderByExpr = ExpressionParser.parseExpression(orderBy)
        new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
      }

      /**
        * Partitions the elements on some partition keys.
        *
        * @param partitionBy some partition keys.
        * @return A partitionedOver instance that only contains the orderBy method.
        */
      def partitionBy(partitionBy: String): PartitionedOver = {
        val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
        new PartitionedOver(partitionByExpr)
      }
    }

    class OverWindowWithOrderBy(
        private val partitionByExpr: Array[Expression],
        private val orderByExpr: Expression) {

      /**
        * Set the preceding offset (based on time or row-count intervals) for over window.
        *
        * @param preceding preceding offset relative to the current row.
        * @return this over window
        */
      def preceding(preceding: String): OverWindowWithPreceding = {
        val precedingExpr = ExpressionParser.parseExpression(preceding)
        new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
      }
    }

    class PartitionedOver(private val partitionByExpr: Array[Expression]) {

      /**
        * Specifies the time attribute on which rows are grouped.
        *
        * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
        *
        * For batch tables, refer to a timestamp or long attribute.
        */
      def orderBy(orderBy: String): OverWindowWithOrderBy = {
        val orderByExpr = ExpressionParser.parseExpression(orderBy)
        new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
      }
    }

    class OverWindowWithPreceding(
        private val partitionBy: Seq[Expression],
        private val orderBy: Expression,
        private val preceding: Expression) {

      private[flink] var following: Expression = _

      /**
        * Assigns an alias for this window that the following `select()` clause can refer to.
        *
        * @param alias alias for this over window
        * @return over window
        */
      def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))

      /**
        * Assigns an alias for this window that the following `select()` clause can refer to.
        *
        * @param alias alias for this over window
        * @return over window
        */
      def as(alias: Expression): OverWindow = {

        // set following to CURRENT_ROW / CURRENT_RANGE if not defined
        if (null == following) {
          if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
            following = CURRENT_ROW
          } else {
            following = CURRENT_RANGE
          }
        }
        OverWindow(alias, partitionBy, orderBy, preceding, following)
      }

      /**
        * Set the following offset (based on time or row-count intervals) for over window.
        *
        * @param following following offset that relative to the current row.
        * @return this over window
        */
      def following(following: String): OverWindowWithPreceding = {
        this.following(ExpressionParser.parseExpression(following))
      }

      /**
        * Set the following offset (based on time or row-count intervals) for over window.
        *
        * @param following following offset that relative to the current row.
        * @return this over window
        */
      def following(following: Expression): OverWindowWithPreceding = {
        this.following = following
        this
      }
    }
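- The Over object is a helper for creating over windows; it provides the orderBy and partitionBy methods, which create an OverWindowWithOrderBy and a PartitionedOver respectively
- PartitionedOver provides an orderBy method that creates an OverWindowWithOrderBy; OverWindowWithOrderBy provides a preceding method that creates an OverWindowWithPreceding
- OverWindowWithPreceding holds the partitionBy, orderBy, and preceding properties. Its as method creates the OverWindow, and its following method sets the following offset. If following was never set, as defaults it to CURRENT_ROW when preceding is a row interval and to CURRENT_RANGE otherwise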
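The chain of helper classes is a staged builder: each stage exposes only the methods that are valid next, so a window without orderBy or preceding cannot be expressed. The sketch below reproduces that shape in plain Java. Every type here (`StagedOverBuilder`, `Partitioned`, `WithOrderBy`, `WithPreceding`, `Window`) is hypothetical, and the suffix check on the preceding string is a simplification that stands in for the original check on the parsed expression's result type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model of the staged builder; none of these types are Flink classes.
class StagedOverBuilder {

    // Final window description, mirroring the five OverWindow fields.
    static final class Window {
        final String alias;
        final List<String> partitionBy;
        final String orderBy;
        final String preceding;
        final String following;

        Window(String alias, List<String> partitionBy, String orderBy,
               String preceding, String following) {
            this.alias = alias;
            this.partitionBy = partitionBy;
            this.orderBy = orderBy;
            this.preceding = preceding;
            this.following = following;
        }
    }

    // Optional first stage: only orderBy can follow partitionBy.
    static final class Partitioned {
        private final List<String> partitionBy;

        Partitioned(List<String> partitionBy) {
            this.partitionBy = partitionBy;
        }

        WithOrderBy orderBy(String orderBy) {
            return new WithOrderBy(partitionBy, orderBy);
        }
    }

    // Second stage: only preceding can follow orderBy.
    static final class WithOrderBy {
        private final List<String> partitionBy;
        private final String orderBy;

        WithOrderBy(List<String> partitionBy, String orderBy) {
            this.partitionBy = partitionBy;
            this.orderBy = orderBy;
        }

        WithPreceding preceding(String preceding) {
            return new WithPreceding(partitionBy, orderBy, preceding);
        }
    }

    // Last stage: following is optional, as(...) finishes the window.
    static final class WithPreceding {
        private final List<String> partitionBy;
        private final String orderBy;
        private final String preceding;
        private String following; // stays null unless following(...) is called

        WithPreceding(List<String> partitionBy, String orderBy, String preceding) {
            this.partitionBy = partitionBy;
            this.orderBy = orderBy;
            this.preceding = preceding;
        }

        WithPreceding following(String following) {
            this.following = following;
            return this;
        }

        Window as(String alias) {
            String resolved = following;
            if (resolved == null) {
                // Simplification: the string suffix stands in for the result-type check.
                boolean rowInterval = preceding.endsWith("rows") || preceding.endsWith("_row");
                resolved = rowInterval ? "current_row" : "current_range";
            }
            return new Window(alias, partitionBy, orderBy, preceding, resolved);
        }
    }

    static Partitioned partitionBy(String... keys) {
        return new Partitioned(Arrays.asList(keys));
    }

    static WithOrderBy orderBy(String orderBy) {
        return new WithOrderBy(Collections.<String>emptyList(), orderBy);
    }

    public static void main(String[] args) {
        Window rows = partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w");
        Window range = orderBy("proctime").preceding("1.minutes").as("w");
        System.out.println(rows.following);  // prints current_row
        System.out.println(range.following); // prints current_range
    }
}
```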
OverWindowedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
    class OverWindowedTable(
        private[flink] val table: Table,
        private[flink] val overWindows: Array[OverWindow]) {

      def select(fields: Expression*): Table = {
        val expandedFields = expandProjectList(
          fields,
          table.logicalPlan,
          table.tableEnv)

        if(fields.exists(_.isInstanceOf[WindowProperty])){
          throw new ValidationException(
            "Window start and end properties are not available for Over windows.")
        }

        val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)

        new Table(
          table.tableEnv,
          Project(
            expandedOverFields.map(UnresolvedAlias),
            table.logicalPlan,
            // required for proper projection push down
            explicitAlias = true)
            .validate(table.tableEnv)
        )
      }

      def select(fields: String): Table = {
        val fieldExprs = ExpressionParser.parseExpressionList(fields)
        //get the correct expression for AggFunctionCall
        val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
        select(withResolvedAggFunctionCall: _*)
      }
    }
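- The OverWindowedTable constructor takes an overWindows parameter. The class offers only the select operation, which accepts either a String or Expression arguments; a String is parsed into Expressions and then passed to the Expression overload. That overload rejects window start and end properties with a ValidationException, then creates a new Table whose Project has expandedOverFields.map(UnresolvedAlias) as its projectList, where expandedOverFields comes from resolveOverWindows(expandedFields, overWindows, table.tableEnv)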
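Summary
- Over Windows are similar to the SQL OVER clause and come in two broad kinds, Unbounded and Bounded. The window interval can be based on event time, processing time, or row count. A window is constructed with the Over class, where orderBy, preceding, and as must be called
- Table provides a window method that takes OverWindow arguments and performs the Over Windows operation; it returns an OverWindowedTable. OverWindow defines the alias, partitionBy, orderBy, preceding, and following properties. The Over object is a helper for creating over windows; it provides the orderBy and partitionBy methods, which create an OverWindowWithOrderBy and a PartitionedOver respectively. PartitionedOver provides an orderBy method that creates an OverWindowWithOrderBy, and OverWindowWithOrderBy provides a preceding method that creates an OverWindowWithPreceding. OverWindowWithPreceding holds the partitionBy, orderBy, and preceding properties; its as method creates the OverWindow, and its following method sets the following offset
- The OverWindowedTable constructor takes an overWindows parameter. The class offers only the select operation, which accepts either a String or Expression arguments; a String is parsed into Expressions and then passed to the Expression overload. That overload creates a new Table whose Project has expandedOverFields.map(UnresolvedAlias) as its projectList, where expandedOverFields comes from resolveOverWindows(expandedFields, overWindows, table.tableEnv)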