1. 程式人生 > >整合Kafka到Spark Streaming——程式碼示例和挑戰

整合Kafka到Spark Streaming——程式碼示例和挑戰

作者Michael G. Noll是瑞士的一位工程師和研究員,效力於Verisign,是Verisign實驗室的大規模資料分析基礎設施(基礎Hadoop)的技術主管。本文,Michael詳細的演示瞭如何將Kafka整合到Spark Streaming中。 期間, Michael還提到了將Kafka整合到 Spark Streaming中的一些現狀,非常值得閱讀,雖然有一些資訊在Spark 1.2版本中已發生了一些變化,比如HA策略: 通過Spark Contributor、Spark佈道者陳超我們瞭解到 ,在Spark 1.2版本中,Spark Streaming開始支援fully HA模式(選擇使用),通過新增一層WAL(Write Ahead Log),每次收到資料後都會存在HDFS上,從而避免了以前版本中的資料丟失情況,但是不可避免的造成了一定的開銷,需要開發者自行衡量。

以下為譯文

作為一個實時大資料處理工具, Spark Sreaming 近日一直被廣泛關注,與 Apache Storm 的對比也經常出現。但是依我說,缺少與Kafka整合,任何實時大資料處理工具都是不完整的,因此我將一個示例Spark Streaming應用程式新增到 kafka-storm-starter ,並且示範如何從Kafka讀取,以及如何寫入到Kafka。在這個過程中,我還使用Avro作為資料格式,以及Twitter Bijection進行資料序列化。

在本篇文章,我將詳細地講解這個Spark Streaming示例;同時,我還會穿插當下Spark Streaming與Kafka整合的一些焦點話題。免責宣告:這是我首次試驗Spark Streaming,僅作為參考。

當下,這個Spark Streaming示例被上傳到GitHub,下載訪問: kafka-storm-starter。專案的名稱或許會讓你產生某些誤解,不過,不要在意這些細節:)

什麼是Spark Streaming

Spark Streaming 是Apache Spark的一個子專案。Spark是個類似於Apache Hadoop的開源批處理平臺,而Spark Streaming則是個實時處理工具,執行在Spark引擎之上。

Spark Streaming vs. Apache Storm

Spark Streaming與Apache Storm有一些相似之處,後者是當下最流行的大資料處理平臺。前不久,雅虎的Bobby Evans 和Tom Graves曾發表過一個“ 

Spark and Storm at Yahoo! ”的演講,在這個演講中,他們對比了兩個大平臺,並提供了一些選擇參考。類似的,Hortonworks的P. Taylor Goetz也分享過名為 Apache Storm and Spark Streaming Compared 的講義。

這裡,我也提供了一個非常簡短的對比:對比Spark Streaming,Storm的產業採用更高,生產環境應用也更穩定。但是從另一方面來說,對比Storm,Spark擁有更清晰、等級更高的API,因此Spark使用起來也更加愉快,最起碼是在使用Scala編寫Spark應用程式的情況(毫無疑問,我更喜歡Spark中的API)。但是,請別這麼直接的相信我的話,多看看上面的演講和講義。

不管是Spark還是Storm,它們都是Apache的頂級專案,當下許多大資料平臺提供商也已經開始整合這兩個框架(或者其中一個)到其商業產品中,比如Hortonworks就同時整合了Spark和Storm,而Cloudera也整合了Spark。

附錄:Spark中的Machines、cores、executors、tasks和receivers 

