1. 程式人生 > >Spark基礎 -- Spark Shell -- RDD -- 運算元

Spark基礎 -- Spark Shell -- RDD -- 運算元

Spark基礎 – Spark Shell – RDD – 運算元

文章目錄

一、簡介

​ Apache Spark 是專為大規模資料處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以儲存在記憶體中,從而不再需要讀寫HDFS,因此Spark能更好地適用於資料探勘與機器學習等需要迭代的MapReduce的演算法。

二、Spark 1.6.3部署

準備工作
  1. 三臺Linux伺服器,安裝好JDK1.8、Hadoop2.6

  2. 下載安裝包spark1.6.3

  3. 將安裝包上傳到第一臺Linux伺服器上

解壓安裝

解壓安裝包到指定位置

tar -zxvf spark-1.6.3-bin-hadoop2.6.tgz -C /home/bigdata/installsoft/

將資料夾重新命名為spark-1.6.3

配置spark,master高可用
  1. 進入spark安裝目錄下的conf目錄

    cd /home/bigdata/installsoft/spark-1.6.3/conf

  2. 將spark-env.sh.template重新命名為spark-env.sh

    mv spark-env.sh.template spark-env.sh

  3. 編輯spark-env.sh並新增配置

    export JAVA_HOME=/home/bigdata/installsoft/jdk1.8.0_181/
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=cdhnocms01,cdhnocms02,cdhnocms03 -Dspark.deploy.zookeeper.dir=/spark"
    export SPARK_MASTER_PORT=7077
    export HADOOP_CONF_DIR=/home/bigdata/installsoft/hadoop-2.6.0-cdh5.13.2/etc/hadoop
    
  4. 將slaves.template重新命名為slaves

    mv slaves.template slaves

  5. 在slaves檔案彙總新增worker節點所在的主機

    cdhnocms01
    cdhnocms02
    cdhnocms03

配置環境變數

在使用者家目錄下.bash_profile檔案中新增

SPARK_HOME=/home/bigdata/installsoft/spark-1.6.3/

PATH= P A T H : PATH: SPARK_HOME/bin:$SPARK_HOME/sbin

儲存退出後source .bash_profile

分發配置好的Spark到其他節點
scp -r /home/bigdata/installsoft/spark-1.6.3/ cdhnocms02:/home/bigdata/installsoft/
scp -r /home/bigdata/installsoft/spark-1.6.3/ cdhnocms02:/home/bigdata/installsoft/

直接將環境變數檔案傳送到其他節點,或者在其他節點上一一配置環境變數

三、Spark叢集啟動和測試

啟動

在cdhnocms01節點上執行/home/bigdata/installsoft/spark-1.6.3/sbin/start-all.sh

在cdhnocms02節點上執行/home/bigdata/installsoft/spark-1.6.3/sbin/start-master.sh

此時使用jps檢視三臺機器程序,如下表

cdhnocms01 cdhnocms02 cdhnocms03
Master、Worker Master、Worker Worker

注意:雖然配置了環境變數,但由於名稱相同,如果直接在任意目錄直接執行start-all.sh,啟動的將會是hadoop的相關程序。解決辦法:修改啟動指令碼的檔名。

測試

執行官方自帶的例子

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cdhnocms01:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/home/bigdata/installsoft/spark-1.6.3/lib/spark-examples-1.6.3-hadoop2.6.0.jar \
100

執行完成後可以在命令列中找到結果

1542621376664

在web監控頁面:cdhnocms02:8080上可以檢視任務狀態

1542621318493

四、Spark Shell

spark shell簡介

spark-shell是Spark自帶的互動式Shell程式,方便使用者進行互動式程式設計,使用者可以在該命令列下用scala編寫spark程式。

spark shell 啟動
spark-shell \
--master spark://cdhnocms01:7077 \
--executor-memory 1G \
--total-executor-cores 2

引數說明

–master spark://cdhnocms02:7077 指定Master的地址

–executor-memory 1G 指定每個worker可用記憶體為1G

