1. 程式人生 > >Spark Streaming 和 Flink 誰是資料開發者的最愛

Spark Streaming 和 Flink 誰是資料開發者的最愛

本文從程式設計模型、任務排程、時間機制、Kafka 動態分割槽的感知、容錯及處理語義、背壓等幾個方面對比 Spark Streaming 與 Flink,希望對有實時處理需求業務的企業端使用者在框架選型有所啟發。

程式設計模型對比

執行角色

Spark Streaming 執行時的角色(standalone 模式)主要有:

Master:主要負責整體叢集資源的管理和應用程式排程;Worker:負責單個節點的資源管理,driver 和 executor 的啟動等;Driver:使用者入口程式執行的地方,即 SparkContext 執行的地方,主要是 DAG 生成、stage 劃分、task 生成及排程;Executor:負責執行 task,反饋執行狀態和執行結果。Flink 執行時的角色(standalone 模式)主要有:

Jobmanager:協調分散式執行,他們排程任務、協調 checkpoints、協調故障恢復等。至少有一個 JobManager。高可用情況下可以啟動多個 JobManager,其中一個選舉為 leader,其餘為 standby;Taskmanager: 負責執行具體的 tasks、快取、交換資料流,至少有一個 TaskManager;Slot:每個 task slot 代表 TaskManager 的一個固定部分資源,Slot 的個數代表著 taskmanager 可並行執行的 task 數。生態

(圖 1:Spark Streaming 生態,via Spark 官網)

(圖 2:Flink 生態,via Flink官網)

執行模型

Spark Streaming 是微批處理,執行的時候需要指定批處理的時間,每次執行 job 時處理一個批次的資料,流程如圖 3 所示:

(圖 3:via Spark 官網)

Flink 是基於事件驅動的,事件可以理解為訊息。事件驅動的應用程式是一種狀態應用程式,它會從一個或者多個流中注入事件,通過觸發計算更新狀態,或外部動作對注入的事件作出反應。

(圖 4:via Fink 官網)

 

程式設計模型對比

 

程式設計模型對比,主要是對比 Flink 和 Spark Streaming 兩者在程式碼編寫上的區別。

Spark Streaming

Spark Streaming 與 Kafka 的結合主要是兩種模型:

基於 receiver dstream;基於 direct dstream。以上兩種模型程式設計機構近似,只是在 API 和內部資料獲取有些區別,新版本的已經取消了基於 receiver 這種模式,企業中通常採用基於 direct Dstream 的模式。

val Array(brokers, topics) = args// 建立一個批處理時間是2s的context val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) // 使用broker和topic建立DirectStream val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Get the lines, split them into words, count the words and print val lines = messages.map(_.value) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print() // 啟動流 ssc.start() ssc.awaitTermination()

 

通過以上程式碼我們可以 Get 到:

設定批處理時間;建立資料流;編寫 transform;編寫 action;啟動執行。Flink

接下來看 Flink 與 Kafka 結合是如何編寫程式碼的。Flink 與 Kafka 結合是事件驅動,大家可能對此會有疑問,消費 Kafka 的資料呼叫 Poll 的時候是批量獲取資料的(可以設定批處理大小和超時時間),這就不能叫做事件觸發了。

而實際上,Flink 內部對 Poll 出來的資料進行了整理,然後逐條 emit,形成了事件觸發的機制。

下面的程式碼是 Flink 整合 Kafka 作為 data source 和 data sink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().disableSysoutLogging();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); env.enableCheckpointing(5000); // create a checkpoint every 5 seconds env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interfaceenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // ExecutionConfig.GlobalJobParametersenv.getConfig().setGlobalJobParameters(null); DataStream<KafkaEvent> input = env .addSource( new FlinkKafkaConsumer010<>(parameterTool.getRequired("input-topic"), new KafkaEventSchema(),parameterTool.getProperties()) .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).setParallelism(1).rebalance() .keyBy("word").map(new RollingAdditionMapper()).setParallelism(0); input.addSink( new FlinkKafkaProducer010<>( parameterTool.getRequired("output-topic"), new KafkaEventSchema(), parameterTool.getProperties())); env.execute("Kafka 0.10 Example");

 