本文的後續部分將講述許多Spark和Kafka中的parallelism問題,因此,你需要掌握一些Spark中的術語以弄懂這些環節。

  • 一個Spark叢集必然包含了1個以上的工者作節點,又稱為從主機(為了簡化架構,這裡我們先拋棄開叢集管理者不談)。
  • 一個工作者節點可以執行一個以上的executor
  • Executor是一個用於應用程式或者工作者節點的程序,它們負責處理tasks,並將資料儲存到記憶體或者磁碟中。每個應用程式都有屬於自己的executors,一個executor則包含了一定數量的cores(也被稱為slots)來執行分配給它的任務。
  • Task是一個工作單元,它將被傳送給executor。也就是說,task將是你應用程式的計算內容(或者是一部分)。SparkContext將把這些tasks傳送到executors進行執行。每個task都會佔用父executor中的一個core(slot)。
  • Receiver( API , 文件 )將作為一個長期執行的task跑在一個executor上。每個receiver都會負責一個所謂的input DStream(比如從Kafka中讀取的一個輸入流),同時每個receiver( input DStream)佔用一個core/slot。
  • input DStream:input DStream是DStream的一個型別,它負責將Spark Streaming連線到外部的資料來源,用於讀取資料。對於每個外部資料來源(比如Kafka)你都需要配置一個input DStream。一個Spark Streaming會通過一個input DStream與一個外部資料來源進行連線,任何後續的DStream都會建立標準的DStreams。

在Spark的執行模型,每個應用程式都會獲得自己的executors,它們會支撐應用程式的整個流程,並以多執行緒的方式執行1個以上的tasks,這種隔離途徑非常類似Storm的執行模型。一旦引入類似YARN或者Mesos這樣的叢集管理器,整個架構將會變得異常複雜,因此這裡將不會引入。你可以通過Spark文件中的 Cluster Overview 瞭解更多細節。

整合Kafka到Spark Streaming

概述

簡而言之,Spark是支援Kafka的,但是這裡存在許多不完善的地方。

Spark程式碼庫中的 KafkaWordCount 對於我們來說是個非常好的起點,但是這裡仍然存在一些開放式問題。

特別是我想了解如何去做:

  • 從kafaka中並行讀入。在Kafka,一個話題(topic)可以有N個分割槽。理想的情況下,我們希望在多個分割槽上並行讀取。這也是 Kafka spout in Storm 的工作。
  • 從一個Spark Streaming應用程式向Kafka寫入,同樣,我們需要並行執行。

在完成這些操作時,我同樣碰到了Spark Streaming和/或Kafka中一些已知的問題,這些問題大部分都已經在Spark mailing list中列出。在下面,我將詳細總結Kafka整合到Spark的現狀以及一些常見問題。

Kafka中的話題、分割槽(partitions)和parallelism

Kafka將資料儲存在話題中,每個話題都包含了一些可配置數量的分割槽。話題的分割槽數量對於效能來說非常重要,而這個值一般是消費者parallelism的最大數量:如果一個話題擁有N個分割槽,那麼你的應用程式最大程度上只能進行N個執行緒的並行,最起碼在使用Kafka內建Scala/Java消費者API時是這樣的。

與其說應用程式,不如說Kafka術語中的消費者群(consumer group)。消費者群,通過你選擇的字串識別,它是邏輯消費者應用程式叢集範圍的識別符。同一個消費者群中的所有消費者將分擔從一個指定Kafka話題中的讀取任務,同時,同一個消費組中所有消費者從話題中讀取的執行緒數最大值即是N(等同於分割槽的數量),多餘的執行緒將會閒置。

多個不同的Kafka消費者群可以並行的執行:毫無疑問,對同一個Kafka話題,你可以執行多個獨立的邏輯消費者應用程式。這裡,每個邏輯應用程式都會執行自己的消費者執行緒,使用一個唯一的消費者群id。而每個應用程式通常可以使用不同的read parallelisms(見下文)。當在下文我描述不同的方式配置read parallelisms時,我指的是如何完成這些邏輯消費者應用程式中的一個設定。

這裡有一些簡單的例子

  • 你的應用程式使用“terran”消費者群id對一個名為“zerg.hydra”的kafka話題進行讀取,這個話題擁有10個分割槽。如果你的消費者應用程式只配置一個執行緒對這個話題進行讀取,那麼這個執行緒將從10個分割槽中進行讀取。
  • 同上,但是這次你會配置5個執行緒,那麼每個執行緒都會從2個分割槽中進行讀取。
  • 同上,這次你會配置10個執行緒,那麼每個執行緒都會負責1個分割槽的讀取。
  • 同上,但是這次你會配置多達14個執行緒。那麼這14個執行緒中的10個將平分10個分割槽的讀取工作,剩下的4個將會被閒置。