–total-executor-cores 2 指定整個叢集使用的cup核數為2個

注意

如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程式,其實是啟動了spark的local模式,該模式僅在本機啟動一個程序,沒有與叢集建立聯絡。

啟動spark shell後,可以注意到在控制檯有如下兩條語句:

1542624224309

1542624246881

意思是Spark Shell中已經預設將SparkContext類初始化為物件sc,SQLContext類初始化為物件sqlContext。使用者程式碼如果需要用到,則直接使用對應的物件名即可即可。

在spark shell中編寫wordcount程式
  1. 上傳資料檔案到hdfs://cdhnocms01:8020/userdata/wc.txt
  2. val file = sc.textFile(“hdfs://cdhnocms01:8020/userdata/wc.txt”)
  3. val words = file.flatMap(_.split(" "))
  4. val map = words.map((_,1))
  5. val result = map.reduceByKey(+)
  6. 接下來可以直接通過result.collect將結果列印到控制檯,或者result.saveAsTextFile(“hdfs://cdhnocms01:8020/out/20181119”),將結果檔案儲存在hdfs後檢視結果

五、IDEA中編寫WordCount

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/day01</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>
編寫程式碼
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount_scala {

  def main(args: Array[String]): Unit = {
    // 1.獲取spark的conf
    // 本地執行
    val conf = new SparkConf().setAppName("spark_wordcount_scala").setMaster("local[2]")
    // 提交到叢集中執行
    //val conf = new SparkConf().setAppName("spark_wordcount_scala").setMaster("spark://cdhnocms01:7077")
    // 2.根據conf物件獲取sparkContext(spark的上下文)
    val sc = new SparkContext(conf)
    // 讀取hdfs中的資料
    sc.hadoopConfiguration.addResource("core-site.xml")
    sc.hadoopConfiguration.addResource("hdfs-site.xml")
   
    //第二種讀取hdfs的HA的檔案資料
    //    sc.hadoopConfiguration.set("")

    // 3.使用sc進行操作
    // 讀取資料來源
    val words:RDD[String] = sc.textFile(args(0))
    val res:RDD[(String,Int)] = words.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.size)
    // 列印
    res.foreach(f=>println(f))
    //4. 關閉sc
    sc.stop()


  }
}

可以直接在idea中配置好輸入引數後執行,可以的到結果

打包到Linux伺服器中執行

使用Maven打包後,將jar包上傳至Linux中,執行命令:

spark-submit \
--class SparkWordCount_scala \
--master spark://cdhnocms01:7077 \
/home/bigdata/userjars/spark-1.0-SNAPSHOT.jar \
hdfs://bigdata/userdata/wc.txt

程式碼的輸出結果(在網頁監控埠任務的stdout中檢視):

1542625835062

六、彈性分散式資料集RDD

RDD簡介

​ RDD(Resilient Distributed Dataset),分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

RDD屬性
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 an HDFS file)

屬性詳解:

  1. 一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。
  2. 一個計算每個分割槽的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。
  3. RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。
  4. 一個Partitioner,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
  5. 一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。
RDD建立
  1. 由一個已經存在的Scala集合建立(Array、List、Seq等)

    sc.parallelize(args(0),args(1))

    第一個引數代表已存在的Scala集合,第二個引數代表分片個數,如果不指定則會採用預設值—分配的CPU core數量

  2. 由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等

    sc.textFile(“hdfs://cdhnocms01:8020/userdata/wc.txt”)

七、RDD程式設計API

Transformation

​ RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。

Transformation 含義
map(func) 返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成
filter(func) 返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成
flatMap(func) 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T]=> Iterator[U]
mapPartitionsWithIndex(func) 類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是(Int, Iterator[T]) => Iterator[U]
sample(withReplacement,fraction, seed) 根據fraction指定的比例對資料進行取樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和引數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和引數RDD求交集後返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD
reduceByKey(func,[numTasks]) 在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定
aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending],[numTasks]) 與sortByKey類似,但是可以指定根據什麼排序
join(otherDataset,[numTasks]) 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset,[numTasks]) 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K,(Iterable,Iterable))型別的RDD
cartesian(otherDataset) 笛卡爾積
coalesce(numPartitions) 重新分割槽
repartition(numPartitions) 重新分割槽
repartitionAndSortWithinPartitions(partitioner) 重新分割槽
Action
Action 含義
reduce(func) 通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的
collect() 在驅動程式中,以陣列的形式返回資料集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似於take(1))
take(n) 返回一個由資料集的前n個元素組成的陣列
takeSample(withReplacement,num, [seed]) 返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering]) takeOrdered和top類似,只不過以和top相反的順序返回元素
saveAsTextFile(path) 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字
saveAsSequenceFile(path) 將資料集中的元素以Hadoopsequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在資料集的每一個元素上,執行函式func進行更新。

