
Converting Flink's Batch WordCount into a Streaming WordCount


Goal: convert Flink's batch WordCount into a streaming WordCount
Usefulness: honestly, it feels completely useless
How: take each operator registered in the batch ExecutionEnvironment and rebuild it in a StreamExecutionEnvironment (a hand-written version of the target streaming job is sketched below for reference)
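
For reference, here is roughly the streaming WordCount that the conversion is meant to reproduce, written by hand against the Flink 1.4 DataStream API (a minimal sketch; the paths and job name are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env.readTextFile("F:\\test.txt")        // source: the same text file as the batch job
	.flatMap(new LineSplitter())        // emit (word, 1) pairs, reusing the batch UDF
	.keyBy(0)                           // the streaming counterpart of groupBy(0)
	.sum(1)                             // running sum: emits an updated count per record
	.writeAsText("F:\\output\\stream"); // sink (placeholder path)

env.execute("hand-written streaming WordCount");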

The code is as follows:

package org.apache.flink.examples.java.maqy;
/**
 * Converts Flink's batch WordCount into its streaming equivalent.
 * Note: with the same logic, the streaming WordCount emits an updated result every time
 * a new element arrives, so the output differs from the batch version.
 *
 * Flink version: 1.4.2
 * @author maqy
 * @date 2018.08.11
 */

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.AggregationFunction;
import org.apache.flink.api.java.aggregation.SumAggregationFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

public class BatchToStream {

	public static void main(String[] args) throws Exception {

		// set up the execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		env.setParallelism(1);
		// get input data
//		DataSet<String> text = env.fromElements(
//				"To be, or not to be,--that is the question:--",
//				"Whether 'tis nobler in the mind to suffer",
//				"The slings and arrows of outrageous fortune",
//				"Or to take arms against a sea of troubles,"
//				);
		// input file
		DataSet<String> a = env.readTextFile("F:\\test.txt");

		DataSet<Tuple2<String, Integer>> b = a.flatMap(new LineSplitter());

		//DataSet<Tuple2<String, Integer>> d = b.sum(1);
		DataSet<Tuple2<String, Integer>> c = b.groupBy(0)
			.sum(1);
		// the sink has to be its own statement: writeAsText returns a DataSink, not a DataSet, so it cannot be chained onto the line above
		c.writeAsText("F:\\output\\batchToStream");

//		DataSet<Tuple2<String, Integer>> counts = env.readTextFile("/home/maqy/桌面/out/test")
//				// split up the lines in pairs (2-tuples) containing: (word,1)
//				.flatMap(new LineSplitter())
//				// group by the tuple field "0" and sum up tuple field "1"
//				.groupBy(0)
//				.sum(1);
//
//		// execute and print result
//		counts.writeAsText("/home/maqy/桌面/out/out1");

		StreamExecutionEnvironment envStream = batchToStream(env);
		// it is the streaming environment that actually runs the job
		//env.execute("batch job~~~~~~~~~~~~~~");
		envStream.execute("StreamJob~~~~~~~~~~~~~");

	}

	// converts the batch environment into a streaming one: takes envBatch, returns a StreamExecutionEnvironment
	public static StreamExecutionEnvironment batchToStream(ExecutionEnvironment envBatch) throws Exception {
		// create a new streaming environment to be returned
		StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
		// the parallelism can only be set here; setting it anywhere else has no effect
		envStream.setParallelism(1);

		// get the sinks registered in the environment (starting from a DataSet instead was also considered)
		List<DataSink<?>> batchSinks = envBatch.getSinks();
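		// each registered sink is the tail of one batch pipeline; every pipeline is rebuilt as a DataStream below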

		for (DataSink dataSink : batchSinks) {
			// the DataStream being built up for this pipeline
			DataStream first = null;
			// for each sink, walk back to find its source
			DataSet dataSetLast = dataSink.getDataSet();
			// Operator extends DataSet

			DataSet p = dataSetLast;

			// the result must be assigned back, otherwise first stays null: preVisit rebinds its parameter to a new object, and Java's pass-by-value semantics keep that invisible to the caller
			first = preVisit(p, envStream, first);
			// convert the sink
			OutputFormat dataSinkOutputFormat = dataSink.getFormat();
			if (dataSinkOutputFormat instanceof TextOutputFormat) {
				System.out.println("dataSinkOutputFormat is a TextOutputFormat");
				Path path = ((TextOutputFormat) dataSinkOutputFormat).getOutputFilePath();
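				// the batch OutputFormat can be re-created for the stream: DataStream#writeUsingOutputFormat accepts a plain OutputFormat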
				first.writeUsingOutputFormat(new TextOutputFormat(path));
			}
			//first.addSink(dataSink.)
		}
		//System.out.println("size:"+batchSinks.size());
		// return the streaming environment; later it might be merged into an existing streaming environment, or the individual DataStreams could even be returned for reuse
		return envStream;
	}

	// traverse from the sink backwards to the source, converting along the way
	public static DataStream preVisit(DataSet dataSet, StreamExecutionEnvironment envStream, DataStream first) {

		if (dataSet != null && !(dataSet instanceof DataSource)) {
			// recurse until a DataSource node is reached; the stream is built forward as the recursion unwinds
			first = preVisit(getPre(dataSet), envStream, first);
		}
		// handle the current node
		if (dataSet == null) {
			System.out.println("source is null");
		} else if (dataSet instanceof DataSource) {
			// once the source is found, check which concrete type it is, then add it to the stream
			// the DataSource also exposes its result type
			// the input data type can be obtained here, but how to apply it to the DataStream is still unclear
//			TypeInformation sourceTypeInfo = ((DataSource) dataSet).getResultType();
//			Class sourceType = sourceTypeInfo.getTypeClass();
			//System.out.println("sourceType:"+sourceTypeInfo.getTypeClass());
			InputFormat inputFormat = ((DataSource) dataSet).getInputFormat();
			if (inputFormat instanceof TextInputFormat) {
				// later: consider whether the operator could be converted directly
				System.out.println("inputFormat is TextInputFormat");
				String filePath = ((TextInputFormat) inputFormat).getFilePath().toString();
				System.out.println("input file path: " + filePath);
				// this rebinds first to a new object (see the note in batchToStream)
				first = envStream.readTextFile(filePath);
			}
		} else if (dataSet instanceof SingleInputOperator) {
			System.out.println("SingleInputOperator yes");
			// for a SingleInputOperator, check the concrete subtype; it holds its input as a DataSet
			//((SingleInputOperator) dataSet).getInput();
			if (dataSet instanceof AggregateOperator) {
				System.out.println("AggregateOperator yes");
				// there can be several aggregation functions, plus a matching List<Integer> of field positions (the default size appears to be 4)
				List<AggregationFunction<?>> aggregationFunctions = ((AggregateOperator) dataSet).getAggregationFunctions();
				List<Integer> fields = ((AggregateOperator) dataSet).getFields();

				// first, find out whether a groupBy was applied, i.e. whether a grouping can be obtained (it is null otherwise)
				Grouping grouping = ((AggregateOperator) dataSet).getGrouping();
				// a non-null grouping means the data went through a groupBy, so convert it accordingly
				if (grouping != null) {
					int position = 0; // only the single-key case is handled for now
					if (grouping instanceof SortedGrouping) {
						System.out.println("SortedGrouping yes");
					} else if (grouping instanceof UnsortedGrouping) {
						System.out.println("UnsortedGrouping yes");
						// Keys holds keyFields and originalKeyTypes; the latter is String here
						Keys keys = grouping.getKeys();
						if (keys instanceof Keys.ExpressionKeys) {
							System.out.println("Keys.ExpressionKeys yes");
							// what multiple key fields would mean here is still unclear
							int numOfKeyFields = keys.getNumberOfKeyFields();
							int[] positions = keys.computeLogicalKeyPositions();
							if (numOfKeyFields == 1) {
								position = positions[0];
							}
							// conveniently, the streaming side has no UnsortedGrouping to deal with: keyBy is enough
							first = first.keyBy(position);
						} else if (keys instanceof Keys.SelectorFunctionKeys) {
							System.out.println("Keys.SelectorFunctionKeys yes");
						}
					}
				}

				// handle the single-aggregation case first, since sum needs a preceding keyBy
				if (aggregationFunctions.size() == 1 && fields.size() == 1) {
					if (aggregationFunctions.get(0) instanceof SumAggregationFunction) {
						if (first instanceof KeyedStream) {
							first = ((KeyedStream) first).sum(fields.get(0));
						} else {
							System.out.println("it seems sum on a stream really does require a keyBy first...");
						}
					}
				}

				//AggregateOperator aggregateOperator = (AggregateOperator) dataSet;

			} else if (dataSet instanceof SingleInputUdfOperator) {
				System.out.println("SingleInputUdfOperator yes");
				if (dataSet instanceof FlatMapOperator){
					System.out.println("FlatMapOperator yes");
					FlatMapFunction flatMapFunction = ((FlatMapOperator) dataSet).getFlatMapFunction();
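					// the batch FlatMapFunction can be reused unchanged: DataStream#flatMap accepts the same interface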
					first = first.flatMap(flatMapFunction);
				}
			} else {
				System.out.println("Not sure what SingleInputOperator");
			}
		} else if (dataSet instanceof TwoInputOperator) {
			System.out.println("TwoInputOperator yes");
		} else {
			System.out.println("not sure what Operator");
		}

		return first;
	}

	// get the (single) predecessor of a DataSet
	public static DataSet getPre(DataSet dataSet) {
		if (dataSet instanceof Operator) {
			System.out.println("Operator yes");
			if (dataSet instanceof DataSource) {
				System.out.println("DataSource yes");
				return (DataSource) dataSet;
			} else if (dataSet instanceof SingleInputOperator) {
				System.out.println("SingleInputOperator yes");
				// for a SingleInputOperator, check the concrete subtype; it holds its input as a DataSet
				return ((SingleInputOperator) dataSet).getInput();
			} else if (dataSet instanceof TwoInputOperator) {
				System.out.println("TwoInputOperator yes");
				/////
			} else {
				System.out.println("not sure what Operator");
				/////
			}
		} else {
			System.out.println("no Operator");
			/////
		}
		return null;
	}

	//
	// 	User Functions
	//

	/**
	 * Implements the string tokenizer that splits sentences into words as a user-defined
	 * FlatMapFunction. The function takes a line (String) and splits it into
	 * multiple pairs in the form of "(word,1)" (Tuple2&lt;String, Integer&gt;).
	 */
	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

		@Override
		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			// normalize and split the line
			String[] tokens = value.toLowerCase().split("\\W+");

			// emit the pairs
			for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2<String, Integer>(token, 1));
				}
			}
		}
	}
}

The input text is:

a b c d a a b
a a a 

Streaming output (note that every incoming word produces an output record):

(a,1)
(b,1)
(c,1)
(d,1)
(a,2)
(a,3)
(b,2)
(a,4)
(a,5)
(a,6)

The normal batch output (each key's final streaming value above matches it):

(a,6)
(b,2)
(c,1)
(d,1)