1. 程式人生 > >[1.2]Spark core程式設計(一)之RDD總論與建立RDD的三種方式

[1.2]Spark core程式設計(一)之RDD總論與建立RDD的三種方式

參考

場景

  • RDD的理解
    一、RDD是基於工作集的應用抽象;是分散式、函數語言程式設計的抽象。
    MapReduce:基於資料集的處理。兩者的共同特徵:位置感知(具體資料在哪裡)、容錯、負載均衡。
    基於資料集的處理:從物理儲存裝置上載入資料,然後操作資料,寫入物理儲存裝置。eg、Hadoop MapReduce
    不適應場景:
    1、不適合於大量的迭代
    2、不適合於交付式查詢
    3、基於資料流的方式,不能夠複用曾經的結果或者中間計算結果。
    二、RDD的”彈性”(Resilient)
    1、自動的進行記憶體與磁碟資料儲存的切換
    2、基於Lineage的高效容錯
    3、Task如果失敗會自動進行特定次數的重試
    4、Stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片
    5、checkpoint和persist:對於長連結的操作,把中間的資料放到磁碟上去
    6、資料分片的高度彈性(提高、降低並行度)
    7、資料排程彈性:DAG TASK和資源管理無關

  • 建立RDD的幾種方式
    1、基於程式中的集合建立RDD-作用:測試
    2、基於本地檔案建立RDD-作用:大資料量的測試
    3、基於HDFS建立RDD-作用:生產環境最常用的RDD建立方式
    4、基於DB、NoSQL(例如HBase)、S3、基於資料流建立RDD

實驗

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * 建立RDD的三種方式初體驗
 */
object RDDBaseOnCollection {

  def main(args: Array[String]): Unit = {
    val
conf = new SparkConf().setMaster("local[*]").setAppName("RDDBaseOnCollection") val sc = new SparkContext(conf) /* * 1、從scala集合中建立RDD * 計算:1+2+3+...+100 */ val nums = 1 to 100 val rdd = sc.parallelize(nums) val sum = rdd.reduce(_+_) println("sum:"+sum) /* * 2、從本地檔案系統建立RDD * 計算 people.json 檔案中字元總長度 */
val rows = sc.textFile("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json") val length = rows.map(row=>row.length()).reduce(_+_) println("total chars length:"+length) /* * 3、從HDFS建立RDD(lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at textFile at) * 計算 hive_test 檔案中字元長度 */ val lines = sc.textFile("hdfs://112.74.21.122:9000/user/hive/warehouse/hive_test") println( lines.map(row=>row.length()).reduce(_+_)) } }

總結

一、“ RDD(resilient distributed dataset) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.”
二、RDD是基於工作集的應用抽象;是分散式、函數語言程式設計的抽象。