1. 程式人生 > >Flink v1.6.1 官方文件學習 —《基本API概念》

Flink v1.6.1 官方文件學習 —《基本API概念》

目錄

6.5 值

基本API概念

Flink 是實現了作用在分散式集合上的轉換操作(例如 filtering,mapping,updating state,joining,grouping,defining windows,aggregating等)的一般程式。集合最開始從sources(例如,從 files,kafka topics,或本地,記憶體中集合讀取)建立而來。結果通過sinks返回,這些sink可能會將資料寫出到(分散式)檔案,或到標準輸出(例如,終端的命令列)。Flink 程式可以在多種環境下執行,單節點,或嵌入到其他程式中。可以在本地JVM中執行,也可以在多臺機器組成的叢集上執行。

取決於資料來源型別,例如有界或者無界資料來源,您將編寫一個批處理程式或者一個流處理程式。其中,批處理程式用DataSet API,流處理程式用DataStream API。本篇指南將會介紹這兩種API的通用概念,但是您可以檢視Streaming Guide和Batch Guide來檢視這兩種API的詳細資訊。

注意:當展示API用法的例項時,我們將使用StreamingExecutionEnvironment和 DataStream API。二者的概念是一樣的,使用DataSet時只需要 替換 ExecutionEnvironment和 DataSet。

1. Dataset和DataStream

在程式中,Flink使用特殊的類DataSet和DataStream來表示資料。你可以把他們當做是可以包含副本的不可變資料集。DataSet資料是有限的,而DataStream中元素數量可以是無限個的。

在一些方面,這些資料集和常規的Java集合是不同的。首先,他們是不可變的,這意味著集合一旦建立,其中的元素就不可以在新增或者移除。甚至簡單的檢查裡面的元素都不可以。

資料集最開始通過向Flink程式新增source來建立,而新的資料集可以通過轉換(例如map,filter等等)這些資料集衍生而來。

2. Flink程式構成(Anatomy of a Flink Program)

Flink程式看起來和那些轉換資料集的常規程式一樣。每個程式都由下面幾個相同的基本部分構成:

1)獲取執行環境;

2)載入/建立初始資料;

3)指定轉換操作;

4)指定計算結果輸出方式;

5)開始執行。

StreamingExecutionEnvironment是所有Flink程式的基礎,可以通過StreamingExecutionEnvironment的靜態方法獲得:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

一般來說,你只需要使用getExecutionEnvironment()方法,因為這個方法將根據上下文返回正確的執行環境:比如說你使用IDE或者作為通用Java程式來執行你的程式,它將建立一個本地環境,在本地機器上執行你的程式。如果你把程式打成了jar包,並用過命令列啟動,Flink叢集管理器將會執行你的main方法,並且getExecutionEnvironment()方法會返回一個在叢集上執行你的程式的執行環境。

至於指定資料來源,執行環境有多種方法來讀取檔案:你可以一行行的讀,讀取CSV檔案,或者使用完全自定義的輸入資料格式。僅僅讀取一個text檔案為行佇列,你可以使用下面的方法:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

這將返回一個DataStream,在這之上你可以應用轉換操作來建立新的衍生的DataStreams。

你可以在Dataset是呼叫轉換函式來轉換DataSet(官網這裡有問題)。比如map操作:

val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }

通過將原始集合中的每一個字串轉換成整型,這將建立一個新的DataStream(很明顯官網這裡是有問題的,但是操作是一樣的)。

得到包含最終結果的DataStream後,你可以建立一個sink來將結果寫出到外部系統:

writeAsText(path: String)

print()

完成整個程式的邏輯編碼後,你需要呼叫StreamingExecutionEnvironment的execute()方法來觸發程式執行。根據執行環境ExecutionEnvironment的不同,程式將在本地或者叢集上執行。

execute()方法有一個返回值:JobExecutionResult,這個返回值包含了程式執行時間和累加器結果。

(JobExecutionResult繼承自JobSubmissionResult,JobSubmissionResult有JobID屬性,所以通過JobExecutionResult也可獲得JobID屬性。)

至於DataSet和DataStream的source和sink的詳細資訊,請參考相應的指導文件。

3. 延遲計算(Lazy Evaluation)

所有的Flink程式都是延遲執行的:當執行程式的主方法時,資料的載入和轉化操作並沒有直接發生,而是建立這些操作,並新增到程式計劃。這些操作實際是在ExecutionEnvironment的execute()方法觸發後執行的。程式是在本地執行還是在叢集上執行,取決於ExecutionEnvironment的型別。

延遲計算使得你可以構建複雜的程式,而Flink只需把它當做一個整體的計劃單元執行。

4. 指定鍵(Specifying Keys)

一些轉換操作(join,coGroup,keyBy,groupBy)要求集合內的元素需要定義有鍵。另一些操作(Reduce,GroupReduce,Aggregate,Windows)需要在使用這些操作前將資料按key分組。

