1. 程式人生 > >Spark Release 2.2.0 最新版本釋出,Spark 2.2.0是Spark 2.x中第一個在生產環境可以使用的版本,對於Spark具有里程碑意義

Spark Release 2.2.0 最新版本釋出,Spark 2.2.0是Spark 2.x中第一個在生產環境可以使用的版本,對於Spark具有里程碑意義

第2章 Spark 2.X技術及原理

Apache官方網站於2017711日釋出了Spark Release 2.2.0版本, Apache Spark 2.2.0版本是Spark 2.x系列上的第三個版本。Spark 2.2.0Spark 2.x中第一個在生產環境可以使用的版本,對於Spark具有里程碑意義。Spark 2.2.0版本中 Structured Streaming 的實驗性標記(experimental tag)已經被移除,此版本更多地側重於系統的可用性(usability)、穩定性(stability)以及程式碼的polish,解決了1100tickets。此外,只要安裝pyspark

,在Spark 2.2.0版本中PySpark可用於pypiSpark 2.2.0版本移除了對 Java 7 以及 Hadoop 2.5及其之前版本的支援,移除了對Python 2.6的支援。

Apache Spark2.2.0版本的一些新功能:

·          Core and Spark SQL

·          Structured Streaming

·          MLlib

·          SparkR

·          GraphX

·          Deprecations

·          Changes of behavior

·          Known Issues

·          Credits

如無特殊說明,本書所有內容都基於最新最穩定的Spark 2.2.0版本的原始碼編寫,為體現Spark原始碼的演進過程,部分核心原始碼在Spark 1.6.xSpark 2.1.x原始碼的基礎上,新增Spark 2.2.0版本的原始碼,便於讀者系統比對、研習Spark原始碼。

2.1   Spark2.X綜述  

Spark 2.0中更新發布了新的流處理框架(Structured Streaming);對於API的更新,Spark 2.0版本API的更新主要包括DataFrame、DataSet、SparkSession、累加器API、Aggregator API等API的變動。

2.1.1 連續應用程式  

自從Spark得到廣泛使用以來,其流處理框架Spark Streaming也逐漸地吸引到了很多使用者,得益於其易用的高階API和一次性語義,使其成為使用最廣泛的流處理框架之一。但是我們不僅僅需要流處理來構建實時應用程式,很多時候我們的應用程式只有一部分需要用到流處理,對於這種應用程式,Databricks公司把它稱為ContinuousApplication(實時響應資料的端到端的應用程式),也就是連續的應用程式。在Continuous Application中會有許多難點,比如資料互動的完整性、流資料與離線資料的結合使用、線上機器學習等。

Spark2.0最重磅的更新是新的流處理框架——Structured Streaming。它允許使用者使用DataFrame/DataSetAPI編寫與離線批處理幾乎相同的程式碼,便可以作用到流資料和靜態資料上,引擎會自動增量化流資料計算,同時保證了資料處理的一致性,並且提供了和儲存系統的事務整合。

2.1.2 新的API  

在Spark2.0版本的API中共有如下幾個API的變動:

1)統一了DataFrame和DataSet。現在DataFrame不再是一個獨立的類,而是作為DataSet[Row]的別名定義在org.apache.spark.sql這個包物件中。

sql\package.scala原始碼如下:

1.           packageobject sql {

2.          

3.           /**

4.            * Converts a logical plan into zero or moreSparkPlans.  This API is exposed forexperimenting

5.            * with the query planner and is not designedto be stable across spark releases. Developers

6.            * writing libraries should instead considerusing the stable APIs provided in

7.            * [[org.apache.spark.sql.sources]]

8.            */

9.           @DeveloperApi

10.        @InterfaceStability.Unstable

11.        type Strategy = SparkStrategy

12.       

13.        type DataFrame = Dataset[Row]

14.      }

2)加入了SparkSession用於替換DataFrame和Dataset API的SQLContext和HiveContext(這兩個API仍然可以使用)。

