Spark原理 | 初學Spark
什麼是Spark?
Spark是一個分散式計算引擎,2009年誕生於UC伯克利的AMPLab,2010年開源並於2013年成為Apache頂級專案。
Spark具有如下特點:
1.快速
DAG框架
Spark採用的是DAG框架,DAG是在MapReduce框架基礎上演化而來。
對於一些複雜的資料處理,比如有多個Reduce Stage,MapReduce框架中一個Reduce前面必須要有一個Map(Map-Reduce-Map-Reduce...),不能多個Reduce級聯處理,這樣會導致處理過程中會增加很多冗餘的Map階段,即使Map不做任何資料處理(讀取HDFS資料直接輸出),但是這個過程耗費了很多時間和資源。
DAG框架可以任意的組合Map/Reduce的運算元(如Map-Reduce-Reduce),更加靈活更快速。
如Tez(Tez也是DAG)文件裡面有例子說明,詳見 Hive/Hive+on+Tez," rel="nofollow,noindex" target="_blank">https://cwiki.apache.org/confluence/display/Hive/Hive+on+Tez, 其中以一個TPC-DS的例子進行了說明。
MapReduce是多程序模型,雖然可以更細粒度控制task佔用的資源,但是JVM啟動會消耗更多的時間,Spark則採用的是多執行緒模型,task啟動快,不同的task可以共享記憶體;
Spark可以對RDD資料集進行cache,對迭代計算很友好更快速
Spark的效能優化專案Tungsten Project( https://www.slideshare.net/databricks/spark-performance-whats-next) ,對計算過程中的記憶體管理/CPU快取友好等方面進行了很多優化。如WholeStageCodeGen,對火山模型(Volcano Model)進行了優化,減少了函式呼叫等。
2.易用
運算元豐富
使用者可以將運算元進行組合完成資料處理,如wordcount ,只需要寫幾行程式碼,相對於MapReduce實現Map和Reduce要簡單很多。
val rdd = spark.sparkContext.textFile("/README.md") val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _) counts.saveAsTextFile("/results") /**
互動式
可以通過SQL/Scala/Python/R的shell進行互動式的使用
如:
[root@emr-header-1 ~]# spark-shell scala>spark.sql("create table t(a string)") [root@emr-header-1 ~]# pyspark >>> textFile = spark.read.text("README.md") [root@emr-header-1 ~]# spark-sql > select * from t; /**
介面更統一
Spark 2.0中StructStreaming/MLlib等介面基本統一到DataSet/DataFrame,API簡單,使得程式設計更容易,而SparkSQL/SparkCore模組的優化,可以立即體現到上層模組(Streaming/MLlib等)。
3.通用
Spark包含SparkSQL/StructStreaming/MLlib/GraphX,能夠處理各種大資料處理需求,如ETL離線處理、流式計算、機器學習、圖計算等,只需要Spark就能應對大資料處理中的大部分場景。
4.融合
多種部署方式
不僅可以獨立部署standalone模式,也可以執行在Yarn/Mesos等資源排程框架之上
多資料來源接入
可以讀寫HBase/HDFS/Cassandra/OSS/S3/Hive/Alluxio等DataSource,如:
// 初始化SparkSession val spark = SparkSession.builder .master("local[2]") //local模式 .appName("test") .enableHiveSupport() //使用Hive的元資料管理 .getOrCreate() val df1 = spark.read.parquet(basePath) val df2 = spark.read.text("oss://bucket/path/xxx") val df3 = spark.sql("select * from t") /**
Spark技術棧

-
資料來源
Spark支援對接各種資料來源,如HDFS/OSS/HBase/MySQL/Kafka等。
DataFrame封裝了一些資料來源接入,比如json/csv/mysql等,使用者可以直接通過呼叫相關api去讀寫這些資料來源檔案;
DataFrame還提供了DataSource接入的擴充套件api,使用者可以根據api將自己的DataSource接入Spark;
使用者也可以將資料來源封裝成RDD來使用;網站 https://spark-packages.org/ 上有很多第三方實現的資料來源可以直接拿來使用。
社群目前在做DataSourceV2的重構( https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#)
-
資源排程
Spark可以通過YARN/Mesos進行資源管理。如將Spark作業提交到YARN的某個佇列中,通過控制佇列的分配達到對Spark作業的資源限制管理等。
3.Spark引擎
Spark是一個大資料處理的工具包,一套引擎裡面可以做ETL/流計算/圖計算等。
SparkCore是Spark引擎的最底層,它的任何改動/優化都會影響到上層模組。它以RDD為核心,將外層資料來源抽象成RDD資料集,然後通過一些運算元(transformation)對RDD進行轉換操作(如map/filter等)生成新的RDD,最終通過運算元(action)真正的提交執行獲取所需資料結果。


如上圖所示,將HDFS檔案抽象成RDD1資料集,然後通過map/filter運算元對RDD1進行轉換處理,分別得到了新的RDD2/RDD3,最後通過saveAsTextFile這個action型別的運算元真正觸發作業的提交執行,將結果寫到HDFS中。
上圖只是一個簡單的SparkCore中以RDD為核心的資料處理流程。RDD提供了很多操作運算元,使用者可以利用這些運算元進行組合來處理更復雜的資料處理邏輯,如groupBy/reduce等等。
SparkCore對RDD資料處理過程,包含很多模組,比如Stage/Task的排程, Shuffle, 記憶體管理, 排序等等,以後再詳細介紹。
下面是一張大概的內部執行流程圖,圖中相關概念可以去Spark官網檢視(如RDD/transformation和action兩種型別的運算元/寬依賴/窄依賴等)。
文章來源:HBase技術社群公眾號(微信號:hbasegroup),非常歡迎大家積極投稿。