1. 程式人生 > >Spark學習總結(一)

Spark學習總結(一)

RDD及其特點

1、RDD是Spark的核心資料模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分散式資料集。

2、RDD在抽象上來說是一種元素集合,包含了資料。它是被分割槽的,分為多個分割槽,每個分割槽分佈在叢集中的不同節點上,從而讓RDD中的資料可以被並行操作。(分散式資料集)

3、RDD通常通過Hadoop上的檔案,即HDFS檔案或者Hive表,來進行建立;有時也可以通過應用程式中的集合來建立。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致資料丟了,那麼RDD會自動通過自己的資料來源重新計算該partition。這一切對使用者是透明的。

5、RDD的資料預設情況下存放在記憶體中的,但是在記憶體資源不足時,Spark會自動將RDD資料寫入磁碟。(彈性)

建立RDD

進行Spark核心程式設計的第一步就是建立一個初始的RDD。該RDD,通常就代表和包含了Spark應用程式的輸入源資料。然後通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種建立RDD的方式:

1.使用程式中的集合建立RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地檔案建立RDD(主要用於臨時性處理有大量資料的檔案)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();

3.使用HDFS檔案建立RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS檔案建立RDD對比使用本地檔案建立RDD,需要修改的,只有兩個地方:
第一,將SparkSession物件的master("local")方法去掉
第二,我們針對的不是本地檔案了,修改為hadoop hdfs上的真正的儲存大資料的檔案

操作RDD

Spark支援兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD建立一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程式的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函式,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函式進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函式進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,儲存到檔案等,並可以返回結果給Driver程式。action操作執行,會觸發一個spark job的執行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素儲存到檔案中,對每個元素呼叫toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要呼叫其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接快取在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等建立了一個RDD之後,直接連續呼叫cache()或persist()才可以。

如果你先建立一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的檔案會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和記憶體消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以快取所有資料的話,那麼就使用這種策略。因為純記憶體速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法儲存所有資料的話,那麼使用MEMORY_ONLY_SER,將資料進行序列化進行儲存,純記憶體操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶字尾為_2的策略,進行資料的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁碟讀取資料,還不如重新計算一次。

共享變數

Spark提供了兩種共享變數:Broadcast Variable(廣播變數)和Accumulator(累加變數)。

BroadcastVariable會將使用到的變數,僅僅為每個節點拷貝一份,更大的用處是優化效能,減少網路傳輸以及記憶體消耗。廣播變數是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value)  //廣播變數讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變數,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程式可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文字檔案內的每個單詞都統計出其出現的次數。
2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.建立RDD
  • 2.將文字進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.列印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 建立lines RDD
        JavaRDD<String> lines = sc.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt");
        // 將文字分割成單詞RDD
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        //將單詞RDD轉換為(單詞,1)鍵值對RDD
        JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
            @Override
            public Tuple2<String,Integer> call(String s) throws Exception {
                return new Tuple2<String,Integer>(s,1);
            }
        });
       //對wordPair 進行按鍵計數
        JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer +integer2;
            }
        });
        // 到這裡為止,就得到了每個單詞出現的次數
        // 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
        // wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
        // 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

        // 進行key-value的反轉對映
        JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
                return new Tuple2<Integer, String>(s._2,s._1);
            }
        });
        // 按照key進行排序
        JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
        // 再次將value-key進行反轉對映
        JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
                return new Tuple2<String, Integer>(s._2,s._1);
            }
        });
        // 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
        // 打印出來
        sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> s) throws Exception {
                System.out.println("word \""+s._1+"\" appears "+ s._2+" times.");
            }
        });
        sc.close();
    }
}

Java版本jdk1.8

可以使用lambda表示式,簡化程式碼:

public class SortWordCount {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 建立lines RDD
        JavaRDD<String> lines = sc.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt");
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
        JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
        JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
        JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
        JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
        sortedWordCount.foreach(s->System.out.println("word \""+s._1+"\" appears "+ s._2+" times."));
        sc.close();
    }
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
    val lines = spark.sparkContext.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt")
    val words = lines.flatMap{line => line.split(" ")}
    val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
    val countWord = wordCounts.map{word =>(word._2,word._1)}
    val sortedCountWord = countWord.sortByKey(false)
    val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
    sortedWordCount.foreach(s=>
    {
      println("word \""+s._1+ "\" appears "+s._2+" times.")
    })
    spark.stop()
  }
}