3)為SparkSession為SparkSQL加入一個新的,精簡的配置引數–RuntimeConfig,可以用來設定和獲得跟SparkSQL有關的Spark或者Hadoop設定。

SparkSession.scala原始碼:

1.           /**

2.            * Runtime configuration interface for Spark.

3.            *

4.            * This is the interface through which theuser can get and set all Spark and Hadoop

5.            * configurations that are relevant to SparkSQL. When getting the value of a config,

6.            * this defaults to the value set in theunderlying `SparkContext`, if any.

7.            *

8.            * @since 2.0.0

9.            */

10.        @transient lazy val conf: RuntimeConfig = newRuntimeConfig(sessionState.conf)

4)更簡單,更高效能的累加器API。

5)用於DataSet中型別化聚合的新的改進的AggregatorAPI。

2.2   Spark2.X Core  

         本節講解Tungsten引擎的新特性;SparkSession使用方法;以及一個更加簡單和更高效能的累加器API的使用。

2.2.1 第二代Tungsten引擎

Spark備受矚目的原因之一在於它的高效能,Spark開發者為了保持這個優勢一直在不斷的進行各種層次的優化,其中最令人興奮的莫過於鎢絲計劃(ProjectTungsten),因為鎢絲計劃的提出給Spark帶來了極大的效能提升,並且在一定程度上引導了Spark的發展方向。

Spark是使用Scala和Java語言開發的,不可避免的執行JVM之上,當然記憶體管理也是依賴於JVM的記憶體管理機制,而對於大資料量的基於記憶體的處理,JVM物件模型對記憶體的額外開銷,以及頻繁的GC和Full GC都是非常致命的問題。另外,隨著網路頻寬和磁碟IO的不斷提升,記憶體和CPU又重新作為效能瓶頸受到關注,JVM物件的序列化、反序列化帶來的效能損耗急需解決。Spark1.5版本加入的鎢絲計劃從3大方面著手解決這些問題:

1) 統一記憶體管理模型和二進位制處理(BinaryProcessing)。統一記憶體管理模型來代替

之前基於JVM的靜態記憶體管理,引入Page來管理堆記憶體和堆外記憶體(on-heap和off-heap),並且直接操作記憶體中的二進位制資料而不是Java物件,很大程度上擺脫了JVM記憶體管理的限制。

2) 基於快取感知的計算(Cache-aware Computation)。Spark記憶體讀取操作也會帶來

一部分效能損耗,鎢絲計劃便設計了快取友好的演算法和資料結構來提高快取命中率,充分利用L1/L2/L3三級快取,大幅提高了記憶體讀取速度,進而縮短了記憶體中的整個計算過程的時間。

3) 程式碼生成(Code Generation)。在JVM中,所有程式碼的執行由直譯器來一步步的

解釋執行,CodeGeneration這一功能則在Spark執行時動態生成用於部分運算元求值的bytecode,減少了對基礎資料型別的封裝,並且緩解了呼叫虛擬函式的額外開銷。

Spark2.0升級了第二代Tungsten引擎。其中最重要的一點是把CodeGeneration作用於全階段的SparkSQL和DataFrame之上,(即全階段程式碼生成Whole Stage Code Generation),為常見的運算元帶來了十倍左右的效能提升!

2.2.2 SparkSession  

加入SparkSession,取代原來的SQLContext和HiveContext,為了相容兩者仍然保留。SparkSession使用方法如下:

1.           SparkSession.builder()

2.               .master("local")

3.               .appName("Word Count")

4.               .config("spark.some.config.option","some-value")

5.               .getOrCreate()

首先獲得SparkSession的Builder,然後使用Builder來為SparkSession設定引數,最後使用getOrCreate方法來檢測當前執行緒是否有一個已經存在的Thread-local級別的SparkSession,如果有則返回它,沒有則檢測是否有全域性級別的SparkSession,有則返回沒有則建立新的SparkSession。