DataSet這樣分組(官網不給力,沒給出Scala版本):

val input: DataSet[String] = env.readTextFile("src/main/resources/kv.txt")

val reduced = input
      .groupBy(/*define key here*/)
      .reduceGroup(/*do something*/);

DataStream這樣指定key:

val input: DataStream[String] = env.readTextFile("src/main/resources/kv.txt")

val windowed = input
      .keyBy(/*define key here*/)
      .window(/*window specification*/)

Flink的資料模型不是基於鍵值對的,因此,你不需要手動的把資料集打包成鍵值對。鍵是虛擬的:他們被定義為實際資料上的函式,來引導分組操作。

注意:下面的討論中,我們將使用DataStream API和KeyBy展示。對於DataSet API,只需要用DataSet和groupBy替換即可。

4.1 為Tuple定義鍵(Define keys for Tuples)

最簡單的用例是根據Tuple的一個或多個欄位對Tuple分組:

val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)

Tuple以它的第一個欄位分組(也就是示例中Int型別的那個欄位)

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

這裡,我們使用了一個組合鍵來對Tuple分組。這個組合件由第一和第二個欄位組成。

巢狀Tuple需要注意的一個點:如果你的DataStream內有巢狀的tuple,比如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

使用KeyBy(0)指定鍵,系統將會使用整個Tuple2作為鍵(整型和浮點型的)。如果想使用Tuple2內部欄位作為鍵,你可以使用欄位來表示鍵,這種方法會在後面闡述。

4.2 使用欄位表示式定義鍵(Define keys using Field Expressions)

你可以使用基於字串的欄位表示式來引用巢狀欄位,用這些欄位來為grouping,sorting,joining或者coGroupping定義鍵。

欄位表示式使選擇巢狀(組合)型別資料(例如Tuple,POJO)中的欄位變得非常容易。

在下面的例子中,我們有一個包含兩個欄位:word和count的 wc POJO。為了根據word欄位分組,我們只需把欄位名傳給KeyBy()函式:

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

欄位表示式語法:

  • 通過欄位名選擇POJO的欄位。例如 user 表示 一個POJO的user欄位;

  • Tuple通過offset來選擇,"_1"和"5"分別代表第一和第六個Scala Tuple欄位

  • POJO和Tuple的巢狀欄位也可以拿到。例如 "user.zip"可以表示POJO的user屬性的zip屬性。任意的巢狀和混合都是支援的,例如 "_2.user.zip"或"user._4.1.zip"

  • 也可以選擇全型別,使用萬用字元表示式"_"。這對不是POJO或者Tuple的型別也適用。

欄位表示式示例:

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

上述示例程式碼的有效欄位表示式如下:

  • "count": wc 類的count欄位;

  • "complex": 遞迴的選取ComplexNestedClass的所有欄位;

  • "complex.word._3":ComplexNestedClass類中的tuple word的第三個欄位;

  • "complex.hadoopCitizen":選擇Hadoop IntWritable型別。

4.3 使用鍵選擇器函式定義鍵(Define keys using Key Selector Functions)

還有一種定義鍵的方式叫做“鍵選擇器”函式。鍵選擇器函式需要一個元素作為入參,返回這個元素的鍵。這個鍵可以是任何型別的,也可從指定計算中生成。

下面的示例展示了一個鍵選擇函式,這個函式僅僅返回了一個物件的欄位。

// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

5. 指定轉換函式(Specifying Transformation Functions)

大多數的轉換操作需要使用者自己定義函式。這一章節列舉了指定這些函式的幾種不同方式。

5.1 Lambda函式(Lambda Functions)

之前見過的,所有的操作都能接受Lambda函式來描述操作:

val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }

val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }

5.2 Rich Functions

所有能把Lambda函式當做引數接收的轉換操作都可以接收Rich函式來替換Lambda函式。例如:

data.map { x => x.toInt }

可以寫成:

class MyMapFunction extends RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
};

data.map(new MyMapFunction())

Rich函式也可以定義成匿名的:

data.map (new RichMapFunction[String, Int] {
  def map(in: String):Int = { in.toInt }
})

Rich函式除了提供使用者自定義函式(map,reduce等),還提供了四種方法:open,close,getRuntimeContext和setRuntimecontext。這些功能在引數化函式、建立和確定本地狀態、獲取廣播變數、獲取執行時資訊(例如累加器和計數器)和迭代資訊時非常有幫助。

6. 支援的資料型別(Supported Data Types)

Flink對DataSet和DataStream中可使用的元素型別添加了一些約束。原因是系統可以通過分析這些型別來確定有效的執行策略。

有7中不同的資料型別:

  • Java Tuple 和 Scala Case類;

  • Java POJO;

  • 基本型別;

  • 通用類;

  • 值;

  • Hadoop Writables;

  • 特殊型別。

6.1 Tuples 和 Case 類