小案例實戰2

需求:

1、按照檔案中的第一列排序。
2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered介面和Serializable介面,在key中實現自己對多個列的排序演算法
  • 2、將包含文字的RDD,對映成key為自定義key,value為文字的JavaPairRDD(map)
  • 3、使用sortByKey運算元按照自定義的key進行排序(sortByKey)
  • 4、再次對映,剔除自定義的key,只保留文字行(map)
  • 5、列印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
  override def compare(that: SecondSortKey): Int = {
    if(this.first - that.first !=0){
      this.first-that.first
    }else{
      this.second-that.second
    }
  }
}
object SecondSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
    val lines = spark.sparkContext.textFile("D:\\sort.txt")
    val pairs = lines.map{line => (
      new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
    )}
    val sortedParis = pairs.sortByKey()
    val sortedLines = sortedParis.map(pairs => pairs._2)
    sortedLines.foreach(s => println(s))
    spark.stop()
  }
}

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.建立初始RDD

2.對初始RDD的文字行按空格分割,對映為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將陣列緩衝按從大到小排序
  • 4.4 對排序後的陣列緩衝取其前三

5.列印輸出

以下是使用scala實現:

object GroupTop3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
    //建立初始RDD
    val lines = spark.sparkContext.textFile("D:\\score.txt")
    //對初始RDD的文字行按空格分割,對映為key-value鍵值對
    val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
    //對pairs鍵值對按鍵分組
    val groupedPairs = pairs.groupByKey()
    //獲取分組後每組前3的成績
    val top3Score = groupedPairs.map(classScores => {
      var className = classScores._1
      //獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
      var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
      Tuple2(className,top3)
    })
    top3Score.foreach(m => {
      println(m._1)
      for(s <- m._2) println(s)
      println("------------------")
    })
  }
}

以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函式、鏈式呼叫、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。

作者:簡單的happy Python愛好者社群專欄作者

相關推薦

Spark學習總結

RDD及其特點1、RDD是Spark的核心資料模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分散式資料集。2、RDD在抽象上來說是一種元素集合,包含了資料。它是被分割槽的,分為多個分割槽,每個分割槽分佈在叢集中的不同節點上,從而

JSON必知必會學習總結

tor lint 沒有 script app 對數 數據交換格式 object 什麽 七月第一周,從學校畢業回來上班的第一周。離開一段時候後,再次回來重新工作,有了很多新的認識,不再是實習時那麽混混沌沌了。每天我自己該做什麽,怎麽做,做到什麽程度更清晰了。除了要去完成我負責

PHP學習總結

encode 什麽 code 解決 new span att 面向對象 反斜杠 》PHP 面向對象 使用parent訪問父類的構造方法(__construct)時一定要為當前類繼承要訪問的構造方法 類的構造方法(__construct)在實例化時直接被加載,靜態方法

Java IO學習總結

file flush writer directory 創建 str java 資源 tab 一、File 類 Java中不管文件還是目錄都可以使用File類操作,File能新建、刪除、重命名文件和目錄,但是不能訪問文件內容本身,訪問文件內容需要使用輸入輸出流。 Fi

JSP學習總結

展示 cat 一起 time server 資源 type div simple 一、JSP的介紹與項目部署 JSP,全稱 Java server page是由Sun公司開發的一種動態生成網頁資源的技術。JSP技術使用java語言作為腳本語言,可以將html元素和java代

JAVA學習總結

產生 java虛擬機 右鍵 分布式系 jdk 問題 功能 地址 輸出 一、Java是什麽? Java 是由 Sun Microsystems 在 1995 年首先發布的編程語言和計算平臺。Java 是一項用於開發應用程序的技術,可以讓 Web 變得更有意思和更實用。有許多應

Linux學習總結 windos環境vmware安裝centos7

color 學習總結 oot 遠程訪問服務 分享圖片 watermark 需要 mage 磁盤大小 1.在這裏我先簡單介紹下虛擬化技術,就是我們通過軟件虛擬化出來一個硬件環境,然後就可以在系統裏面安裝子系統,以來我們避免了裝linux,windos雙系統的麻煩,二來我們可以

webservice學習總結-- WebService相關概念介紹