在程式中如果要使用SparkContext時可以呼叫sparkSession.sparkContext即可。在程式的最後我們需要呼叫sparkContext.stop方法,這個方法會呼叫sparkContext.stop來關閉sparkContext。

從Spark2.0開始,DataFrame和DataSet既可以容納靜態、有限的資料,也可以容納無限的流資料,所以使用者也可以使用SparkSession像建立靜態資料集一樣來建立流式資料集,並且可以使用相同的操作運算元。這樣整合了實時流處理和離線處理的框架,結合其它容錯、擴充套件等特性形成了完整的Lambda架構。

2.2.3 Accumulator API 

Spark2.0引入了一個更加簡單和更高效能的累加器API,比如在1.X版本中可以這樣使用累加器:

1.         //定義累加器,這裡直接使用SparkContext內建的累加器,設定初始值為0,名字為"My Accumulator"

2.         val accum = sc.accumulator(0,"My Accumulator")

3.         //計算值

4.         sc.parallelize(Array(1, 2, 3,4)).foreach(x => accum += x)

5.         //獲取累加器的值,(Executor上面只能對累加器進行累加操作,只有Driver才能讀取累加器的值,Driver讀取值的時候會把各個Executor上儲存的本地累加器的值加起來),這裡結果是10。

6.         accum.value

在2.X版本里使用SparkContext裡內建的累加器:

1.          //與1.X不同的是需要指定累加器的型別,目前SparkContext有Long型別和Double型別的累加器可以直接使用(不需要指定初始值)。

2.         val accum =sc.longAccumulator("My Accumulator")

3.         sc.parallelize(Array(1, 2, 3,4)).foreach(x => accum.add(x))

4.         print(accum.value)

只使用SparkContext裡內建的累加器功能肯定不能滿足略微複雜的業務型別,此時我們就可以自定義累加器。在1.X版本的做法是(下面是官網的例子):

1.          //繼承AccumulatorParam[Vector],返回型別為Vector。

2.         object VectorAccumulatorParamextends AccumulatorParam[Vector] {

3.         //定義“零”值,這裡把傳入的初始值的size作為“零”值。

4.         def zero(initialValue: Vector):Vector = {

5.             Vector.zeros(initialValue.size)

6.           }

7.         //定義累加操作的計算方式

8.           def addInPlace(v1: Vector, v2: Vector):Vector = {

9.             v1 += v2

10.        }

11.      }

上面的累加器元素和返回型別是相同的,在Scala中還有另外一種方式來自定義累加器,使用者只需要繼承Accumulable,就可以把元素和返回值定義為不同的型別,這樣我們就可以完成新增操作(比如像Int型別的List裡新增整數,此時元素為Int型別,而返回型別為List)。

在Spark2.X中,加入了一個新的抽象類--AccumulatorV2,繼承這個類要實現幾個方法:

add方法:指定元素相加操作。

copy方法:指定對自定義的累加器的拷貝操作。

isZero方法:返回該累加器的值是否為“零”。

merge方法:合併兩個相同型別的累加器。

reset方法:重置累加器。

value方法:返回累加器當前的值。

     在重寫這幾個方法之後,只需例項化自定義累加器,並連同累加器名字一起傳給sparkContext.register方法即可。

我們來簡單實現一個把字串合併為陣列的累加器:

1.          //首先要繼承AccumulatorV2,並指定輸入為String型別,輸出為ArrayBuffer[String]