從 Flink 與 Kafka 結合的程式碼可以 Get 到:

註冊資料 source;編寫執行邏輯;註冊資料 sink。呼叫 env.execute 相比於 Spark Streaming 少了設定批處理時間,還有一個顯著的區別是 flink 的所有運算元都是 lazy 形式的,呼叫 env.execute 會構建 jobgraph。client 端負責 Jobgraph 生成並提交它到叢集執行;而 Spark Streaming的操作運算元分 action 和 transform,其中僅有 transform 是 lazy 形式,而且 DAG 生成、stage 劃分、任務排程是在 driver 端進行的,在 client 模式下 driver 運行於客戶端處。

任務排程原理

 

Spark 任務排程

Spark Streaming 任務如上文提到的是基於微批處理的,實際上每個批次都是一個 Spark Core 的任務。對於編碼完成的 Spark Core 任務在生成到最終執行結束主要包括以下幾個部分:

構建 DAG 圖;劃分 stage;生成 taskset;排程 task。具體可參考圖 5:

(圖 5:Spark 任務排程)

對於 job 的排程執行有 fifo 和 fair 兩種模式,Task 是根據資料本地性排程執行的。

假設每個 Spark Streaming 任務消費的 Kafka topic 有四個分割槽,中間有一個 transform操作(如 map)和一個 reduce 操作,如圖 6 所示:

(圖 6 )

假設有兩個 executor,其中每個 executor 三個核,那麼每個批次相應的 task 執行位置是固定的嗎?是否能預測?

由於資料本地性和排程不確定性,每個批次對應 Kafka 分割槽生成的 task 執行位置並不是固定的。

Flink 任務排程

對於 Flink 的流任務客戶端首先會生成 StreamGraph,接著生成 JobGraph,然後將 jobGraph 提交給 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的轉變,最後由 jobManager 排程執行。

(圖 7)

如圖 7 所示有一個由 data source、MapFunction和 ReduceFunction 組成的程式,data source 和 MapFunction 的併發度都為 4,而 ReduceFunction 的併發度為 3。

一個數據流由 Source-Map-Reduce 的順序組成,在具有 2 個TaskManager、每個 TaskManager 都有 3 個 Task Slot 的叢集上執行。

可以看出 Flink 的拓撲生成提交執行之後,除非故障,否則拓撲部件執行位置不變,並行度由每一個運算元並行度決定,類似於 Storm。

而 Spark Streaming 是每個批次都會根據資料本地性和資源情況進行排程,無固定的執行拓撲結構。

Flink 是資料在拓撲結構裡流動執行,而 Spark Streaming 則是對資料快取批次並行處理。

 

時間機制對比

 

流處理的時間

流處理程式在時間概念上總共有三個時間概念:

處理時間處理時間是指每臺機器的系統時間,當流程式採用處理時間時將使用執行各個運算子例項的機器時間。處理時間是最簡單的時間概念,不需要流和機器之間的協調,它能提供最好的效能和最低延遲。

然而在分散式和非同步環境中,處理時間不能提供訊息事件的時序性保證,因為它受到訊息傳輸延遲,訊息在運算元之間流動的速度等方面制約。

事件時間事件時間是指事件在其裝置上發生的時間,這個時間在事件進入 Flink 之前已經嵌入事件,然後 Flink 可以提取該時間。基於事件時間進行處理的流程式可以保證事件在處理的時候的順序性,但是基於事件時間的應用程式必須要結合 watermark 機制。

基於事件時間的處理往往有一定的滯後性,因為它需要等待後續事件和處理無序事件,對於時間敏感的應用使用的時候要慎重考慮。

注入時間注入時間是事件注入到 Flink 的時間。事件在 source 運算元處獲取 source 的當前時間作為事件注入時間,後續的基於時間的處理運算元會使用該時間處理資料。

相比於事件時間,注入時間不能夠處理無序事件或者滯後事件,但是應用程式無序指定如何生成 watermark。

在內部注入時間程式的處理和事件時間類似,但是時間戳分配和 watermark 生成都是自動的。