IT strong 資源 fire 求和 log AC service服務 為什麽 一、WebService是什麽? 基於Web的服務:服務器端整出一些資源讓客戶端應用訪問(獲取數據) 一個跨語言、跨平臺的規範(抽象) 多個跨平臺、跨語言的應用間通信整合的方案(實際)

【Java】 Spring 框架初步學習總結簡單實現 IoC 和 AOP

1.0 其中 表示 只需要 第一篇 否則 info fin pojo   Spring 是一個開源的設計層面的輕量級框架,Spring 的好處網上有太多,這裏就不在贅述。   IoC 控制反轉和 AOP 面向切面編程是 Spring 的兩個重要特性。   IoC(Inver

python學習總結,第一個python程序的編寫

程序代碼 spa 編碼 結構 編輯 下載地址 utf 修改編碼 general 1. python是一種解釋性,編譯型,互動型的語言,面向對象,動態數據類型的高級程序設計語言。 1.1 交互式:意味著可以在一個提示符直接交互執行你寫的程序。 1.2 解釋型:意味著開發過程中

MyBatis學習總結——ORM概要與MyBatis快速起步

管理 stat prim aot 驅動 單元測試 build sta 環境 目錄 一、ORM 1.1、ORM簡介 1.2、ORM的概念 1.3、ORM的優缺點 二、MyBatis 2.1、MyBatis的特點 2.2、MyBatis工作流程

[學習總結] python語言學習總結

用py也很久了,很多東西只知道拿來用,並沒有深究,感覺這樣是不夠的。 我決定寫這麼一篇總結,把很多遺忘的東西拾起來,把很多沒搞懂的東西搞清楚。 1.eval函式 用法:eval(expression, globals=None, locals=None) 解釋:將字串str當成有效的表

Dubbo的學習總結——遠端呼叫

在學習用dubbo進行分散式遠端呼叫之前,先來看一個簡單的有關訪問網路上的天氣預報介面的遠端呼叫例項。 1.先建立一個java專案,建立過程這裡不詳細講述,只附上一張截圖供入門級選手參考: 2.建立好一個java專案後,接下來就是建立一個Weather實體類(這裡可以使用lombok的@Da

Http學習總結

http使用面向連線的TCP作為傳輸層協議。http本身無連線。 請求報文 CRLF是回車換行   方法為GET的請求報文     方法為POST的請求報文   &n

多執行緒學習總結

一、程序和執行緒的定義 程序:程序是資源(CPU、記憶體等)分配的基本單位,它是程式執行時的一個例項。程式執行時系統就會建立一個程序,併為它分配資源,然後把該程序放入程序就緒佇列,程序排程器選中它的時候就會為它分配CPU時間,程式開始真正執行。 執行緒:執行緒是程式執行時的最小單位,它是程序

JavaWeb學習總結---httpservletrequest物件

javaweb學習總結(十)——HttpServletRequest物件(一) 一、HttpServletRequest介紹   HttpServletRequest物件代表客戶端的請求,當客戶端通過HTTP協議訪問伺服器時,HTTP請求頭中的所有資訊都封裝在這個物件中,通過這個物件提供的方

java執行緒學習總結

(宣告:並非原創,只是一個簡單總結) 一、執行緒和程序的概念:            程序:程序是處於執行過程中的程式,並且具有一定的對功能,是系統進行資源分配和排程的一個獨立單位。      

Spark學習記錄Spark 環境搭建以及worldCount示例

安裝Spark ------------------- 首先,安裝spark之前需要先安裝scala,並且安裝scala的版本一定要是將要安裝的spark要求的版本。比如spark2.1.0 要求scala 2.11系列的版本,不能多也不能少 1.下載spark-2.1.0-bin-hadoop

Git學習總結git的安裝和配置

git的安裝: 在這裡,因為網上有了很多詳盡的教程,我就不贅述了。git安裝教程 然後你需要申請一個GitHub賬號:點選申請GitHub賬號 申請GitHub賬號的時候要繫結一個郵箱,繫結QQ郵箱即可。 下面將你電腦上的git繫結GitHub賬號: 開啟Git

Spring Boot 學習總結 ---入口類和@SpringBootApplication

入口類和@SpringBootApplication SpringBoot通常有一個名為*Application的入口類,入口類裡有一個main方法,這個main方法是一個標準的java應用的入口方法。在main方法中使用SpringApplication.run(*App