Structured Streaming 實現超低延遲
浪院長,最近忙死了,寫文章的時間都沒了。但是,都說時間就像海綿裡的水,擠擠就有了。所以,今晚十點半開始整理這篇Structured streaming 相關的文章。
最近,忙於開發完善flink平臺,並且使用我們的平臺去支援一些複雜的業務,比如使用者畫像處理等。遇見了很多bug和效能點,後面陸續出文章給大家解析。
書歸正傳,大家都知道spark streaming是微批批處理,而Structured streaming在2.3以前也是批處理,在2.3引入了連續處理的概念,延遲大幅度降低值~1ms,但是還有諸多限制,這點比flink差了許多。
至於低延遲的測試,建議本文使用本文程式碼去測試,kafka source->kafka sink,這樣便於觀察延遲。
連續處理是Spark 2.3中引入的一種新的實驗版本流執行模式,可實現極低(~1 ms)端到端延遲,並且具有至少一次處理容錯保證。 structured streaming的連續處理模式與微批處理模式進行比較,微批處理引擎可以實現一次性保證,但微批處理最好僅可實現約100ms的延遲。 對於某些型別的查詢(在下面討論),可以選擇在不修改應用程式碼的情況下執行該模式(即,不更改DataFrame / Dataset操作)。
要在連續處理模式下執行支援的查詢,您只需指定一個連續觸發器,並將所需的checkpoint間隔作為引數。 例如浪尖的demo如下:
object ContinuousProcessing { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client") .set("yarn.resourcemanager.hostname", "mt-mdh.local") .set("spark.executor.instances","2") .set("spark.default.parallelism","4") .set("spark.sql.shuffle.partitions","4") .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar" ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar" ,"/opt/jars/kafka-clients-0.10.2.2.jar" ,"/opt/jars/kafka_2.11-0.10.2.2.jar" ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar")) val spark = SparkSession .builder .appName("StructuredKafkaWordCount") .config(sparkConf) .getOrCreate() spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "mt-mdh.local:9093") .option("subscribe", "StructuredSource") .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .format("kafka") .option("kafka.bootstrap.servers", "mt-mdh.local:9093") .option("topic", "StructuredSink") .option("checkpointLocation","/sql/checkpoint") .trigger(Trigger.Continuous("1 second")) // only change in query .start() .awaitTermination() } }
checkpoint 間隔為1秒意味著連續處理引擎將每秒記錄查詢的進度。 生成的checkpoint採用與微批處理引擎相容的格式,因此可以使用任何觸發器重新啟動任何查詢。 例如,假如查詢支援微批處理和連續處理,那麼實際上也可以用連續處理觸發器去啟動微批處理觸發器,反之亦然。
請注意,無論何時切換到連續模式,都將獲得至少一次的容錯保證。
支援的查詢
從Spark 2.3開始,連續處理模式僅支援以下型別的查詢。
-
Operations:在連續模式下僅支援dataset/dataframe的類似於map的操作,即支援projection(select,map,flatMap,mapPartitions等)和selection(where,filter等)。
-
除了聚合函式(因為尚不支援聚合),current_timestamp()和current_date()(使用時間的確定性計算具有挑戰性)之外,支援所有SQL函式。
Sources
-
Kafka Source:支援所有操作。
-
Rate source:適合測試。只有連續模式支援的選項是numPartitions和rowsPerSecond。
Sinks
-
Kafka sink:支援所有選項。
-
Memory sink:適合除錯。
-
Console sink:適合除錯。支援所有操作。請注意,控制檯將列印你在連續觸發器中指定的每個checkpoint間隔。
更詳細的關於sink和source資訊,請參閱輸入源和輸出接收器部分的官網。雖然控制檯接收器非常適合測試,但是使用Kafka作為源和接收器可以最好地觀察到端到端的低延遲處理。
注意事項
-
連續處理引擎啟動多個長時間執行的任務,這些任務不斷從源中讀取資料,處理資料並連續寫入接收器。 查詢所需的任務數取決於查詢可以並行從源讀取的分割槽數。 因此,在開始連續處理查詢之前,必須確保群集中有足夠的核心並行執行所有任務。 例如,如果您正在讀取具有10個分割槽的Kafka主題,則群集必須至少具有10個核心才能使查詢正常執行。
-
停止連續處理流可能會產生虛假的任務終止警告。 這些可以安全地忽略。
-
目前沒有自動重試失敗的任務。 任何失敗都將導致查詢停止,並且需要從檢查點手動重新啟動。(深受其害,kafka topic沒資料流入也會掛掉的)