Scala的Case類(以及Scala的Tuple,實際是Case class的特殊型別)是包含了一定數量多種型別欄位的組合型別。Tuple欄位通過他們的1-offset名稱定位,例如 _1代表第一個欄位。Case class 通過欄位名稱獲得:

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1

6.2 POJOs

Java和Scala的類在滿足下列條件時將會被Flink視作特殊的POJO資料型別:

  • 是公共類;

  • 無參構造是公共的;

  • 所有的引數都是可獲得的(宣告為公共的,或提供get,set方法);

  • 欄位的型別必須是Flink支援的。Flink會用Avro來序列化任意的物件(例如Date)

Flink會分析POJO型別的結構,它會獲知POJO的欄位。POJO型別要比一般型別好用。此外,Flink訪問POJO要比一般型別更高效。

class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}

val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

6.3 基本型別

Flink支援Java和Scala所有的基本資料型別,比如 Integer,String,和Double.

6.4 一般通用類

Flink支援大多數的Java,Scala類(API和自定義)。包含不能序列化欄位的類在增加一些限制後也可支援。遵循Java Bean規範的類一般都可以使用。

所有不能視為POJO的類Flink都會當做一般類處理。這些資料型別被視作黑箱,其內容是不可見的。通用類使用Kryo進行序列/反序列化。

6.5 值

值型別需要自己描述他們的序列化/反序列化方式。他們通過實現org.apache.flinktypes.Value介面的read和write方法提供自定義程式碼來進行序列化/反序列化,而不是使用通用的序列化框架。當通用序列化非常低效的時候可以使用值型別。舉個列子:將一個元素稀疏的向量表示為陣列。知道了陣列的元素幾乎都是0,我們可以對非零元素進行特殊編碼,而通用序列化器卻會簡單的寫出所有元素。

以類似的方式,org.apache.flinktypes.CopyableValue介面支援人工內建克隆邏輯(然而並不知道什麼意思)。

對應於基本資料型別,Flink提供了預定義數值型別。(ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。這些數值型別充當這些基本資料型別的可變變數:他們的值是可變的,允許程式設計師複用物件,以減小垃圾回收器的壓力。

6.6 Hadoop Writables

可以使用實現了org.apache.hadoop.Writable介面的型別。定義在write()和ReadFields(0方法中的序列化邏輯,將被用來序列化。

6.7 特殊型別

也可以使用特殊型別,包括Scala的 Either,Option和Try。Java ApI有它自己的Either實現。類似於Scala的Either,代表兩個型別中的一個型別的值,左或者右型別。Either在處理異常或者需要輸出兩種不同型別記錄的時候非常有用。

7. 累加器和計數器(Accumulators & Counters)

累加器構造很簡單, 只需要一個add操作和最終累加結果,這個在程式結束之後才能獲得。

最直接的累加器是個計數器:你可以使用Accumulator.add(v value)方法使之增加。作業結束後,Flink會對所有的部分結果求和,並返回結果給客戶端。

在Debugg或者想快速知道資料更多資訊的時候比較有用。

Flink當前有下列這些內建的累加器。這些累加器都實現了Accumulator介面

  • IntCounter,LongCounter和DoubleCounter.示例在後面

  • Histogram:一個離散數量容器的柱狀圖實現。其內部,只是一個Integer到Ingteger的Map.你可以用它來計算值得分佈。例如對於一個wordCount程式,可以計算每行的單詞的分佈。

7.1 怎樣使用累加器

首先你需要在使用者自定義的轉換操作裡建立一個累加器物件(這裡使用Counter)(這裡又沒有Scala版本)。

val counter = new IntCounter()

然後你需要註冊這個累加器。

getRuntimeContext.addAccumulator("intCounter",intCounter)

現在可以再任何操作函式中使用這個,包括在open()和close()方法裡.

intCounter.add(1)

所有的結構都儲存在JobExecutionResult物件中(由執行環境的execute()方法返回,並且只有job執行結束後可用)。

val counter = env.execute("AccumulatorDemo")
      .getAccumulatorResult("intCounter")

每個job的所有累加器都共享一個名稱空間,因此你在同一個job中的不同操作函式裡可以使用同一個累加器。Flink會在內部聚合所有名稱相同的累加器。

關於累加器和迭代主要注意的一點:當前累加器的結果只能在所有的job都結束後才可獲取。我們計劃讓當前迭代的結果在下一個迭代中可用。你可以使用Aggregators來計算每個迭代的統計資訊,並基於這些資訊終止迭代。

7.2 自定義累加器

實現自己的累加器只需要實現Accumulator介面即可。你可以選擇實現Accumulator或者SimpleAccumulator。

Accumulator<V,R>是最靈活的:它為add的值定義了一個v型別,結果值定義了R型別。對於柱狀圖,v是數值,R是柱子。SimpleAccumulator適用於兩種型別相同的情況。