A look at Flink DataStream's join operation

This article takes a look at how the join operation on Flink's DataStream is implemented.

Example

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
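  • The chain starts with join, which pairs this stream with another one and returns a JoinedStreams. Calling where on the JoinedStreams builds a Where object that holds the first key selector; Where offers equalTo, which builds an EqualTo; EqualTo offers window, which builds a WithWindow. WithWindow carries the windowAssigner, lets you set the trigger, evictor, and allowedLateness, and provides the apply operation.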
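The staged chain described above can be sketched in plain Java. This is only an illustration of the pattern, assuming strings in place of streams, key selectors, and window assigners; the class StagedJoinBuilderSketch and its nested types are hypothetical stand-ins, not Flink classes.

```java
/** Plain-Java sketch of a staged builder; none of these types are Flink classes. */
class StagedJoinBuilderSketch {

    /** Stage 1: holds both inputs and only offers where(). */
    static final class Joined {
        private final String left;
        private final String right;

        Joined(String left, String right) {
            this.left = left;
            this.right = right;
        }

        Where where(String leftKey) {
            return new Where(leftKey);
        }

        /** Stage 2: an inner class, so it can still see both inputs. */
        final class Where {
            private final String leftKey;

            Where(String leftKey) {
                this.leftKey = leftKey;
            }

            EqualTo equalTo(String rightKey) {
                return new EqualTo(rightKey);
            }

            /** Stage 3: knows both keys and only offers window(). */
            final class EqualTo {
                private final String rightKey;

                EqualTo(String rightKey) {
                    this.rightKey = rightKey;
                }

                WithWindow window(String assigner) {
                    return new WithWindow(left, right, leftKey, rightKey, assigner);
                }
            }
        }
    }

    /** Final stage: has everything it needs, so it can offer apply(). */
    static final class WithWindow {
        private final String description;

        WithWindow(String left, String right, String leftKey, String rightKey, String assigner) {
            this.description = "join(" + left + "." + leftKey + " == " + right + "." + rightKey
                    + ") window=" + assigner;
        }

        String apply(String functionName) {
            return description + " fn=" + functionName;
        }
    }

    static String describe() {
        return new Joined("orders", "payments")
                .where("orderId")
                .equalTo("orderId")
                .window("tumbling-5s")
                .apply("merge");
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Because each stage exposes only the next step, a call such as window before equalTo does not compile, which is presumably why the API is split across several small classes.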

DataStream.join

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
	//......

	/**
	 * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
	 * and window can be specified.
	 */
	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
		return new JoinedStreams<>(this, otherStream);
	}

	//......
}
  • DataStream provides the join method for starting a join; it returns a JoinedStreams.

JoinedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

@Public
public class JoinedStreams<T1, T2> {

	/** The first input stream. */
	private final DataStream<T1> input1;

	/** The second input stream. */
	private final DataStream<T2> input2;

	public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
		this.input1 = requireNonNull(input1);
		this.input2 = requireNonNull(input2);
	}

	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
		requireNonNull(keySelector);
		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		return where(keySelector, keyType);
	}

	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
		requireNonNull(keySelector);
		requireNonNull(keyType);
		return new Where<>(input1.clean(keySelector), keyType);
	}

	//......
}
  • JoinedStreams mainly provides the where operation, which builds a Where object.

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	@Public
	public class Where<KEY> {

		private final KeySelector<T1, KEY> keySelector1;
		private final TypeInformation<KEY> keyType;

		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
			this.keySelector1 = keySelector1;
			this.keyType = keyType;
		}

		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
			requireNonNull(keySelector);
			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			return equalTo(keySelector, otherKey);
		}

		public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
			requireNonNull(keySelector);
			requireNonNull(keyType);

			if (!keyType.equals(this.keyType)) {
				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
						"first key = " + this.keyType + " , second key = " + keyType);
			}

			return new EqualTo(input2.clean(keySelector));
		}

		//......

	}
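  • The Where object mainly provides the equalTo operation, which builds an EqualTo object; it throws an IllegalArgumentException if the key types of the two inputs differ.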

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

		@Public
		public class EqualTo {

			private final KeySelector<T2, KEY> keySelector2;

			EqualTo(KeySelector<T2, KEY> keySelector2) {
				this.keySelector2 = requireNonNull(keySelector2);
			}

			/**
			 * Specifies the window on which the join operation works.
			 */
			@PublicEvolving
			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
			}
		}
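  • The EqualTo object provides the window operation, which builds a WithWindow object whose trigger, evictor, and allowedLateness are initially null.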

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	@Public
	public static class WithWindow<T1, T2, KEY, W extends Window> {

		private final DataStream<T1> input1;
		private final DataStream<T2> input2;

		private final KeySelector<T1, KEY> keySelector1;
		private final KeySelector<T2, KEY> keySelector2;
		private final TypeInformation<KEY> keyType;

		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

		private final Time allowedLateness;

		private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

		@PublicEvolving
		protected WithWindow(DataStream<T1> input1,
				DataStream<T2> input2,
				KeySelector<T1, KEY> keySelector1,
				KeySelector<T2, KEY> keySelector2,
				TypeInformation<KEY> keyType,
				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
				Time allowedLateness) {

			this.input1 = requireNonNull(input1);
			this.input2 = requireNonNull(input2);

			this.keySelector1 = requireNonNull(keySelector1);
			this.keySelector2 = requireNonNull(keySelector2);
			this.keyType = requireNonNull(keyType);

			this.windowAssigner = requireNonNull(windowAssigner);

			this.trigger = trigger;
			this.evictor = evictor;

			this.allowedLateness = allowedLateness;
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, newTrigger, evictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, newEvictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
				windowAssigner, trigger, evictor, newLateness);
		}

		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				JoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
			return (SingleOutputStreamOperator<T>) apply(function);
		}

		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			return (SingleOutputStreamOperator<T>) apply(function, resultType);
		}

		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				FlatJoinFunction.class,
				0,
				1,
				2,
				new int[]{2, 0},
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
			return (SingleOutputStreamOperator<T>) apply(function);
		}

		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new JoinCoGroupFunction<>(function), resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			return (SingleOutputStreamOperator<T>) apply(function, resultType);
		}

		@VisibleForTesting
		Time getAllowedLateness() {
			return allowedLateness;
		}

		@VisibleForTesting
		CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
			return coGroupedWindowedStream;
		}
	}
  • WithWindow carries the windowAssigner and lets you set the trigger, evictor, and allowedLateness; it provides the apply operation (the with operations are marked deprecated).
  • apply accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream's coGroup method to create a CoGroupedStreams, passes on its own where and equalTo key selectors together with the windowAssigner, trigger, evictor, and allowedLateness, and finally calls apply on the resulting CoGroupedStreams.WithWindow object.
  • The apply method of CoGroupedStreams.WithWindow takes a different argument from the apply method of JoinedStreams.WithWindow: it accepts a CoGroupFunction. JoinedStreams.WithWindow.apply therefore wraps the JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) before passing it on.
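One detail visible in the listing above is that trigger, evictor, and allowedLateness each return a new WithWindow instead of mutating the current one. The following plain-Java sketch illustrates that style, assuming strings and a millisecond value in place of Flink's Trigger, Evictor, and Time; ImmutableWindowOptionsSketch is a hypothetical name, not a Flink class.

```java
/** Plain-Java sketch of immutable "setter returns a copy" configuration; not a Flink class. */
class ImmutableWindowOptionsSketch {

    // null means "not set", mirroring the nulls passed by EqualTo.window(...)
    final String trigger;
    final String evictor;
    final Long allowedLatenessMs;

    ImmutableWindowOptionsSketch(String trigger, String evictor, Long allowedLatenessMs) {
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Starting point with nothing configured. */
    static ImmutableWindowOptionsSketch unset() {
        return new ImmutableWindowOptionsSketch(null, null, null);
    }

    /** Returns a copy with the trigger replaced; this instance is left untouched. */
    ImmutableWindowOptionsSketch trigger(String newTrigger) {
        return new ImmutableWindowOptionsSketch(newTrigger, evictor, allowedLatenessMs);
    }

    /** Returns a copy with the evictor replaced. */
    ImmutableWindowOptionsSketch evictor(String newEvictor) {
        return new ImmutableWindowOptionsSketch(trigger, newEvictor, allowedLatenessMs);
    }

    /** Returns a copy with the allowed lateness replaced. */
    ImmutableWindowOptionsSketch allowedLateness(long newLatenessMs) {
        return new ImmutableWindowOptionsSketch(trigger, evictor, newLatenessMs);
    }

    public static void main(String[] args) {
        ImmutableWindowOptionsSketch base = unset();
        base.trigger("count-3"); // result discarded, so base is unchanged
        ImmutableWindowOptionsSketch configured = base.trigger("count-3").allowedLateness(2000L);
        System.out.println(base.trigger + " / " + configured.trigger + " / " + configured.allowedLatenessMs);
    }
}
```

The practical consequence is that these calls only take effect when chained or when their return value is kept.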

JoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java

@Public
@FunctionalInterface
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

	/**
	 * The join method, called once per joined pair of elements.
	 *
	 * @param first The element from first input.
	 * @param second The element from second input.
	 * @return The resulting element.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	OUT join(IN1 first, IN2 second) throws Exception;
}
  • JoinFunction extends Function and Serializable and defines the join method. Its semantics are those of an inner join; for an outer join, use CoGroupFunction instead.

FlatJoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java

@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

	/**
	 * The join method, called once per joined pair of elements.
	 *
	 * @param first The element from first input.
	 * @param second The element from second input.
	 * @param out The collector used to return zero, one, or more elements.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
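  • FlatJoinFunction extends Function and Serializable and defines the join method, again with inner-join semantics; for an outer join, use CoGroupFunction instead. Unlike JoinFunction's join method, FlatJoinFunction's join method takes an additional Collector parameter, through which it can emit zero, one, or many records per pair, hence the "Flat" in its name.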
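The difference between the two signatures can be illustrated without Flink on the classpath. The sketch below assumes two hypothetical interfaces, SimpleJoin and SimpleFlatJoin, that mimic the shapes of JoinFunction and FlatJoinFunction, with a java.util.function.Consumer standing in for Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java sketch contrasting "return one result" with "emit any number of results". */
class JoinVsFlatJoinSketch {

    /** Hypothetical stand-in for JoinFunction: exactly one result per pair. */
    interface SimpleJoin<A, B, R> {
        R join(A first, B second);
    }

    /** Hypothetical stand-in for FlatJoinFunction: zero, one, or many results per pair. */
    interface SimpleFlatJoin<A, B, R> {
        void join(A first, B second, Consumer<R> out);
    }

    static List<String> demo() {
        SimpleJoin<String, Integer, String> pair = (name, count) -> name + "=" + count;

        // Emits the name `count` times, so a count of 0 produces nothing at all.
        SimpleFlatJoin<String, Integer, String> repeat = (name, count, out) -> {
            for (int i = 0; i < count; i++) {
                out.accept(name);
            }
        };

        List<String> results = new ArrayList<>();
        results.add(pair.join("a", 2));     // always one element
        repeat.join("b", 0, results::add);  // no element
        repeat.join("c", 2, results::add);  // two elements
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A flat variant is therefore the natural choice when a matched pair should sometimes be filtered out or expanded into several records.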

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
public class CoGroupedStreams<T1, T2> {
	//......

	@Public
	public static class WithWindow<T1, T2, KEY, W extends Window> {
		private final DataStream<T1> input1;
		private final DataStream<T2> input2;

		private final KeySelector<T1, KEY> keySelector1;
		private final KeySelector<T2, KEY> keySelector2;

		private final TypeInformation<KEY> keyType;

		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

		private final Time allowedLateness;

		private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

		protected WithWindow(DataStream<T1> input1,
				DataStream<T2> input2,
				KeySelector<T1, KEY> keySelector1,
				KeySelector<T2, KEY> keySelector2,
				TypeInformation<KEY> keyType,
				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
				Time allowedLateness) {
			this.input1 = input1;
			this.input2 = input2;

			this.keySelector1 = keySelector1;
			this.keySelector2 = keySelector2;
			this.keyType = keyType;

			this.windowAssigner = windowAssigner;
			this.trigger = trigger;
			this.evictor = evictor;

			this.allowedLateness = allowedLateness;
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, newTrigger, evictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, newEvictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, evictor, newLateness);
		}

		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

			TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
				function,
				input1.getType(),
				input2.getType(),
				"CoGroup",
				false);

			return apply(function, resultType);
		}

		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
			UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
					.map(new Input1Tagger<T1, T2>())
					.setParallelism(input1.getParallelism())
					.returns(unionType);
			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
					.map(new Input2Tagger<T1, T2>())
					.setParallelism(input2.getParallelism())
					.returns(unionType);

			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

			// we explicitly create the keyed stream to manually pass the key type information in
			windowedStream =
					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
					.window(windowAssigner);

			if (trigger != null) {
				windowedStream.trigger(trigger);
			}
			if (evictor != null) {
				windowedStream.evictor(evictor);
			}
			if (allowedLateness != null) {
				windowedStream.allowedLateness(allowedLateness);
			}

			return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
		}

		//......

	}

	//......
}
  • The overall class structure of CoGroupedStreams closely mirrors JoinedStreams: CoGroupedStreams provides where to build a Where object; Where provides equalTo to build an EqualTo object; EqualTo provides window to build a WithWindow object; WithWindow carries the windowAssigner, lets you set the trigger, evictor, and allowedLateness, and provides apply. One difference is that apply on CoGroupedStreams.WithWindow accepts a CoGroupFunction, whereas apply on JoinedStreams.WithWindow accepts a JoinFunction or FlatJoinFunction.
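The apply method in the listing above tags the elements of both inputs, unions them into one stream, and keys that stream with a selector that covers both sides. A plain-Java sketch of the idea follows. It ignores windowing entirely, uses lists in place of streams, and the final step of splitting each key's elements back into two groups is an assumption about what CoGroupWindowFunction does, since its body is not shown in the listing. Tagged, Groups, and coGroup are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch of "tag, union, key, split"; lists stand in for streams. */
class TaggedUnionCoGroupSketch {

    /** Either a value from input one or a value from input two. */
    static final class Tagged<A, B> {
        final A one;
        final B two;
        final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> ofOne(A value) {
            return new Tagged<>(value, null, true);
        }

        static <A, B> Tagged<A, B> ofTwo(B value) {
            return new Tagged<>(null, value, false);
        }
    }

    /** The two per-key groups that a coGroup-style function would receive. */
    static final class Groups<A, B> {
        final List<A> first = new ArrayList<>();
        final List<B> second = new ArrayList<>();
    }

    static <A, B, K> Map<K, Groups<A, B>> coGroup(
            List<A> input1, List<B> input2, Function<A, K> key1, Function<B, K> key2) {
        // Step 1: tag every element with its origin and union both inputs.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : input1) {
            union.add(Tagged.<A, B>ofOne(a));
        }
        for (B b : input2) {
            union.add(Tagged.<A, B>ofTwo(b));
        }

        // Step 2: key the union by dispatching to the matching selector,
        // Step 3: and split each key's elements back into the two groups.
        Map<K, Groups<A, B>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K key = t.isOne ? key1.apply(t.one) : key2.apply(t.two);
            Groups<A, B> groups = byKey.computeIfAbsent(key, k -> new Groups<>());
            if (t.isOne) {
                groups.first.add(t.one);
            } else {
                groups.second.add(t.two);
            }
        }
        return byKey;
    }

    static Map<String, Groups<String, String>> demo() {
        Function<String, String> firstChar = s -> s.substring(0, 1);
        return coGroup(Arrays.asList("a1", "b1", "a2"), Arrays.asList("a9", "c9"), firstChar, firstChar);
    }

    public static void main(String[] args) {
        demo().forEach((key, g) -> System.out.println(key + " -> " + g.first + " / " + g.second));
    }
}
```

Note that keys present on only one side still produce a group pair with one empty list, which is what makes outer joins expressible at the coGroup level.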

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

	/**
	 * This method must be implemented to provide a user implementation of a
	 * coGroup. It is called for each pair of element groups where the elements share the
	 * same key.
	 *
	 * @param first The records from the first input.
	 * @param second The records from the second.
	 * @param out A collector to return elements.
	 *
	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
	 *                   and may trigger the recovery logic.
	 */
	void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
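  • CoGroupFunction extends Function and Serializable and defines the coGroup method, which can be used to implement an outer join. Its parameters are Iterables, whereas the join methods of JoinFunction and FlatJoinFunction take single objects.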
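Since a coGroup-style function sees both complete groups for a key, it can notice when one side is empty. The sketch below shows a left outer join written in that shape, assuming a plain List as the output sink instead of Flink's Collector; the method name leftOuterJoin and the "left:right" output format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of a left outer join written in the shape of a coGroup callback. */
class LeftOuterCoGroupSketch {

    /** Receives both groups for one key; emits "left:right", or "left:null" when nothing matches. */
    static void leftOuterJoin(Iterable<String> first, Iterable<Integer> second, List<String> out) {
        for (String left : first) {
            boolean matched = false;
            for (Integer right : second) {
                matched = true;
                out.add(left + ":" + right);
            }
            if (!matched) {
                // An inner join would emit nothing here.
                out.add(left + ":null");
            }
        }
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        // Key with matches on both sides.
        leftOuterJoin(Arrays.asList("x", "y"), Arrays.asList(1, 2), out);
        // Key that exists only on the left side.
        leftOuterJoin(Arrays.asList("z"), Collections.<Integer>emptyList(), out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A right or full outer join follows the same idea with the roles of the two groups swapped or combined.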

WrappingFunction

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java

@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

	private static final long serialVersionUID = 1L;

	protected T wrappedFunction;

	protected WrappingFunction(T wrappedFunction) {
		this.wrappedFunction = wrappedFunction;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		FunctionUtils.openFunction(this.wrappedFunction, parameters);
	}

	@Override
	public void close() throws Exception {
		FunctionUtils.closeFunction(this.wrappedFunction);
	}

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		super.setRuntimeContext(t);

		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
	}

	public T getWrappedFunction () {
		return this.wrappedFunction;
	}
}
  • WrappingFunction extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods so that these lifecycle calls are forwarded to the wrappedFunction.
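The forwarding idea can be sketched in plain Java. The example assumes a hypothetical Lifecycle interface in place of Flink's rich-function hooks, and it assumes that the forwarding helper only calls the hook when the wrapped object actually supports it, which is how FunctionUtils appears to behave but is not shown in the listing above.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a wrapper that forwards lifecycle calls to the wrapped object. */
class WrappingLifecycleSketch {

    /** Hypothetical stand-in for a function with lifecycle hooks. */
    interface Lifecycle {
        void open();
        void close();
    }

    /** Forwards open/close to the wrapped object when it supports them. */
    static class Wrapper<T> implements Lifecycle {
        protected final T wrapped;

        Wrapper(T wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void open() {
            if (wrapped instanceof Lifecycle) {
                ((Lifecycle) wrapped).open();
            }
        }

        @Override
        public void close() {
            if (wrapped instanceof Lifecycle) {
                ((Lifecycle) wrapped).close();
            }
        }
    }

    /** A wrapped object that records when its hooks fire. */
    static class RecordingFunction implements Lifecycle {
        final List<String> events;

        RecordingFunction(List<String> events) {
            this.events = events;
        }

        @Override
        public void open() {
            events.add("open");
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Wrapper<RecordingFunction> rich = new Wrapper<>(new RecordingFunction(events));
        Wrapper<Runnable> plain = new Wrapper<Runnable>(() -> events.add("run"));

        rich.open();         // forwarded to RecordingFunction.open()
        plain.open();        // no hooks on a plain Runnable, so nothing happens
        plain.wrapped.run(); // the wrapped object itself is still usable
        plain.close();       // again a no-op
        rich.close();        // forwarded to RecordingFunction.close()
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This is what allows a user function with open and close logic to keep working after it has been wrapped in an adapter.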

JoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	/**
	 * CoGroup function that does a nested-loop join to get the join result.
	 */
	private static class JoinCoGroupFunction<T1, T2, T>
			extends WrappingFunction<JoinFunction<T1, T2, T>>
			implements CoGroupFunction<T1, T2, T> {
		private static final long serialVersionUID = 1L;

		public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
			super(wrappedFunction);
		}

		@Override
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					out.collect(wrappedFunction.join(val1, val2));
				}
			}
		}
	}
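  • JoinCoGroupFunction extends WrappingFunction and implements the coGroup method defined by the CoGroupFunction interface. It performs a nested-loop join: it iterates over the first collection and, for each element, iterates over the second collection, calls wrappedFunction.join on every pair, and emits the result.
  • JoinedStreams defines JoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the JoinFunction and then calls the apply method of CoGroupedStreams.WithWindow.
  • The join method defined by JoinFunction takes two single objects, whereas the coGroup method defined by JoinCoGroupFunction takes two Iterables.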
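The adapter idea can be reproduced in a few lines of plain Java. The sketch assumes a java.util.function.BiFunction in place of JoinFunction, a List in place of Collector, and a hypothetical SimpleCoGroup interface in place of CoGroupFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java sketch of adapting a pairwise join function to a group-wise coGroup function. */
class NestedLoopJoinSketch {

    /** Hypothetical stand-in for CoGroupFunction, with a List as the output sink. */
    interface SimpleCoGroup<A, B, R> {
        void coGroup(Iterable<A> first, Iterable<B> second, List<R> out);
    }

    /** Wraps a pairwise function so that it is applied to every combination of the two groups. */
    static <A, B, R> SimpleCoGroup<A, B, R> wrap(BiFunction<A, B, R> joinFunction) {
        return (first, second, out) -> {
            for (A a : first) {
                for (B b : second) {
                    out.add(joinFunction.apply(a, b));
                }
            }
        };
    }

    static List<String> demo() {
        SimpleCoGroup<String, Integer, String> joined = wrap((s, n) -> s + n);
        List<String> out = new ArrayList<>();
        // Two elements on each side give four pairs.
        joined.coGroup(Arrays.asList("a", "b"), Arrays.asList(1, 2), out);
        // An empty side gives no pairs at all, which is the inner-join behaviour.
        joined.coGroup(Arrays.asList("c"), Collections.<Integer>emptyList(), out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The nested loop makes the cost per key and window proportional to the product of the two group sizes, which is worth keeping in mind for skewed keys.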

FlatJoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	/**
	 * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
	 */
	private static class FlatJoinCoGroupFunction<T1, T2, T>
			extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
			implements CoGroupFunction<T1, T2, T> {
		private static final long serialVersionUID = 1L;

		public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
			super(wrappedFunction);
		}

		@Override
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					wrappedFunction.join(val1, val2, out);
				}
			}
		}
	}
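  • FlatJoinCoGroupFunction extends WrappingFunction and implements the coGroup method defined by the CoGroupFunction interface. It performs the same nested-loop join: it iterates over the first collection and, for each element, iterates over the second collection and calls wrappedFunction.join on every pair, passing along the Collector so that the wrapped function emits its own results.
  • JoinedStreams defines FlatJoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the FlatJoinFunction and then calls the apply method of CoGroupedStreams.WithWindow.
  • The join method defined by FlatJoinFunction takes two single objects (plus a Collector), whereas the coGroup method defined by FlatJoinCoGroupFunction takes two Iterables.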

Summary

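  • DataStream provides the join method for starting a join; it returns a JoinedStreams. JoinedStreams mainly provides where to build a Where object; Where mainly provides equalTo to build an EqualTo object; EqualTo provides window to build a WithWindow object; WithWindow carries the windowAssigner, lets you set the trigger, evictor, and allowedLateness, and provides the apply operation.
  • apply accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream's coGroup method to create a CoGroupedStreams, passes on its own where and equalTo key selectors together with the windowAssigner, trigger, evictor, and allowedLateness, and finally calls apply on the CoGroupedStreams.WithWindow object. JoinFunction and FlatJoinFunction both extend Function and Serializable and define a join method with inner-join semantics; for an outer join, use CoGroupFunction. The difference between the two is that FlatJoinFunction's join method takes an additional Collector parameter, through which it can emit zero, one, or many records, hence the "Flat" in its name.
  • The apply method of CoGroupedStreams.WithWindow takes a different argument from the apply method of JoinedStreams.WithWindow: it accepts a CoGroupFunction. JoinedStreams.WithWindow.apply therefore wraps the JoinFunction or FlatJoinFunction in a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) and then calls the apply method of CoGroupedStreams.WithWindow. JoinCoGroupFunction and FlatJoinCoGroupFunction both extend WrappingFunction (which extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods to manage the wrappedFunction) and both implement the coGroup method defined by the CoGroupFunction interface. The only difference is what they wrap: the former wraps a JoinFunction and collects its return value, while the latter wraps a FlatJoinFunction and therefore passes the out parameter through to its join method.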