這裡我們不妨看一下現實應用中的複雜性——Kafka中的再平衡事件。在Kafka中,再平衡是個生命週期事件(lifecycle event),在消費者加入或者離開消費者群時都會觸發再平衡事件。這裡我們不會進行詳述,更多再平衡詳情可參見我的 Kafka training deck 一文。

你的應用程式使用消費者群id“terran”,並且從1個執行緒開始,這個執行緒將從10個分割槽中進行讀取。在執行時,你逐漸將執行緒從1個提升到14個。也就是說,在同一個消費者群中,parallelism突然發生了變化。毫無疑問,這將造成Kafka中的再平衡。一旦在平衡結束,你的14個執行緒中將有10個執行緒平分10個分割槽的讀取工作,剩餘的4個將會被閒置。因此如你想象的一樣,初始執行緒以後只會讀取一個分割槽中的內容,將不會再讀取其他分割槽中的資料。

現在,我們終於對話題、分割槽有了一定的理解,而分割槽的數量將作為從Kafka讀取時parallelism的上限。但是對於一個應用程式來說,這種機制會產生一個什麼樣的影響,比如一個Spark Streaming job或者 Storm topology從Kafka中讀取資料作為輸入。

1. Read parallelism: 通常情況下,你期望使用N個執行緒並行讀取Kafka話題中的N個分割槽。同時,鑑於資料的體積,你期望這些執行緒跨不同的NIC,也就是跨不同的主機。在Storm中,這可以通過TopologyBuilder#setSpout()設定Kafka spout的parallelism為N來實現。在Spark中,你則需要做更多的事情,在下文我將詳述如何實現這一點。

2. Downstream processing parallelism: 一旦使用Kafka,你希望對資料進行並行處理。鑑於你的用例,這種等級的parallelism必然與read parallelism有所區別。如果你的用例是計算密集型的,舉個例子,對比讀取執行緒,你期望擁有更多的處理執行緒;這可以通過從多個讀取執行緒shuffling或者網路“fanning out”資料到處理執行緒實現。因此,你通過增長網路通訊、序列化開銷等將訪問交付給更多的cores。在Storm中,你通過shuffle grouping 將Kafka spout shuffling到下游的bolt中。在Spark中,你需要通過DStreams上的 repartition 轉換來實現。

通常情況下,大家都渴望去耦從Kafka的parallelisms讀取,並立即處理讀取來的資料。在下一節,我將詳述使用 Spark Streaming從Kafka中的讀取和寫入。

從Kafka中讀取

Spark Streaming中的Read parallelism

類似Kafka,Read parallelism中也有分割槽的概念。瞭解Kafka的per-topic話題與RDDs in Spark 中的分割槽沒有關聯非常重要。

Spark Streaming中的 KafkaInputDStream (又稱為Kafka聯結器)使用了Kafka的高等級消費者API ,這意味著在Spark中為Kafka設定 read parallelism將擁有兩個控制按鈕。

1. Input DStreams的數量。 因為Spark在每個Input DStreams都會執行一個receiver(=task),這就意味著使用多個input DStreams將跨多個節點並行進行讀取操作,因此,這裡寄希望於多主機和NICs。

2. Input DStreams上的消費者執行緒數量。 這裡,相同的receiver(=task)將執行多個讀取執行緒。這也就是說,讀取操作在每個core/machine/NIC上將並行的進行。

在實際情況中,第一個選擇顯然更是大家期望的。

為什麼會這樣?首先以及最重要的,從Kafka中讀取通常情況下會受到網路/NIC限制,也就是說,在同一個主機上你執行多個執行緒不會增加讀的吞吐量。另一方面來講,雖然不經常,但是有時候從Kafka中讀取也會遭遇CPU瓶頸。其次,如果你選擇第二個選項,多個讀取執行緒在將資料推送到blocks時會出現鎖競爭(在block生產者例項上,BlockGenerator的“+=”方法真正使用的是“synchronized”方式)。