2.         class MyAccumulator extendsAccumulatorV2[String, ArrayBuffer[String]] {

3.         //設定累加器的結果,型別為ArrayBuffer[String]

4.           private var result = ArrayBuffer[String]()

5.          

6.         //判斷累加器當前值是否為“零值”,這裡我們指定如果result的size為0則累加器的當前值是“零值”

7.           override def isZero: Boolean =this.result.size == 0

8.          

9.         //copy方法設定為新建本累加器,並把result賦給新的累加器

10.        override def copy(): AccumulatorV2[String,ArrayBuffer[String]] = {

11.          val newAccum = new MyAccumulator

12.          newAccum.result = this.result

13.          newAccum

14.        }

15.      //reset方法設定為把result設定為新的ArrayBuffer

16.      override def reset(): Unit =this.result == new ArrayBuffer[String]()

17.       

18.      //add方法是把傳進來的字串新增到result內

19.      override def add(v: String):Unit = this.result += v

20.       

21.      //merge方法:把兩個累加器的result合併起來

22.      override def merge(other:AccumulatorV2[String, ArrayBuffer[String]]): Unit = {

23.            result.++=:(other.value)

24.          }

25.      //value方法返回result

26.        override def value: ArrayBuffer[String] =this.result

27.      }

28.      接著在main方法裡使用累加器:

29.      val Myaccum = newMyAccumulator()

30.       

31.      //向SparkContext註冊累加器

32.          sc.register(Myaccum)

33.       

34.      //把“a”,“b”“c”“d”新增進累加器的result這個陣列並打印出來

35.          sc.parallelize(Array("a","b","c","d")).foreach(x=> Myaccum.add(x))

36.          println(Myaccum.value)

執行結果顯示的ArrayBuffer裡的值順序是不固定的,取決於各個Executor的值到達Driver的順序。

2.3   Spark2.X SQL   

Spark 2.0通過對SQL2003的支援增強了SQL功能,Catalyst新引擎提升了Spark查詢優化的速度;本節對DataFrame和Dataset API、時間視窗進行了講解。   

Apache Spark2.2.0版本中核心和Spark SQL的更新:

1、  API更新

·          SPARK-19107: Supportcreating hive table with DataFrameWriter and Catalog

·          SPARK-13721: Add support forLATERAL VIEW OUTER explode()

·          SPARK-18885: Unify CREATE TABLEsyntax for data source and hive serde tables

·          SPARK-16475: Added BroadcastHints BROADCAST, BROADCASTJOIN, and MAPJOIN, for SQL Queries

·          SPARK-18350: Support sessionlocal timezone

·          SPARK-19261: Support ALTERTABLE table_name ADD COLUMNS

·          SPARK-20420: Add events tothe external catalog

·          SPARK-18127: Add hooks andextension points to Spark

·          SPARK-20576: Support generichint function in Dataset/DataFrame

·          SPARK-17203: Data sourceoptions should always be case insensitive

·          SPARK-19139: AES-basedauthentication mechanism for Spark

2、  效能優化和系統穩定性

基於成本的優化:

·          SPARK-17075 SPARK-17076SPARK-19020 SPARK-17077 SPARK-19350: Cardinality estimation for filter, join,aggregate, project and limit/sample operators

·          SPARK-17080: Cost-based joinre-ordering

·          SPARK-17626: TPC-DS performanceimprovements using star-schema heuristics

·          SPARK-17949: Introduce a JVMobject based aggregate operator

·          SPARK-18186: Partialaggregation support of HiveUDAFFunction

·          SPARK-18362 SPARK-19918:File listing/IO improvements for CSV and JSON

·          SPARK-18775: Limit the maxnumber of records written per file

·          SPARK-18761: Uncancellable /unkillable tasks shouldn’t starve jobs of resources

·          SPARK-15352: Topology awareblock replication

3、  其他的一些變化

·          SPARK-18352: Support forparsing multi-line JSON files

·          SPARK-19610: Support forparsing multi-line CSV files

·          SPARK-21079: Analyze TableCommand on partitioned tables

·          SPARK-18703: Drop StagingDirectories and Data Files after completion of Insertion/CTAS againstHive-serde Tables

·          SPARK-18209: More robustview canonicalization without full SQL expansion

·          SPARK-13446: [SPARK-18112]Support reading data from Hive metastore 2.0/2.1

·          SPARK-18191: Port RDD API touse commit protocol

·          SPARK-8425:Add blacklistmechanism for task scheduling

·          SPARK-19464: Remove supportfor Hadoop 2.5 and earlier

