Flink 原理與實現:如何生成 StreamGraph
繼上文Flink 原理與實現:架構和拓撲概覽中介紹了Flink的四層執行圖模型,本文將主要介紹 Flink 是如何根據使用者用Stream API編寫的程式,構造出一個代表拓撲結構的StreamGraph的。
注:本文比較偏原始碼分析,所有程式碼都是基於 flink-1.0.x 版本,建議在閱讀本文前先對Stream API有個瞭解,詳見官方文件。
StreamGraph 相關的程式碼主要在 org.apache.flink.streaming.api.graph
包中。構造StreamGraph的入口函式是 StreamGraphGenerator.generate(env,
transformations)
StreamExecutionEnvironment.execute()
呼叫到。也就是說 StreamGraph 是在 Client 端構造的,這也意味著我們可以在本地通過除錯觀察
StreamGraph 的構造過程。
Transformation
StreamGraphGenerator.generate
的一個關鍵的引數是 List<StreamTransformation<?>>
。StreamTransformation
代表了從一個或多個DataStream
生成新DataStream
的操作。DataStream
的底層其實就是一個 StreamTransformation
DataStream
是怎麼來的。
StreamTransformation的類圖如下圖所示:
DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream
Transformation瞭解更多)。這些transformation會構造出一棵 StreamTransformation 樹,通過這棵樹轉換成 StreamGraph。比如 DataStream.map
原始碼如下,其中SingleOutputStreamOperator
為DataStream的子類:
public <R> SingleOutputStreamOperator<R> map |
從上方程式碼可以瞭解到,map轉換將使用者自定義的函式MapFunction
包裝到StreamMap
這個Operator中,再將StreamMap
包裝到OneInputTransformation
,最後該transformation存到env中,當呼叫env.execute
時,遍歷其中的transformation集合構造出StreamGraph。其分層實現如下圖所示:
另外,並不是每一個 StreamTransformation 都會轉換成 runtime 層中物理操作。有一些只是邏輯概念,比如 union、split/select、partition等。如下圖所示的轉換樹,在執行時會優化成下方的操作圖。
union、split/select、partition中的資訊會被寫入到 Source –> Map 的邊中。通過原始碼也可以發現,UnionTransformation
,SplitTransformation
,SelectTransformation
,PartitionTransformation
由於不包含具體的操作所以都沒有StreamOperator成員變數,而其他StreamTransformation的子類基本上都有。
StreamOperator
DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是執行時的具體實現,會決定UDF(User-Defined Funtion)的呼叫方式。下圖所示為 StreamOperator 的類圖(點選檢視大圖):
可以發現,所有實現類都繼承了AbstractStreamOperator
。另外除了 project 操作,其他所有可以執行UDF程式碼的實現類都繼承自AbstractUdfStreamOperator
,該類是封裝了UDF的StreamOperator。UDF就是實現了Function
介面的類,如MapFunction
,FilterFunction
。
生成 StreamGraph 的原始碼分析
我們通過在DataStream上做了一系列的轉換(map、filter等)得到了StreamTransformation集合,然後通過StreamGraphGenerator.generate
獲得StreamGraph,該方法的原始碼如下:
// 構造 StreamGraph 入口函式 public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) { return new StreamGraphGenerator(env).generateInternal(transformations); } // 自底向上(sink->source)對轉換樹的每個transformation進行轉換。 private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) { for (StreamTransformation<?> transformation: transformations) { transform(transformation); } return streamGraph; } // 對具體的一個transformation進行轉換,轉換成 StreamGraph 中的 StreamNode 和 StreamEdge // 返回值為該transform的id集合,通常大小為1個(除FeedbackTransformation) private Collection<Integer> transform(StreamTransformation<?> transform) { // 跳過已經轉換過的transformation if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } LOG.debug("Transforming " + transform); // 為了觸發 MissingTypeInfo 的異常 transform.getOutputType(); Collection<Integer> transformedIds; if (transform instanceof OneInputTransformation<?, ?>) { transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform); } else if (transform instanceof TwoInputTransformation<?, ?, ?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { transformedIds = transformPartition((PartitionTransformation<?>) transform); } else { throw new IllegalStateException("Unknown transformation: " + transform); } // need this check because the iterate transformation adds itself before // transforming the feedback edges if (!alreadyTransformed.containsKey(transform)) { alreadyTransformed.put(transform, transformedIds); } if (transform.getBufferTimeout() > 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } if (transform.getUid() != null) { streamGraph.setTransformationId(transform.getId(), transform.getUid()); } return transformedIds; } |
最終都會呼叫 transformXXX
來對具體的StreamTransformation進行轉換。我們可以看下transformOnInputTransform(transform)
的實現:
private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) { // 遞迴對該transform的直接上游transform進行轉換,獲得直接上游id集合 Collection<Integer> inputIds = transform(transform.getInput()); // 遞迴呼叫可能已經處理過該transform了 if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); // 新增 StreamNode streamGraph.addOperator(transform.getId(), slotSharingGroup, transform.getOperator(), transform.getInputType(), transform.getOutputType(), transform.getName()); if (transform.getStateKeySelector() != null) { TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig()); streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer); } streamGraph.setParallelism(transform.getId(), transform.getParallelism()); // 新增 StreamEdge for (Integer inputId: inputIds) { streamGraph.addEdge(inputId, transform.getId(), 0); } return Collections.singleton(transform.getId()); } |
該函式首先會對該transform的上游transform進行遞迴轉換,確保上游的都已經完成了轉化。然後通過transform構造出StreamNode,最後與上游的transform進行連線,構造出StreamNode。
最後再來看下對邏輯轉換(partition、union等)的處理,如下是transformPartition
函式的原始碼:
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) { StreamTransformation<T> input = partition.getInput(); List<Integer> resultIds = new ArrayList<>(); // 直接上游的id Collection<Integer> transformedIds = transform(input); for (Integer transformedId: transformedIds) { // 生成一個新的虛擬id int virtualId = StreamTransformation.getNewNodeId(); // 新增一個虛擬分割槽節點,不會生成 StreamNode streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner()); resultIds.add(virtualId); } return resultIds; } |
對partition的轉換沒有生成具體的StreamNode和StreamEdge,而是新增一個虛節點。當partition的下游transform(如map)新增edge時(呼叫StreamGraph.addEdge
),會把partition資訊寫入到edge中。如StreamGraph.addEdgeInternal
所示:
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) { addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, new ArrayList<String>()); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames) { // 當上遊是select時,遞迴呼叫,並傳入select資訊 if (virtualSelectNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; // select上游的節點id upStreamVertexID = virtualSelectNodes.get(virtualId).f0; if (outputNames.isEmpty()) { // selections that happen downstream override earlier selections outputNames = virtualSelectNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); } // 當上遊是partition時,遞迴呼叫,並傳入partitioner資訊 else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; // partition上游的節點id upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtuaPartitionNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); } else { // 真正構建StreamEdge StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); // 未指定partitioner的話,會為其選擇 forward 或 rebalance 分割槽。 if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { partitioner = new ForwardPartitioner<Object>(); } else if (partitioner == null) { partitioner = new RebalancePartitioner<Object>(); } // 健康檢查, forward 分割槽必須要上下游的併發度一致 if (partitioner instanceof ForwardPartitioner) { if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { throw new UnsupportedOperationException("Forward partitioning does not allow " + "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); } } // 建立 StreamEdge StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner); // 將該 StreamEdge 新增到上游的輸出,下游的輸入 getStreamNode(edge.getSourceId()).addOutEdge(edge); getStreamNode(edge.getTargetId()).addInEdge(edge); } } |
例項講解
如下程式,是一個從 Source 中按行切分成單詞並過濾輸出的簡單流程式,其中包含了邏輯轉換:隨機分割槽shuffle。我們會分析該程式是如何生成StreamGraph的。
DataStream<String> text = env.socketTextStream(hostName, port); text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print(); |
首先會在env中生成一棵transformation樹,用List<StreamTransformation<?>>
儲存。其結構圖如下:
其中符號*
為input指標,指向上游的transformation,從而形成了一棵transformation樹。然後,通過呼叫StreamGraphGenerator.generate(env,
transformations)
來生成StreamGraph。自底向上遞迴呼叫每一個transformation,也就是說處理順序是Source->FlatMap->Shuffle->Filter->Sink。
如上圖所示:
- 首先處理的Source,生成了Source的StreamNode。
- 然後處理的FlatMap,生成了FlatMap的StreamNode,並生成StreamEdge連線上游Source和FlatMap。由於上下游的併發度不一樣(1:4),所以此處是Rebalance分割槽。
- 然後處理的Shuffle,由於是邏輯轉換,並不會生成實際的節點。將partitioner資訊暫存在
virtuaPartitionNodes
中。 - 在處理Filter時,生成了Filter的StreamNode。發現上游是shuffle,找到shuffle的上游FlatMap,建立StreamEdge與Filter相連。並把ShufflePartitioner的資訊寫到StreamEdge中。
- 最後處理Sink,建立Sink的StreamNode,並生成StreamEdge與上游Filter相連。由於上下游併發度一樣(4:4),所以此處選擇 Forward 分割槽。
最後可以通過 UI視覺化 來觀察得到的 StreamGraph。
總結
本文主要介紹了 Stream API 中 Transformation 和 Operator 的概念,以及如何根據Stream API編寫的程式,構造出一個代表拓撲結構的StreamGraph的。本文的原始碼分析涉及到較多程式碼,如果有興趣建議結合完整原始碼進行學習。下一篇文章將介紹 StreamGraph 如何轉換成 JobGraph 的,其中設計到了圖優化的技巧。