input DStreams建立的RDDs分割槽數量:KafkaInputDStream將儲存從Kafka中讀取的每個資訊到Blocks。從我的理解上,一個新的Block由 spark.streaming.blockInterval在毫秒級別建立,而每個block都會轉換成RDD的一個分割槽,最終由DStream建立。如果我的這種假設成立,那麼由KafkaInputDStream建立的RDDs分割槽數量由batchInterval / spark.streaming.blockInterval決定,而batchInterval則是資料流拆分成batches的時間間隔,它可以通過StreamingContext的一個建構函式引數設定。舉個例子,如果你的批時間價格是2秒(預設情況下),而block的時間間隔是200毫秒(預設情況),那麼你的RDD將包含10個分割槽。如果有錯誤的話,可以提醒我。

選項1:控制input DStreams的數量

val ssc:StreamingContext=???// ignore for now
val kafkaParams:Map[String,String]=Map("group.id"->"terran",/* ignore rest */)

val numInputDStreams =5
val kafkaDStreams =(1 to numInputDStreams).map {_=>KafkaUtils.createStream(...)}

在這個例子中,我們建立了5個input DStreams,因此從Kafka中讀取的工作將分擔到5個核心上,寄希望於5個主機/NICs(之所以說是寄希望於,因為我也不確定Spark Streaming task佈局策略是否會將receivers投放到多個主機上)。所有Input Streams都是“terran”消費者群的一部分,而Kafka將保證topic的所有資料可以同時對這5個input DSreams可用。換句話說,這種“collaborating”input DStreams設定可以工作是基於消費者群的行為是由Kafka API提供,通過KafkaInputDStream完成。

在這個例子中,我沒有提到每個input DSream會建立多少個執行緒。在這裡,執行緒的數量可以通過KafkaUtils.createStream方法的引數設定(同時,input topic的數量也可以通過這個方法的引數指定)。在下一節中,我們將通過實際操作展示。

但是在開始之前,在這個步驟我先解釋幾個Spark Streaming中常見的幾個問題,其中有些因為當下Spark中存在的一些限制引起,另一方面則是由於當下Kafka input DSreams的一些設定造成:

當你使用我上文介紹的多輸入流途徑,而這些消費者都是屬於同一個消費者群,它們會給消費者指定負責的分割槽。這樣一來則可能導致syncpartitionrebalance的失敗,系統中真正工作的消費者可能只會有幾個。為了解決這個問題,你可以把再均衡嘗試設定的非常高,從而獲得它的幫助。然後,你將會碰到另一個坑——如果你的receiver宕機(OOM,亦或是硬體故障),你將停止從Kafka接收訊息。

這裡,我們需要對“停止從Kafka中接收”問題 做一些解釋 。當下,當你通過ssc.start()開啟你的streams應用程式後,處理會開始並一直進行,即使是輸入資料來源(比如Kafka)變得不可用。也就是說,流不能檢測出是否與上游資料來源失去連結,因此也不會對丟失做出任何反應,舉個例子來說也就是重連或者結束執行。類似的,如果你丟失這個資料來源的一個receiver,那麼 你的流應用程式可能就會生成一些空的RDDs 。

這是一個非常糟糕的情況。最簡單也是最粗糙的方法就是,在與上游資料來源斷開連線或者一個receiver失敗時,重啟你的流應用程式。但是,這種解決方案可能並不會產生實際效果,即使你的應用程式需要將Kafka配置選項auto.offset.reset設定到最小——因為Spark Streaming中一些已知的bug,可能導致你的流應用程式發生一些你意想不到的問題,在下文Spark Streaming中常見問題一節我們將詳細的進行介紹。

選擇2:控制每個input DStream上小發著執行緒的數量