八、運算元進階

  1. map/mapPartitions

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
    val rdd2 = rdd1.map(_ * 10)
    val rdd2 = rdd1.mapPartitions(_.map(_ * 10))
    rdd2.collect
    

    Array[Int] = Array(10, 20, 30, 40, 50, 60)

    Array[Int] = Array(10, 20, 30, 40, 50, 60)

    map運算元是將rdd中的每一個元素拿出來進行操作

    mapPartitions運算元是將一整個分片中的資料拿出來操作,所以需要繼續對每一個分片中各個資料拿出來操作

    rdd1.mapPartitions(_.toList.reverse.iterator).collect
    

    此操作是將每一個分片中的資料翻轉

  2. mapWith

    引數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U)

    其中preservesPartitioning指定是否需要使用父RDD的分片

    rdd1.mapWith(i => i*10)((a, b) => b+2).collect  
    

    Array[Int] = Array(2, 2, 2, 12, 12, 12)

    mapWith運算元是將rdd的分片下標取出進行操作元組(a,b)中a指資料,b指該資料的下標

  3. flatMapWith

    引數列表:(constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U])

    rdd1.flatMapWith(i => i, true)((x, y) => List((y, x))).collect
    

    Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6))

    flatMapWith運算元類似於mapWith,但是每一個輸入元素可以被對映為0或多個輸出元素

  4. mapPartitionsWithIndex

    引數列表:(f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)

    val func = (index: Int, iter: Iterator[(Int)]) => {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    rdd1.mapPartitionsWithIndex(func).collect
    

    Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

  5. aggregate

    引數列表:(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

    zeroValue:初始值;seqOp:單個分割槽的合併操作;combOp:所有分割槽的彙總操作

    def func1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    rdd1.mapPartitionsWithIndex(func1).collect
    

    Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

    rdd1.aggregate(0)(math.max(_, _), _ + _)
    rdd1.aggregate(5)(math.max(_, _), _ + _)
    

    Int = 13 //首先在兩個分割槽中各自獲得最大值4、9,相加等於13

    Int = 19 //首先在兩個分割槽中各自獲得最大值5、9,相加等於14,再加上初始值5等於19


    val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
    def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    rdd2.mapPartitionsWithIndex(func2).collect
    

    Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])

    rdd2.aggregate("")(_ + _, _ + _)
    rdd2.aggregate("=")(_ + _, _ + _)
    

    String = abcdef 或 String = defabc //字串拼接操作,在兩個分割槽中先各自拼接,最終的拼接時的順序是哪個分割槽先完成就哪個分割槽在前

    String = ==def=abc 或 String = ==abc=def //同上,但是在拼接前先加上初始值"="


    val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
    rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
    

    String = 42 或 String = 24

    前一個引數列表 (x,y):第一次時分別代表初始值和分割槽中的第一個值,以後是分別代表上一次結果的值和分割槽中新的值

    max(0,2) = 2, max(2,2) = 2

    max(0,3) = 3, max(3,4) = 4

    後一個引數列表(x,y):第一次是代表初始值與第一個分割槽的結果拼接,以後代表上一次的結果和新的分割槽的結果拼接

    同上,由於不同分割槽的完成時間不同,結果會出現兩種情況


    val rdd4 = sc.parallelize(List("12","23","345",""),2)
    rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
    

    String = 10 或 String = 01

    關鍵在於"".length=0,“0”.length=1


    val rdd5 = sc.parallelize(List("12","23","","345"),2)
    rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
    

    String = 11

  6. aggregateByKey

    相同的key進行操作

    引數列表:(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)

    val pairRDD = sc.parallelize(List(("mouse", 2),("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12)), 2)
    def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
      iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
    }
    pairRDD.mapPartitionsWithIndex(func2).collect
    

    Array[String] = Array([partID:0, val: (mouse,2)], [partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:1, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)])

    pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
    pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
    

    Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) // dog:12;cat:5+12;mouse:2+4

    Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200)) // dog:100;cat:100+100;mouse:100+100

  7. combineByKey

    引數列表:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

    val rdd1 = sc.textFile("hdfs://cdhnocms01:8020/userdata/wc.txt").flatMap(_.split(" ")).map((_, 1))
    rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
    

    Array[(String, Int)] = Array((word,2), (hello,2), (sql,1), (spark,3), (hadoop,2), (hi,1))

    rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
    

    Array[(String, Int)] = Array((word,12), (hello,12), (sql,11), (spark,13), (hadoop,12), (hi,11))

    // 對每一個value加10

    val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
    val rdd6 = rdd5.zip(rdd4)
    rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n).collect
    

    Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(salmon, rabbit, wolf, bear, bee, gnu)))

  8. countByKey / countByValue

    val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
    rdd1.countByKey
    rdd1.countByValue
    

    scala.collection.Map[String,Long] = Map(b -> 2, a -> 1, c -> 2) // 統計相同key出現的次數

    scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1) // 統計相同元素出現的次數

  9. filterByRange

    val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
    val rdd2 = rdd1.filterByRange("c", "d")
    rdd2.collect
    

    Array[(String, Int)] = Array((c,3), (d,4), (c,2))

    // 對給定的範圍進行過濾

  10. flatMapValues

    val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
    rdd3.flatMapValues(_.split(" ")).collect
    

    Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

    對value進行相應的操作後壓頻

  11. foldByKey

    val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
    val rdd2 = rdd1.map(x => (x.length, x))
    val rdd3 = rdd2.foldByKey("")(_+_)
    rdd3.collect
    

    Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))

    val rdd = sc.textFile("hdfs://cdhnocms01:8020/userdata/wc.txt").flatMap(_.split(" ")).map((_, 1))
    rdd.foldByKey(0)(_+_).collect
    

    Array[(String, Int)] = Array((word,2), (hello,2), (sql,1), (spark,3), (hadoop,2), (hi,1))

  12. foreachPartition

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    rdd1.foreachPartition(x => println(x.reduce(_ + _)))
    
  13. keyBy

    val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val rdd2 = rdd1.keyBy(_.length)
    rdd2.collect
    

    Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

    // 將結果作為key-value的key

  14. keys / values

    val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
    val rdd2 = rdd1.map(x => (x.length, x))
    rdd2.keys.collect
    rdd2.values.collect
    

    Array[Int] = Array(3, 5, 4, 3, 7, 5) // 獲得key

    Array[String] = Array(dog, tiger, lion, cat, panther, eagle) // 獲得value

  15. collectAsMap

    val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
    rdd.collectAsMap
    

    scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

  16. repartition, coalesce, partitionBy

    重新分割槽

    val rdd1 = sc.parallelize(1 to 10, 3)
    val rdd2 = rdd1.coalesce(2, false)
    rdd2.partitions.length
    
  17. checkpoint

    sc.setCheckpointDir("hdfs://cdhnocms01:8020/userdata/cp")
    val rdd = sc.textFile("hdfs://cdhnocms01:8020/userdata/wc.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    rdd.checkpoint
    rdd.isCheckpointed
    rdd.count
    rdd.isCheckpointed
    rdd.getCheckpointFile