1. 程式人生 > >《從0到1學習Flink》—— Data Source 介紹

《從0到1學習Flink》—— Data Source 介紹

前言

Data Sources 是什麼呢?就字面意思其實就可以知道:資料來源。

Flink 做為一款流式計算框架,它可用來做批處理,即處理靜態的資料集、歷史的資料集;也可以用來做流處理,即實時的處理些實時資料流,實時的產生資料流結果,只要資料來源源不斷的過來,Flink 就能夠一直計算下去,這個 Data Sources 就是資料的來源地。

Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 來為你的程式新增資料來源。

Flink 已經提供了若干實現好了的 source functions,當然你也可以通過實現 SourceFunction 來自定義非並行的 source 或者實現 ParallelSourceFunction 介面或者擴充套件 RichParallelSourceFunction 來自定義並行的 source,

Flink

StreamExecutionEnvironment 中可以使用以下幾個已實現的 stream sources,

總的來說可以分為下面幾大類:

基於集合

1、fromCollection(Collection) - 從 Java 的 Java.util.Collection 建立資料流。集合中的所有元素型別必須相同。

2、fromCollection(Iterator, Class) - 從一個迭代器中建立資料流。Class 指定了該迭代器返回元素的型別。

3、fromElements(T ...) - 從給定的物件序列中建立資料流。所有物件型別必須相同。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input = env.fromElements(
	new Event(1, "barfoo", 1.0),
	new Event(2, "start", 2.0),
	new Event(3, "foobar", 3.0),
	...
);
複製程式碼

4、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中建立並行資料流。Class 指定了該迭代器返回元素的型別。

5、generateSequence(from, to) - 建立一個生成指定區間範圍內的數字序列的並行資料流。

基於檔案

1、readTextFile(path) - 讀取文字檔案,即符合 TextInputFormat 規範的檔案,並將其作為字串返回。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
複製程式碼

2、readFile(fileInputFormat, path) - 根據指定的檔案輸入格式讀取檔案(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內部呼叫的方法。它根據給定的 fileInputFormat 和讀取路徑讀取檔案。根據提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監測給定路徑的新資料(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應檔案的資料並退出(FileProcessingMode.PROCESS_ONCE)。你可以通過 pathFilter 進一步排除掉需要處理的檔案。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);
複製程式碼

實現:

在具體實現上,Flink 把檔案讀取過程分為兩個子任務,即目錄監控和資料讀取。每個子任務都由單獨的實體實現。目錄監控由單個非並行(並行度為1)的任務執行,而資料讀取由並行執行的多個任務執行。後者的並行性等於作業的並行性。單個目錄監控任務的作用是掃描目錄(根據 watchType 定期掃描或僅掃描一次),查詢要處理的檔案並把檔案分割成切分片(splits),然後將這些切分片分配給下游 reader。reader 負責讀取資料。每個切分片只能由一個 reader 讀取,但一個 reader 可以逐個讀取多個切分片。

重要注意:

如果 watchType 設定為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當檔案被修改時,其內容將被重新處理。這會打破“exactly-once”語義,因為在檔案末尾附加資料將導致其所有內容被重新處理。

如果 watchType 設定為 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次然後退出,而不等待 reader 完成檔案內容的讀取。當然 reader 會繼續閱讀,直到讀取所有的檔案內容。關閉 source 後就不會再有檢查點。這可能導致節點故障後的恢復速度較慢,因為該作業將從最後一個檢查點恢復讀取。

基於 Socket:

socketTextStream(String hostname, int port) - 從 socket 讀取。元素可以用分隔符切分。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 9999) // 監聽 localhost 的 9999 埠過來的資料
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);
複製程式碼

這個在 《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門 文章裡用的就是基於 Socket 的 Word Count 程式。

自定義:

addSource - 新增一個新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(...)) 以從 Apache Kafka 讀取資料

說下上面幾種的特點吧

1、基於集合:有界資料集,更偏向於本地測試用

2、基於檔案:適合監聽檔案修改並讀取其內容

3、基於 Socket:監聽主機的 host port,從 Socket 中獲取資料

4、自定義 addSource:大多數的場景資料都是無界的,會源源不斷的過來。比如去消費 Kafka 某個 topic 上的資料,這時候就需要用到這個 addSource,可能因為用的比較多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 這個基礎類,它是 Flink Kafka 消費的最根本的類。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer011<>(
				parameterTool.getRequired("input-topic"), //從引數中獲取傳進來的 topic 
				new KafkaEventSchema(),
				parameterTool.getProperties())
			.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));
複製程式碼

Flink 目前支援如下圖裡面常見的 Source:

如果你想自己自定義自己的 Source 呢?

那麼你就需要去了解一下 SourceFunction 介面了,它是所有 stream source 的根介面,它繼承自一個標記介面(空介面)Function。

SourceFunction 定義了兩個介面方法:

1、run : 啟動一個 source,即對接一個外部資料來源然後 emit 元素形成 stream(大部分情況下會通過在該方法裡執行一個 while 迴圈的形式來產生 stream)。

2、cancel : 取消一個 source,也即將 run 中的迴圈 emit 元素的行為終止。

正常情況下,一個 SourceFunction 實現這兩個介面方法就可以了。其實這兩個介面方法也固定了一種實現模板。

比如,實現一個 XXXSourceFunction,那麼大致的模板是這樣的:(直接拿 FLink 原始碼的例項給你看看)

最後

本文主要講了下 Flink 的常見 Source 有哪些並且簡單的提了下如何自定義 Source。

關注我

轉載請務必註明原創地址為:www.54tianzhisheng.cn/2018/10/28/…

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然後回覆關鍵字:Flink 即可無條件獲取到。

相關文章

1、《從0到1學習Flink》—— Apache Flink 介紹

2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門

3、《從0到1學習Flink》—— Flink 配置檔案詳解

4、《從0到1學習Flink》—— Data Source 介紹

5、《從0到1學習Flink》—— 如何自定義 Data Source ?

6、《從0到1學習Flink》—— Data Sink 介紹

7、《從0到1學習Flink》—— 如何自定義 Data Sink ?

8、《從0到1學習Flink》—— Flink Data transformation(轉換)