·          SPARK-19493: Remove Java 7support

2.3.1 Spark SQL 

Spark 2.0通過對SQL2003的支援大幅增強了SQL功能,現在可以執行所有99個TPC-DS查詢。這個版本中的SparkSQL主要有以下幾點改進:

1)  引入了支援ANSISQL和HiveSQL的本地解析器。

2)  本地實現DDL命令。

3)  支援非相關標量子查詢。

4)  在Where與having條件中,支援(not)in和(not)exists。

5)  即使Spark沒有和Hive整合搭建,SparkSQL也支援它們一起搭建時的除了Hive連線、Hive UDF(UserDefinedFunction使用者自定義函式)和指令碼轉換之外的大部分功能。

6)  Hive式的分桶方式的支援。

另外Catalyst查詢優化器對於常見的工作負載也有了很多提升,對比如nullability propagation之類的查詢做了更好的優化。Catalyst查詢優化器從最早的應用於SparkSQL到現在應用於DataSetAPI,對Spark程式的高效率執行起到了非常重要的作用,並且隨著DataSetAPI的流行,以及優化器自身的不斷演進,未來肯定會對Spark的所有框架帶來更高的執行效率。

2.3.2 DataFrame和Dataset API 

在Spark 1.x版本中,DataFrame的API存在很多問題,比如說DataFrame不是型別安全的(nottype-safe)、不是面向物件的(notobject-oriented),為了克服這些問題,Spark在1.6版本引入了Dataset並在2.0版本的Scala和Java中將二者進行了統一(在Python和R中,由於缺少型別安全性,DataFrame仍是主要的程式設計介面),DataFrame成為了DataSet[Row]的別名,而且Spark2.0版本為DataSet的型別化聚合加入了一個新的聚合器,讓基於DataSet的聚合更加高效。

在2.1版本中DataFrame和Dataset API晉升為穩定的API,也就是說可以在生產環境中使用它們,且後續會基於向後相容的前提下不斷強化。

DataSetAPI是High-LevelAPI,有更高的抽象級別,與RDDAPI這樣的Low-LevelAPI相比更加易用,它對於提升使用者的工作效率,以及提高程式的可讀性而言意義非凡。由於WholeStageCodeGeneration的引入,SparkSQL和DataSetAPI中的常見運算元的效能提升了2到10倍。加上Catalyst查詢優化器和Tungsten的幫助,使用者可以不用過多的關注對程式優化,也能獲得很好的執行效率。

所以毋庸置疑地,這樣一種簡單高效的API將成為Spark未來主流的程式設計介面!

2.3.3 Timed window    

對於經常用到複雜SQL的使用者而言,視窗函式一直以來都是不可或缺的,在Spark2.X版本中通過對Hive中的視窗函式的本地化實現,來使用spark的記憶體管理機制,從而提升了視窗函式的效能。

2.4 Spark 2.X Streaming 

Spark2.0為我們帶來了一個新的流處理框架Structured Streaming,這是一個基於Spark SQL和Catalyst優化器構建的高階流API。它允許使用者使用與操作靜態資料的DataFrame / Dataset API對流資料進行程式設計,利用Catalyst優化器自動地增量化查詢計劃。並且它不但支援流資料的不斷寫入,還支援其他的靜態資料的插入。

Apache Spark2.2.0版本中Structured Streaming的更新:

1、    整體可用性:

·          SPARK-20844: The StructuredStreaming APIs are now GA and is no longer labeled experimental

2、  Kafka 提升

·          SPARK-19719: Support forreading and writing data in streaming or batch to/from Apache Kafka

·          SPARK-19968: Cached producerfor lower latency kafka to kafka streams.

3、    API 更新:

·          SPARK-19067: Support forcomplex stateful processing and timeouts using [flat]MapGroupsWithState

·          SPARK-19876: Support for onetime triggers

4、    其它的一些變化:

·          SPARK-20979: Rate source fortesting and benchmarks