圖 8 可以清晰地看出三種時間的區別:

(圖 8)

Spark 時間機制

Spark Streaming 只支援處理時間,Structured streaming 支援處理時間和事件時間,同時支援 watermark 機制處理滯後資料。

Flink 時間機制

Flink 支援三種時間機制:事件時間、注入時間、處理時間,同時支援 watermark 機制處理滯後資料。

 

Kafka 動態分割槽檢測

 

Spark Streaming

對於有實時處理業務需求的企業,隨著業務增長資料量也會同步增長,將導致原有的 Kafka 分割槽數不滿足資料寫入所需的併發度,需要擴充套件 Kafka 的分割槽或者增加 Kafka 的 topic,這時就要求實時處理程式,如 SparkStreaming、Flink 能檢測到 Kafka 新增的 topic 、分割槽及消費新增分割槽的資料。

接下來結合原始碼分析,Spark Streaming 和 Flink 在 Kafka 新增 topic 或 partition 時能否動態發現新增分割槽並消費處理新增分割槽的資料。

Spark Streaming 與 Kafka 結合有兩個區別比較大的版本,如圖 9 所示是官網給出的對比資料:

(圖 9)

其中確認的是 Spark Streaming 與 Kafka 0.8 版本結合不支援動態分割槽檢測,與 0.10 版本結合支援,接著通過原始碼分析。

Spark Streaming 與 kafka 0.8 版本結合(*原始碼分析只針對分割槽檢測)

入口是 DirectKafkaInputDStream 的 compute:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {// 改行程式碼會計算這個job,要消費的每個kafka分割槽的最大偏移val untilOffsets = clamp(latestLeaderOffsets(maxRetries))// 構建KafkaRDD,用指定的分割槽數和要消費的offset範圍val rdd = KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker.val offsetRanges = currentOffsets.map { case (tp, fo) =>val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset) } val description = offsetRanges.filter { offsetRange =>// Don't display empty ranges. offsetRange.fromOffset != offsetRange.untilOffset}.map { offsetRange => s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the userval metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata)ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) }

 

第一行就是計算得到該批次生成 KafkaRDD 每個分割槽要消費的最大 offset。 接著看 latestLeaderOffsets(maxRetries)。

@tailrecprotectedfinal def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 可以看到的是用來指定獲取最大偏移分割槽的列表還是隻有currentOffsets,沒有發現關於新增的分割槽的內容。val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manuallyif (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err)} else {logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs)latestLeaderOffsets(retries - 1) } } else { o.right.get } }

 

其中 protected var currentOffsets = fromOffsets,這個僅僅是在構建 DirectKafkaInputDStream 的時候初始化,並在 compute 裡面更新:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

 

中間沒有檢測 Kafka 新增 topic 或者分割槽的程式碼,所以可以確認 Spark Streaming 與 kafka 0.8 的版本結合不支援動態分割槽檢測。

Spark Streaming 與 Kafka 0.10 版本結合

入口同樣是 DirectKafkaInputDStream 的 compute 方法,撿主要的部分說,Compute 裡第一行也是計算當前 job 生成 kafkardd 要消費的每個分割槽的最大 offset:

// 獲取當前生成job,要用到的KafkaRDD每個分割槽最大消費偏移值val untilOffsets = clamp(latestOffsets())

 

具體檢測 Kafka 新增 topic 或者分割槽的程式碼在 latestOffsets()

/** * Returns the latest (highest) available offsets, taking new partitions into account. */protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer paranoidPoll(c) // 獲取所有的分割槽資訊 val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets// 做差獲取新增的分割槽資訊 val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit// 新分割槽消費位置,沒有記錄的化是由auto.offset.reset決定 currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause c.pause(newPartitions.asJava) // find latest available offsets c.seekToEnd(currentOffsets.keySet.asJava) parts.map(tp => tp -> c.position(tp)).toMap }

 

該方法內有獲取 Kafka 新增分割槽,並將其更新到 currentOffsets 的過程,所以可以驗證 Spark Streaming 與 Kafka 0.10 版本結合支援動態分割槽檢測。

Flink