在這個例子中,我們將建立一個單一的input DStream,它將執行3個消費者執行緒——在同一個receiver/task,因此是在同一個core/machine/NIC上對Kafka topic “zerg.hydra”進行讀取。

val ssc:StreamingContext=???// ignore for now
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)

val consumerThreadsPerInputDstream =3
val topics =Map("zerg.hydra"-> consumerThreadsPerInputDstream)
val stream =KafkaUtils.createStream(ssc, kafkaParams, topics,...)

KafkaUtils.createStream方法被過載,因此這裡有一些不同方法的特徵。在這裡,我們會選擇Scala派生以獲得最佳的控制。

結合選項1和選項2

下面是一個更完整的示例,結合了上述兩種技術:

val ssc:StreamingContext=???
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)

val numDStreams =5
val topics =Map("zerg.hydra"->1)
val kafkaDStreams =(1 to numDStreams).map{_ =>KafkaUtils.createStream(ssc, kafkaParams, topics,...)}

我們建立了5個input DStreams,它們每個都會執行一個消費者執行緒。如果“zerg.hydra”topic擁有5個分割槽(或者更少),那麼這將是進行並行讀取的最佳途徑,如果你在意系統最大吞吐量的話。

Spark Streaming中的並行Downstream處理

在之前的章節中,我們覆蓋了從Kafka的並行化讀取,那麼我們就可以在Spark中進行並行化處理。那麼這裡,你必須弄清楚Spark本身是如何進行並行化處理的。類似Kafka,Spark將parallelism設定的與(RDD)分割槽數量有關, 通過在每個RDD分割槽上執行task進行 。在有些文件中,分割槽仍然被稱為“slices”。

在任何Spark應用程式中,一旦某個Spark Streaming應用程式接收到輸入資料,其他處理都與非streaming應用程式相同。也就是說,與普通的Spark資料流應用程式一樣,在Spark Streaming應用程式中,你將使用相同的工具和模式。更多詳情可見Level of Parallelism in Data Processing 文件。

因此,我們同樣將獲得兩個控制手段:

1. input DStreams的數量 ,也就是說,我們在之前章節中read parallelism的數量作為結果。這是我們的立足點,這樣一來,我們在下一個步驟中既可以保持原樣,也可以進行修改。

2. DStream轉化的重分配 。這裡將獲得一個全新的DStream,其parallelism等級可能增加、減少,或者保持原樣。在DStream中每個返回的RDD都有指定的N個分割槽。DStream由一系列的RDD組成,DStream.repartition則是通過RDD.repartition實現。接下來將對RDD中的所有資料做隨機的reshuffles,然後建立或多或少的分割槽,並進行平衡。同時,資料會在所有網路中進行shuffles。換句話說,DStream.repartition非常類似Storm中的shuffle grouping。

因此,repartition是從processing parallelism解耦read parallelism的主要途徑。在這裡,我們可以設定processing tasks的數量,也就是說設定處理過程中所有core的數量。間接上,我們同樣設定了投入machines/NICs的數量。

一個DStream轉換相關是 union 。這個方法同樣在StreamingContext中,它將從多個DStream中返回一個統一的DStream,它將擁有相同的型別和滑動時間。通常情況下,你更願意用StreamingContext的派生。一個union將返回一個由Union RDD支撐的UnionDStream。Union RDD由RDDs統一後的所有分割槽組成,也就是說,如果10個分割槽都聯合了3個RDDs,那麼你的聯合RDD例項將包含30個分割槽。換句話說,union會將多個 DStreams壓縮到一個 DStreams或者RDD中,但是需要注意的是,這裡的parallelism並不會發生改變。你是否使用union依賴於你的用例是否需要從所有Kafka分割槽進行“in one place”資訊獲取決定,因此這裡大部分都是基於語義需求決定。舉個例子,當你需要執行一個不用元素上的(全域性)計數。

注意: RDDs是無序的。因此,當你union RDDs時,那麼結果RDD同樣不會擁有一個很好的序列。如果你需要在RDD中進行sort。

