
Getting Started with Flink's DataStream API


The API in this article is based on Flink 1.4.

def main(args: Array[String]) {

  // getExecutionEnvironment automatically decides between a local and a remote environment.
  // A local environment can also be created explicitly with createLocalEnvironment()
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("JMhost", 1234, "path/to/jarFile.jar") // jar file(s) to ship to the JobManager

  // set the time characteristic to event time. env has many other setters, e.g.:
  // the state backend defaults to in-memory; change it with .setStateBackend(...)
  // enableCheckpointing(...), followed by the checkpoint settings
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getConfig.setAutoWatermarkInterval(1000L)

  // create a DataStream[SensorReading] from a stream source
  val sensorData: DataStream[SensorReading] = env
    // SensorSource is a class extending RichParallelSourceFunction, a variant of SourceFunction
    .addSource(new SensorSource)
    .setParallelism(4)
    // assign timestamps and watermarks (required for event time)
    .assignTimestampsAndWatermarks(new SensorTimeAssigner)

  val avgTemp: DataStream[SensorReading] = sensorData
    // convert Fahrenheit to Celsius with an inline lambda
    .map( r => {
        val celsius = (r.temperature - 32) * (5.0 / 9.0)
        SensorReading(r.id, r.timestamp, celsius)
      } )
    .keyBy(_.id) // keyBy does not create a new key field; the function just designates part of the record as the key
    // shortcut for window(TumblingEventTimeWindows.of(Time.seconds(5)));
    // with processing time set above, it is the shortcut for the processing-time window instead
    .timeWindow(Time.seconds(5))
    // compute average temperature using a UDF
    .apply(new TemperatureAverager)

  avgTemp.print()
  // When submitted to a cluster, execute() ships the dataflow to the remote JobManager.
  // In IDE mode the JobManager and TaskManager run in the same JVM process, and .execute() starts a local Flink.
  // It then builds the execution plan, from the sources through all transformations, and finally runs the plan.
  env.execute("Compute average sensor temperature")
}

Note that the map and flatMap operators require an implicit TypeInformation, i.e. implicit val typeInfo = TypeInformation.of(classOf[output type of the map]). A better approach is to import org.apache.flink.streaming.api.scala._ (or org.apache.flink.api.scala._ for the DataSet API), which brings the implicit TypeInformation factories into scope.
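A minimal sketch of the two options, assuming a DataStream[String] named lines already exists (the name and the length computation are made up for illustration):

// option 1: provide the TypeInformation for the map output type explicitly
implicit val intInfo: TypeInformation[Int] = TypeInformation.of(classOf[Int])
val lengths: DataStream[Int] = lines.map(_.length)

// option 2 (preferred): a single wildcard import brings implicit TypeInformation
// for all supported types into scope via the createTypeInformation macro
// import org.apache.flink.streaming.api.scala._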

The operators from map to apply are transformation operators. Their work is typically twofold: (1) use reflection to determine the output type of the operator, and (2) return an Operator via transform. transform also registers the operation with the execution environment, which is later used to build the DAG.

Types

  • Primitives

  • Java and Scala tuples

    Java tuples are mutable: you can call setField(newValue, index), and the index starts at 0 (see the sketch after the filter example below).

    DataStream<Tuple2<String, Integer>> persons = env.fromElements(
      Tuple2.of("Adam", 17), 
      Tuple2.of("Sarah", 23));
    
    persons.filter(p -> p.f1 > 18);
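    A minimal sketch of tuple mutability from Scala, using the Java Tuple2 class (the person value is made up):

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

    val person: JTuple2[String, Integer] = JTuple2.of("Adam", Integer.valueOf(17))
    person.setField(Integer.valueOf(18), 1)   // field indices start at 0, so index 1 is the age
    val updatedAge: Integer = person.getField(1)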
  • Scala case classes

  • POJOs, including classes generated by Apache Avro

    The POJO requirements are: a public class, a public no-argument constructor, all fields either public or accessible through getters and setters (following the default naming convention), and all field types supported by Flink.
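    A minimal Scala sketch that satisfies these rules (the Person class is made up; @BeanProperty generates the conventional getters and setters):

    import scala.beans.BeanProperty

    // public class, public no-argument constructor, fields reachable via getters/setters,
    // and all field types (String, Int) supported by Flink
    class Person(@BeanProperty var name: String, @BeanProperty var age: Int) {
      def this() = this("", 0)
    }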

  • Flink Value types

    Implement the read() and write() serialization logic of the org.apache.flink.types.Value interface.

    Flink provides built-in Value types such as IntValue, DoubleValue, and StringValue, and they are mutable.
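    A small illustration of that mutability (the counter use is a made-up example):

    import org.apache.flink.types.IntValue

    // the wrapper object can be mutated and reused instead of allocating a new one per record
    val count = new IntValue(0)
    count.setValue(count.getValue + 1)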

  • Some special types

    Scala's Either, Option, and Try types, and Flink's Java version of the Either type.

    primitive and object Array types, Java Enum types and Hadoop Writable types

Type inference for Java

If a function's return type is generic, you need to add a returns() call to declare it; the source does not spell out exactly which cases require it.

.map(new MyMapFunction<Long, MyType>())
.returns(MyType.class);

.flatMap(new MyFlatMapFunction<String, Integer>())
.returns(new TypeHint<Integer>(){});

class MyFlatMapFunction<T, O> implements FlatMapFunction<T, O> {
   public void flatMap(T value, Collector<O> out) { ... }
}

TypeInformation

Custom classes used as keys also need appropriate TypeInformation.

TypeInformation maps fields from the types to fields in a flat schema. Basic types are mapped to single fields and tuples and case classes are mapped to as many fields as the class has. The flat schema must be valid for all type instances, thus variable length types like collections and arrays are not assigned to individual fields, but they are considered to be one field as a whole.

// Create TypeInformation and TypeSerializer for a 2-tuple in Scala
// get the execution config
val config = inputStream.executionConfig

...

// create the type information; requires import org.apache.flink.streaming.api.scala._
val tupleInfo: TypeInformation[(String, Double)] =
    createTypeInformation[(String, Double)]

// create a serializer
val tupleSerializer = tupleInfo.createSerializer(config)

Transformations

To keep the Java and Scala APIs as similar as possible, Flink dropped some Scala implicits, so pattern matching in particular works differently from Spark. If you want those implicits back, import org.apache.flink.streaming.api.scala.extensions._, or use the corresponding function names such as mapWith instead of map.
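A minimal sketch of mapWith with a pattern-matching function (readings is a hypothetical stream of (id, temperature in Fahrenheit) tuples):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

val readings: DataStream[(String, Double)] = ...

// mapWith accepts a pattern-matching anonymous function, so the tuple can be deconstructed directly
val celsius: DataStream[(String, Double)] = readings
  .mapWith { case (id, fahrenheit) => (id, (fahrenheit - 32) * (5.0 / 9.0)) }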

  1. Basic transformations are transformations on individual events.

    // ids have the form sensor_N; after the split each id yields two consecutive records
    val sensorIds: DataStream[String] = ...
    val splitIds: DataStream[String] = sensorIds
      .flatMap(id => id.split("_"))
  2. KeyedStream transformations are transformations that are applied to events in the context of a key.

    If there are many unique keys, watch out for running out of memory, since keyed state grows with the number of keys.

    ROLLING AGGREGATIONS, e.g. sum() and min(); minBy() returns the event that holds the minimum value. Only one rolling aggregation can be used at a time.

    val resultStream: DataStream[(Int, Int, Int)] = inputStream
      .keyBy(0) // key on first field of the tuple
      .sum(1)   // sum the second field of the tuple
    // example output:
    // (1,2,2) followed by (1,7,2), (2,3,1) followed by (2,5,1)
    // the key does not have to be an existing field of the record
    val keyedStream = input.keyBy(value => math.max(value._1, value._2))

    REDUCE works like an accumulator; any function matching the signature (T, T) => T will do.

    val reducedSensors = readings
      .keyBy(_.id)
      .reduce((r1, r2) => {
        val highestTimestamp = Math.max(r1.timestamp, r2.timestamp)
        SensorReading(r1.id, highestTimestamp, r1.temperature)
      })
  3. Multi-stream transformations merge multiple streams into one stream or split one stream into multiple streams.

    CONNECT, COMAP, AND COFLATMAP:

    val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
    // repartition-repartition
      .connect(second)
      .keyBy(0, 0) // key both input streams on first attribute
    // ConnectedStreams provides map and flatMap, which take Co-functions, e.g. for map:
    CoMapFunction[IN1, IN2, OUT]
        > map1(IN1): OUT
        > map2(IN2): OUT
    // the order in which map1 and map2 are called cannot be controlled; each is invoked as soon as an event arrives
    
    // connect streams with broadcast
    val broadcastConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
    // broadcast-forward
      .connect(second.broadcast()) // replicate second and broadcast it to every parallel task of first
    
    // 例子
    // group sensor readings by their id
    val keyed: KeyedStream[SensorReading, String] = tempReadings
      .keyBy(_.id)
    
    // connect the two streams and raise an alert
    // if the temperature and smoke levels are high
    val alerts = keyed
      .connect(smokeReadings.broadcast)
      .flatMap(new RaiseAlertFlatMap)
    class RaiseAlertFlatMap extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {
      // note: this variable is not checkpointed
      var smokeLevel = SmokeLevel.Low
    
      override def flatMap1(in1: SensorReading, collector: Collector[Alert]): Unit = {
        // high chance of fire => true
        if (smokeLevel.equals(SmokeLevel.High) && in1.temperature > 100) {
          collector.collect(Alert("Risk of fire!", in1.timestamp))
        }
      }
    
      override def flatMap2(in2: SmokeLevel, collector: Collector[Alert]): Unit = {
        smokeLevel = in2
      }
    }

    SPLIT [DATASTREAM -> SPLITSTREAM] AND SELECT

    val inputStream: DataStream[(Int, String)] = ...
    
    val splitted: SplitStream[(Int, String)] = inputStream
      .split(t => if (t._1 > 1000) Seq("large") else Seq("small"))
    
    val large: DataStream[(Int, String)] = splitted.select("large")
    val small: DataStream[(Int, String)] = splitted.select("small")
    val all: DataStream[(Int, String)] = splitted.select("small", "large") 
  4. Partitioning transformations reorganize stream events.

    shuffle(): distributes events randomly

    rebalance(): distributes events evenly in round-robin fashion across all downstream tasks, e.g. two streams each spread evenly over four downstream tasks

    rescale(): distributes evenly only within a subset, e.g. each of two upstream tasks is split evenly over two of the four downstream parallel tasks

    broadcast(): replicates events and sends them to all downstream parallel tasks

    global(): sends all events to the first downstream parallel task

    partitionCustom() takes a user-defined Partitioner (the other methods are plain calls on the DataStream; see the sketch after this example):

    val numbers: DataStream[(Int)] = ...
    numbers.partitionCustom(myPartitioner, 0)
    
    object myPartitioner extends Partitioner[Int] {
      val r = scala.util.Random
    
      override def partition(key: Int, numPartitions: Int): Int = {
        if (key < 0) 0 else r.nextInt(numPartitions)
      }
    }
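    A minimal sketch of the other partitioning calls, on the same numbers stream as above:

    val shuffled   = numbers.shuffle     // distribute randomly
    val balanced   = numbers.rebalance   // round-robin over all downstream tasks
    val rescaled   = numbers.rescale     // round-robin over a subset of downstream tasks
    val replicated = numbers.broadcast   // replicate every event to all downstream tasks
    val pinned     = numbers.global      // send every event to the first downstream task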
  5. Task chaining and resource groups

    By default, operators are chained when the following conditions are met:

    • upstream and downstream operators have the same parallelism
    • the downstream node has an in-degree of 1 (i.e. it has no input from any other node)
    • upstream and downstream nodes are in the same slot group (slot groups are explained below)
    • the downstream node's chain strategy is ALWAYS (may chain with both upstream and downstream; map, flatMap, filter, etc. default to ALWAYS)
    • the upstream node's chain strategy is ALWAYS or HEAD (HEAD may only chain with downstream, not upstream; sources default to HEAD)
    • the partitioning between the two nodes is forward (cf. how data stream partitioning works)
    • the user has not disabled chaining
    // The two mappers will be chained, and filter will not be chained to the first mapper.
    someStream.filter(...).map(...).startNewChain().map(...)
    
    // Do not chain the map operator
    someStream.map(...).disableChaining()
    
    // By default, if all source operators share one slot, the downstream operators share that slot too. To avoid undesirable sharing, you can force the filter into its own sharing group named "name":
    someStream.filter(...).slotSharingGroup("name")

A task slot is the smallest unit of resource allocation inside a TaskManager; it represents a fixed-size subset of resources, and each TaskManager divides its resources evenly among its slots.
By adjusting the number of task slots, users define how tasks are isolated from each other. One slot per TaskManager means every task runs in its own JVM; multiple slots per TaskManager means multiple tasks run in the same JVM.
Tasks in the same JVM process, i.e. in multiple slots of one TaskManager, can share TCP connections (via multiplexing) and heartbeat messages, which reduces network traffic, and they can share some data structures, lowering per-task overhead to some extent.
Each slot can hold a single task or a pipeline of consecutive tasks, i.e. a task chain. Beyond that, the SlotSharingGroup mentioned above can place non-chained tasks into the same slot. That way tasks do not have to switch between threads, and there is no need to recompute the total number of tasks; the job can simply keep its parallelism.

Defining UDFs

Flink serializes all UDFs and the parameters they receive using Java's default serialization and ships them to the worker processes.

A rich function has a few more methods than a plain lambda: open() initializes the operator and is called once per task before the rich function processes any records; its Configuration parameter can be ignored (it is used by the DataSet API). There are also close(), getRuntimeContext(), and setRuntimeContext().

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  var subTaskIndex = 0

  override def open(configuration: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // do some initialization
    // e.g. establish a connection to an external system
  }

  override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
    // subtasks are 0-indexed
    if(in % 2 == subTaskIndex) {
      out.collect((subTaskIndex, in))
    }
    // do some more processing
  }

  override def close(): Unit = {
    // do some cleanup, e.g. close connections to external systems
  }
}

Using a rich function to access the global configuration

def main(args: Array[String]) : Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  
  // a Flink Configuration object, essentially a HashMap
  val conf = new Configuration()

  // set the parameter “keyWord” to “flink”
  conf.setString("keyWord", "flink")

  // set the configuration as global
  env.getConfig.setGlobalJobParameters(conf)

  val input: DataStream[String] = env.fromElements(
   "I love flink", "bananas", "apples", "flinky")

  input.filter(new MyFilterFunction)
    .print()

  env.execute()
}

class MyFilterFunction extends RichFilterFunction[String] {
  var keyWord = ""

  override def open(configuration: Configuration): Unit = {
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters

    val globConf = globalParams.asInstanceOf[Configuration]
    // null is the default value
    keyWord = globConf.getString("keyWord", null)
  }

  override def filter(value: String): Boolean = {
    // use the keyWord parameter to filter out elements
    value.contains(keyWord)
  }
}

Additional notes:

parallelism: a default can be set on the env and overridden on each operator.
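A minimal sketch (numbers is a hypothetical input stream):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)          // default parallelism for every operator in this job

val numbers: DataStream[Int] = ...

numbers
  .map(_ * 2)
  .setParallelism(2)           // override the default for this map operator only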

Referencing: _.birthday._ refers to all fields of the birthday field. Similarly, project() picks tuple fields by index (Java API):

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Flink also minimizes its own internal third-party dependencies, such as transitive dependencies. When using third-party packages, either bundle all of their dependencies into your job jar, or place them in Flink's lib directory so that Flink loads them at every startup (you can then leave them out when packaging, as long as the cluster's Flink has them).

References
Stream Processing with Apache Flink by Fabian Hueske and Vasiliki Kalavri