入口類是 FlinkKafkaConsumerBase,該類是所有 Flink 的 Kafka 消費者的父類。

(圖 10)

在 FlinkKafkaConsumerBase 的 run 方法中,建立了 kafkaFetcher,實際上就是消費者:

this.kafkaFetcher = createFetcher(sourceContext,subscribedPartitionsToStartOffsets, periodicWatermarkAssigner,punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode,getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),useMetrics);

 

接是建立了一個執行緒,該執行緒會定期檢測 Kafka 新增分割槽,然後將其新增到 kafkaFetcher 裡。

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Overridepublicvoidrun(){ try { // --------------------- partition discovery loop ---------------------List<KafkaTopicPartition> discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before// performing the next operation, so that we can escape the loop as soon as possiblewhile (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());} try { discoveredPartitions = partitionDiscoverer.discoverPartitions(); } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery;// this would only happen if the consumer was canceled; simply escape the loopbreak; } // no need to add the discovered partitions if we were closed during the meantimeif (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); } // do not waste any time sleeping if we're not running anymoreif (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis); } catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loopbreak; } } } } catch (Exception e) {discoveryLoopErrorRef.set(e); } finally { // calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) { cancel(); } } } }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());discoveryLoopThread.start(); kafkaFetcher.runFetchLoop();

 

上面,就是 Flink 動態發現 Kafka 新增分割槽的過程。不過與 Spark 無需做任何配置不同的是,Flink 動態發現 Kafka 新增分割槽,這個功能需要被使能的。

也很簡單,需要將 flink.partition-discovery.interval-millis 該屬性設定為大於 0 即可。

 

容錯機制及處理語義

 

本節內容主要是想對比兩者在故障恢復及如何保證僅一次的處理語義。這個時候適合丟擲一個問題:實時處理的時候,如何保證資料僅一次處理語義?

Spark Streaming 保證僅一次處理

對於 Spark Streaming 任務,我們可以設定 checkpoint,然後假如發生故障並重啟,我們可以從上次 checkpoint 之處恢復,但是這個行為只能使得資料不丟失,可能會重複處理,不能做到恰一次處理語義。

對於 Spark Streaming 與 Kafka 結合的 direct Stream 可以自己維護 offset 到 Zookeeper、Kafka 或任何其它外部系統,每次提交完結果之後再提交 offset,這樣故障恢復重啟可以利用上次提交的 offset 恢復,保證資料不丟失。

但是假如故障發生在提交結果之後、提交 offset 之前會導致資料多次處理,這個時候我們需要保證處理結果多次輸出不影響正常的業務。

由此可以分析,假設要保證資料恰一次處理語義,那麼結果輸出和 offset 提交必須在一個事務內完成。在這裡有以下兩種做法:

repartition(1) Spark Streaming 輸出的 action 變成僅一個 partition,這樣可以利用事務去做:Dstream.foreachRDD(rdd=>{ rdd.repartition(1).foreachPartition(partition=>{ // 開啟事務 partition.foreach(each=>{// 提交資料 }) // 提交事務 }) })

 

將結果和 offset 一起提交。也就是結果資料包含 offset。這樣提交結果和提交 offset 就是一個操作完成,不會資料丟失,也不會重複處理。故障恢復的時候可以利用上次提交結果帶的 offset。

Flink 與 kafka 0.11 保證僅一次處理

若要 sink 支援僅一次語義,必須以事務的方式寫資料到 Kafka,這樣當提交事務時兩次 checkpoint 間的所有寫入操作作為一個事務被提交。這確保了出現故障或崩潰時這些寫入操作能夠被回滾。

在一個分散式且含有多個併發執行 sink 的應用中,僅僅執行單次提交或回滾是不夠的,因為所有元件都必須對這些提交或回滾達成共識,這樣才能保證得到一致性的結果。

Flink 使用兩階段提交協議以及預提交(pre-commit)階段來解決這個問題。

本例中的 Flink 應用如圖 11 所示包含以下元件:

一個source,從Kafka中讀取資料(即KafkaConsumer);一個時間視窗化的聚會操作;一個sink,將結果寫回到Kafka(即KafkaProducer)。

(圖 11)

下面詳細講解 Flink 的兩段提交思路:

(圖 12)

如圖 12 所示,Flink checkpointing 開始時便進入到 pre-commit 階段。

具體來說,一旦 checkpoint 開始,Flink 的 JobManager 向輸入流中寫入一個 checkpoint barrier ,將流中所有訊息分割成屬於本次 checkpoint 的訊息以及屬於下次 checkpoint 的,barrier 也會在操作運算元間流轉。

對於每個 operator 來說,該 barrier 會觸發 operator 狀態後端為該 operator 狀態打快照。

data source 儲存了 Kafka 的 offset,之後把 checkpoint barrier 傳遞到後續的 operator。

這種方式僅適用於 operator 僅有它的內部狀態。內部狀態是指 Flink state backends 儲存和管理的內容(如第二個 operator 中 window 聚合算出來的 sum)。

當一個程序僅有它的內部狀態的時候,除了在 checkpoint 之前將需要將資料更改寫入到 state backend,不需要在預提交階段做其他的動作。

在 checkpoint 成功的時候,Flink 會正確的提交這些寫入,在 checkpoint 失敗的時候會終止提交,過程可見圖 13。

(圖 13)

當結合外部系統的時候,外部系統必須要支援可與兩階段提交協議捆綁使用的事務。

顯然本例中的 sink 由於引入了 kafka sink,因此在預提交階段 data sink 必須預提交外部事務。如下圖:

(圖 14)

當 barrier 在所有的運算元中傳遞一遍,並且觸發的快照寫入完成,預提交階段完成。

所有的觸發狀態快照都被視為 checkpoint 的一部分,也可以說 checkpoint 是整個應用程式的狀態快照,包括預提交外部狀態。出現故障可以從 checkpoint 恢復。

下一步就是通知所有的操作運算元 checkpoint 成功。該階段 jobmanager 會為每個 operator 發起 checkpoint 已完成的回撥邏輯。

本例中 data source 和視窗操作無外部狀態,因此該階段,這兩個運算元無需執行任何邏輯,但是 data sink 是有外部狀態的,因此,此時我們必須提交外部事務,如下圖:

(圖 15)

以上就是 Flink 實現恰一次處理的基本邏輯。

 

Back pressure

 

消費者消費的速度低於生產者生產的速度,為了使應用正常,消費者會反饋給生產者來調節生產者生產的速度,以使得消費者需要多少,生產者生產多少。(*back pressure 後面一律稱為背壓。)

Spark Streaming 的背壓

Spark Streaming 跟 Kafka 結合是存在背壓機制的,目標是根據當前 job 的處理情況來調節後續批次的獲取 Kafka 訊息的條數。

為了達到這個目的,Spark Streaming 在原有的架構上加入了一個 RateController,利用的演算法是 PID,需要的反饋資料是任務處理的結束時間、排程時間、處理時間、訊息條數。

這些資料是通過 SparkListener 體系獲得,然後通過 P 的 compute 計算得到一個速率,進而可以計算得到一個 offset,然後跟限速設定最大消費條數比較得到一個最終要消費的訊息最大 offset。

P 的 compute 方法如下:

def compute( time: Long, // in millisecondsnumElements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long// in milliseconds ): Option[Double] = { logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000val processingRate = numElements.toDouble / processingDelay * 1000val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2)val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate)logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin) latestTime = time if (firstRun) { latestRate = processingRate latestError = 0D firstRun = falselogTrace("First run, rate estimation skipped") None } else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } } else { logTrace("Rate estimation skipped") None } } }

 

Flink 的背壓

與 Spark Streaming 的背壓不同的是,Flink 背壓是 jobmanager 針對每一個 task 每 50ms 觸發 100 次 Thread.getStackTrace() 呼叫,求出阻塞的佔比。過程如圖 16 所示:

(圖 16)

阻塞佔比在 web 上劃分了三個等級:

OK: 0 <= Ratio <= 0.10,表示狀態良好;LOW: 0.10 < Ratio <= 0.5,表示有待觀察;HIGH: 0.5 < Ratio <= 1,表示要處理了。