你的用例將決定需要使用的方法,以及你需要使用哪個。如果你的用例是CPU密集型的,你希望對zerg.hydra topic進行5 read parallelism讀取。也就是說,每個消費者程序使用5個receiver,但是卻可以將processing parallelism提升到20。

val ssc:StreamingContext=???
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)
val readParallelism =5
val topics =Map("zerg.hydra"->1)
val kafkaDStreams =(1 to readParallelism).map{ _ =>KafkaUtils.createStream(ssc, kafkaParams, topics,...)}//> collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream = ssc.union(kafkaDStreams)// often unnecessary, just showcasing how to do it//> single DStream
val processingParallelism =20
val processingDStream = unionDStream(processingParallelism)//> single DStream but now with 20 partitions

在下一節中,我將把所有部分結合到一起,並且聯合實際資料處理進行講解。

寫入到Kafka

寫入到Kafka需要從foreachRDD輸出操作進行:

通用的輸出操作者都包含了一個功能(函式),讓每個RDD都由Stream生成。這個函式需要將每個RDD中的資料推送到一個外部系統,比如將RDD儲存到檔案,或者通過網路將它寫入到一個數據庫。需要注意的是,這裡的功能函式將在驅動中執行,同時其中通常會伴隨RDD行為,它將會促使流RDDs的計算。

注意: 重提“功能函式是在驅動中執行”,也就是Kafka生產者將從驅動中進行,也就是說“功能函式是在驅動中進行評估”。當你使用foreachRDD從驅動中讀取Design Patterns時,實際過程將變得更加清晰。

在這裡,建議大家去閱讀Spark文件中的 Design Patterns for using foreachRDD一節,它將詳細講解使用foreachRDD讀外部系統中的一些常用推薦模式,以及經常出現的一些陷阱。

在我們這個例子裡,我們將按照推薦來重用Kafka生產者例項,通過生產者池跨多個RDDs/batches。 我通過 Apache Commons Pool 實現了這樣一個工具,已經上傳到GitHub 。這個生產者池本身通過 broadcast variable 提供給tasks。

最終結果看起來如下:

val producerPool ={// See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)}

stream.map {...}.foreachRDD(rdd =>{
  rdd.foreachPartition(partitionOfRecords =>{// Get a producer from the shared pool
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach{case tweet:Tweet=>// Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)// Send the bytes to Kafka
      p.send(bytes)}// Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)})})

需要注意的是, Spark Streaming每分鐘都會建立多個RDDs,每個都會包含多個分割槽,因此你無需為Kafka生產者例項建立新的Kafka生產者,更不用說每個Kafka訊息。上面的步驟將最小化Kafka生產者例項的建立數量,同時也會最小化TCP連線的數量(通常由Kafka叢集確定)。你可以使用這個池設定來精確地控制對流應用程式可用的Kafka生產者例項數量。如果存在疑惑,儘量用更少的。

完整示例

下面的程式碼是示例Spark Streaming應用程式的要旨(所有程式碼參見 這裡 )。這裡,我做一些解釋:

  • 並行地從Kafka topic中讀取Avro-encoded資料。我們使用了一個最佳的read parallelism,每個Kafka分割槽都配置了一個單執行緒 input DStream。
  • 並行化Avro-encoded資料到pojos中,然後將他們並行寫到binary,序列化可以通過Twitter Bijection 執行。
  • 通過Kafka生產者池將結果寫回一個不同的Kafka topic。
// Set up the input DStream to read from Kafka (in parallel)
val kafkaStream ={
  val sparkStreamingConsumerGroup ="spark-streaming-consumer-group"
  val kafkaParams =Map("zookeeper.connect"->"zookeeper1:2181","group.id"->"spark-streaming-test","zookeeper.connection.timeout.ms"->"1000")
  val inputTopic ="input-topic"
  val numPartitionsOfInputTopic =5
  val streams =(1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams,Map(inputTopic ->1),StorageLevel.MEMORY_ONLY_SER).map(_._2)}
  val unifiedStream = ssc.union(streams)