
A Look at Flink's window Operations

This article takes a look at Flink's window operations.

window

DataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

	public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return windowAll(TumblingProcessingTimeWindows.of(size));
		} else {
			return windowAll(TumblingEventTimeWindows.of(size));
		}
	}

	public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return windowAll(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return windowAll(SlidingEventTimeWindows.of(size, slide));
		}
	}

	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
		return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}

	public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
		return windowAll(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

	@PublicEvolving
	public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
		return new AllWindowedStream<>(this, assigner);
	}
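  • For a non-keyed DataStream there are the timeWindowAll, countWindowAll, and windowAll operations. The central one is windowAll (the other two are shortcuts built on it): it takes a WindowAssigner argument and returns an AllWindowedStream, and the resulting window operator runs with a parallelism of 1.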
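To make the assigners that timeWindowAll delegates to a bit more concrete, here is a dependency-free sketch of how a tumbling and a sliding time assigner map one timestamp to window ranges. It assumes the window-start arithmetic used by Flink's TimeWindow.getWindowStartWithOffset, timestamp - (timestamp - offset + size) % size, with offset 0 and non-negative timestamps. The class WindowAssignmentSketch and its methods are hypothetical names for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch of tumbling/sliding time-window assignment.
// Windows are half-open ranges [start, end) in milliseconds; timestamps assumed >= 0.
class WindowAssignmentSketch {

    // Start of the window (of the given size) that contains the timestamp.
    // Modelled on the arithmetic of Flink's TimeWindow.getWindowStartWithOffset (assumption).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Tumbling: exactly one window per element.
    static long[] tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, 0L, size);
        return new long[] {start, start + size};
    }

    // Sliding: every window of length `size`, starting every `slide` ms, that contains the timestamp.
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] t = tumbling(12_345L, 5_000L);
        System.out.println("tumbling: [" + t[0] + ", " + t[1] + ")");   // [10000, 15000)
        for (long[] w : sliding(12_345L, 10_000L, 5_000L)) {
            System.out.println("sliding: [" + w[0] + ", " + w[1] + ")"); // [10000, 20000) then [5000, 15000)
        }
    }
}
```

With a 10 s size and 5 s slide, every element lands in size / slide = 2 overlapping windows, which is why sliding windows cost more state than tumbling ones.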

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return window(SlidingEventTimeWindows.of(size, slide));
		}
	}

	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}

	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
		return window(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

	@PublicEvolving
	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
		return new WindowedStream<>(this, assigner);
	}
  • Besides the window-related operations it inherits from DataStream, a KeyedStream mainly uses timeWindow, countWindow, and window. The central one is window: it likewise takes a WindowAssigner argument and returns a WindowedStream.
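The countWindow shortcuts in the KeyedStream code are built from a GlobalWindow plus a trigger (and, for the sliding variant, an evictor). The following sketch simulates, for a single key, the behaviour that combination produces: countWindow(size) fires after every size elements and purges the window, while countWindow(size, slide) fires every slide elements and, before evaluation, keeps only the most recent size elements. CountWindowSketch and its methods are hypothetical and model the semantics only, not Flink's operator internals; a keyed stream would hold one such buffer per key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-key simulation of Flink's count-window shortcuts.
class CountWindowSketch {

    // countWindow(size): GlobalWindow + PurgingTrigger.of(CountTrigger.of(size)).
    static List<List<Integer>> tumblingCount(List<Integer> input, int size) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> window = new ArrayList<>();
        for (int element : input) {
            window.add(element);
            if (window.size() == size) {         // CountTrigger reached its count
                fired.add(new ArrayList<>(window));
                window.clear();                  // PurgingTrigger: fire and purge
            }
        }
        return fired;                            // leftover elements never fire
    }

    // countWindow(size, slide): GlobalWindow + CountEvictor.of(size) + CountTrigger.of(slide).
    static List<List<Integer>> slidingCount(List<Integer> input, int size, int slide) {
        List<List<Integer>> fired = new ArrayList<>();
        Deque<Integer> window = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int element : input) {
            window.addLast(element);
            sinceLastFire++;
            if (sinceLastFire == slide) {        // trigger fires every `slide` elements
                while (window.size() > size) {   // evictor keeps at most `size` elements
                    window.removeFirst();
                }
                fired.add(new ArrayList<>(window));
                sinceLastFire = 0;               // no purge: remaining elements stay
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(tumblingCount(input, 3));   // [[1, 2, 3], [4, 5, 6]]
        System.out.println(slidingCount(input, 3, 2)); // [[1, 2], [2, 3, 4], [4, 5, 6]]
    }
}
```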

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public
public class WindowedStream<T, K, W extends Window> {

	/** The keyed data stream that is windowed by this stream. */
	private final KeyedStream<T, K> input;

	/** The window assigner. */
	private final WindowAssigner<? super T, W> windowAssigner;

	/** The trigger that is used for window evaluation/emission. */
	private Trigger<? super T, ? super W> trigger;

	/** The evictor that is used for evicting elements before window evaluation. */
	private Evictor<? super T, ? super W> evictor;

	/** The user-specified allowed lateness. */
	private long allowedLateness = 0L;

	/**
	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
	 * dropped.
 	 */
	private OutputTag<T> lateDataOutputTag;

	@PublicEvolving
	public WindowedStream(KeyedStream<T, K> input,
			WindowAssigner<? super T, W> windowAssigner) {
		this.input = input;
		this.windowAssigner = windowAssigner;
		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
	}

	@PublicEvolving
	public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
		if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
			throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
		}

		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with a custom trigger.");
		}

		this.trigger = trigger;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
		final long millis = lateness.toMilliseconds();
		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

		this.allowedLateness = millis;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
		}
		this.evictor = evictor;
		return this;
	}

	// ------------------------------------------------------------------------
	//  Operations on the keyed windows
	// ------------------------------------------------------------------------

	//......
}
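  • WindowedStream has several parameters. The constructor requires input and windowAssigner (and initializes the trigger with the assigner's default, windowAssigner.getDefaultTrigger); Trigger, Evictor, allowedLateness, and OutputTag are optional. In addition, an operation function must be set, mainly ReduceFunction, AggregateFunction, FoldFunction (deprecated), or ProcessWindowFunction.
  • The windowAssigner decides which window(s) each element is assigned to. The main implementations are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows, and GlobalWindows.
  • The Trigger decides when a window fires and emits its result; the Evictor removes elements from the window when it fires. allowedLateness specifies how long after the watermark passes the end of a window late elements are still accepted into it (default 0); elements arriving later than that are dropped. It applies only to event-time windows. The OutputTag routes such late data to a side output instead of dropping it, and that side output can be retrieved with SingleOutputStreamOperator.getSideOutput(OutputTag).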
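The allowedLateness and late-data OutputTag rules can be sketched as a single decision. The sketch below is roughly modelled on the check Flink's WindowOperator performs, comparing window.maxTimestamp() + allowedLateness (maxTimestamp being end - 1) against the current watermark; treat that as an assumption rather than a copy of Flink's code. It is simplified to one event-time tumbling window with offset 0 and no overflow handling. Since a tumbling window gives each element exactly one window, the case where Flink drops an element only if all of its windows are late collapses to one comparison. LatenessSketch, route, and Outcome are hypothetical names.

```java
// Hypothetical sketch of the event-time lateness decision for one tumbling window.
class LatenessSketch {

    enum Outcome { ADDED_TO_WINDOW, SIDE_OUTPUT, DROPPED }

    static Outcome route(long timestamp, long windowSize, long allowedLateness,
                         long currentWatermark, boolean hasLateDataTag) {
        long start = timestamp - Math.floorMod(timestamp, windowSize); // tumbling, offset 0
        long maxTimestamp = start + windowSize - 1;                    // window end is exclusive
        long cleanupTime = maxTimestamp + allowedLateness;             // window state kept until here
        if (cleanupTime > currentWatermark) {
            return Outcome.ADDED_TO_WINDOW;    // window still exists; may cause a late firing
        }
        // Too late: goes to the side output if sideOutputLateData(tag) was set, else dropped.
        return hasLateDataTag ? Outcome.SIDE_OUTPUT : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        // Element at t=12345 belongs to [10000, 15000); allowed lateness 2000 ms.
        System.out.println(route(12_345L, 5_000L, 2_000L, 15_000L, true));  // ADDED_TO_WINDOW
        System.out.println(route(12_345L, 5_000L, 2_000L, 17_000L, true));  // SIDE_OUTPUT
        System.out.println(route(12_345L, 5_000L, 2_000L, 17_000L, false)); // DROPPED
    }
}
```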

The properties and operations of AllWindowedStream are largely the same as those of WindowedStream, so they are not covered in detail here.

Summary

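  • Windows are at the heart of processing unbounded streams: they split the stream into finite-size buckets on which computations can then be performed. Flink's window operations fall into two categories: window for a KeyedStream, and windowAll for a non-keyed stream.
  • A window operation has several parameters. The WindowAssigner is mandatory; the main implementations are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows, and GlobalWindows. An operation function must also be set, mainly ReduceFunction, AggregateFunction, FoldFunction (deprecated), or ProcessWindowFunction.
  • Trigger, Evictor, allowedLateness, and OutputTag are optional. The Trigger decides when a window fires and emits its result; the Evictor removes elements from the window when it fires; allowedLateness specifies how long after the watermark passes the end of a window late elements are still accepted, with later elements dropped (event-time windows only); the OutputTag routes late data to a side output, retrievable with SingleOutputStreamOperator.getSideOutput(OutputTag).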
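Of the operation functions listed above, AggregateFunction is the one that aggregates incrementally: each arriving element is folded into an accumulator, so the window only has to keep the accumulator rather than all of its elements. The sketch below illustrates that contract with a local interface whose method names (createAccumulator, add, getResult, merge) mirror Flink's AggregateFunction; the interface itself, AverageAggregator, and evaluate are hypothetical and are not the Flink types.

```java
import java.util.List;

// Hypothetical sketch: a local interface mirroring the method names of Flink's
// AggregateFunction (createAccumulator, add, getResult, merge). NOT the Flink type.
class IncrementalAggregationSketch {

    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);   // used when windows merge, e.g. session windows
    }

    // Average of longs; the accumulator is {sum, count}.
    static final class AverageAggregator implements Aggregator<Long, long[], Double> {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Long value, long[] acc) { return new long[] {acc[0] + value, acc[1] + 1}; }
        public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
    }

    // Simulates one window: elements are folded in as they arrive, and only the
    // accumulator would need to be stored until the window fires.
    static <IN, ACC, OUT> OUT evaluate(Aggregator<IN, ACC, OUT> agg, List<IN> elements) {
        ACC acc = agg.createAccumulator();
        for (IN e : elements) {
            acc = agg.add(e, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        AverageAggregator avg = new AverageAggregator();
        System.out.println(evaluate(avg, List.of(2L, 4L, 9L))); // 5.0
        // Merging two partial accumulators, as a merging (session) assigner would:
        long[] merged = avg.merge(avg.add(2L, avg.createAccumulator()),
                                  avg.add(4L, avg.createAccumulator()));
        System.out.println(avg.getResult(merged));              // 3.0
    }
}
```

By contrast, a ProcessWindowFunction receives all of the window's elements at once, which is more flexible but requires buffering them in state until the window fires.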
