1. 程式人生 > >第二天 -- Spark叢集啟動流程 -- 任務提交流程 -- RDD依賴關係 -- RDD快取 -- 兩個案例

第二天 -- Spark叢集啟動流程 -- 任務提交流程 -- RDD依賴關係 -- RDD快取 -- 兩個案例

第二天 – Spark叢集啟動流程 – 任務提交流程 – RDD依賴關係 – RDD快取 – 兩個案例

文章目錄

一、Spark叢集啟動流程

  1. 呼叫start-all.sh指令碼,啟動Master服務,首先執行preStart,檢查超時的Worker
  2. 開始執行receive方法,不斷接受其它Actor向它傳送過來的請求
  3. 在呼叫start-all.sh指令碼的時候,會解析slaves配置檔案,決定了在哪幾個節點上啟動Worker服務,Worker服務在啟動的時候,會啟動preStart方法,該方法會向Master進行註冊
  4. Master收到Worker的註冊資訊後,開始持久化註冊資訊,並響應給Worker
  5. Worker收到Master傳送過來的響應資訊(MasterUrl),
  6. Worker開始向Master傳送心跳資訊

1542711318434

二、Spark任務提交流程:

  1. Driver端向Master端註冊任務
  2. Master收到Driver端傳送過來的資訊後,把資訊封裝為真正的任務資訊並把任務資訊進行儲存
  3. Master通知Worker拿取任務資訊並啟動Executor
  4. Worker向Master拉取任務資訊的同時啟動Executor
  5. Executor開始向Driver進行註冊
  6. Driver開始把任務傳送給相應的Executor

1542711346279

三、RDD的依賴關係

​ RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

1542711450396

窄依賴

​ 窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用(一對一、多對一)

寬依賴

​ 寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition(一對多、多對多)

Lineage

​ RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分割槽。RDD的Lineage會記錄RDD的元資料資訊和轉換行為,當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽。

四、RDD的快取

​ Spark速度非常快的原因之一,就是在不同操作中可以在記憶體中持久化或快取多個數據集。當持久化某個RDD後,每一個節點都將把計算的分片結果儲存在記憶體中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得後續的動作變得更加迅速。RDD相關的持久化和快取,是Spark最重要的特徵之一。可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵。

RDD快取方式、級別

​ RDD通過persist方法或cache方法可以將前面的計算結果快取,但是並不是這兩個方法被呼叫時立即快取,而是觸發後面的action時,該RDD將會被快取在計算節點的記憶體中,並供後面重用。

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()

​ 通過檢視原始碼發現cache最終也是呼叫了persist方法,預設的儲存級別都是僅在記憶體儲存一份,Spark的儲存級別分為很多種,儲存級別在object StorageLevel中定義的。

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

class StorageLevel的主構造器引數如下:

	private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

​ 快取有可能丟失,或者儲存於記憶體的資料由於記憶體不足而被刪除,RDD的快取容錯機制保證了即使快取丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的資料會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。

五、案例一:基站訊號範圍

​ 需求:求使用者在一定的時間範圍內停留的時間最長的top2的基站範圍

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 需求:求使用者在一定的時間範圍內停留的時間最長的top2的基站範圍
  * 思路:
  * 1.求出在相同基站停留的總時長
  * 2.把基站的經緯度join過來
  * 3.按使用者分組,組內取top2
  */
object MobileLocation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 獲取使用者日誌資料檔案
    val files: RDD[String] = sc.textFile("F:/scaladata/data/lacduration/log")

    // 將使用者日誌資料進行劃分
    val splitedLogs: RDD[((String, String), Long)] = files.map(line => {
      val fields = line.split(",")
      val phone = fields(0) // 手機號
      val time = fields(1).toLong // 時間戳
      val lac = fields(2) // 基站id
      val eventType = fields(3).toInt // 事件型別
      val time_long = if (eventType == 1) -time else time

      ((phone,lac), time_long)
    }).cache()

    // 使用者在相同的基站停留的總時長
    val totalTimeLogs: RDD[((String, String), Long)] = splitedLogs.reduceByKey(_+_)

    // 為了便於和基站資訊進行join,需要把資料進行重新整合
    val lacAndPhoneAndTime: RDD[(String, (String, Long))] = totalTimeLogs.map(line => {
      val phone = line._1._1
      val lac = line._1._2
      val time = line._2 // 停留的總時長
      (lac, (phone, time))
    })


    // 獲取基站基礎資訊
    val lacInfo = sc.textFile("F:/scaladata/data/lacduration/lac_info.txt")
      .map(line => {
        val fields = line.split(",")
        val lac = fields(0) // 基站id
        val x = fields(1) // 經度
        val y = fields(2) // 維度
        (lac,(x,y))
      })
    
    // 把經緯度資訊join到使用者訪問資訊中
    val joinedLogs: RDD[(String, ((String, Long), (String, String)))] = lacAndPhoneAndTime.join(lacInfo)

    // 為了方便將使用者進行分組,把資料進行重新整合
    val phoneAndTimeAndXY: RDD[(String, Long, (String, String))] = joinedLogs.map(x => {
      val phone = x._2._1._1
      val lac = x._1
      val time = x._2._1._2
      val xy = x._2._2
      (phone, time, xy)
    })

    // 按照使用者進行分組
    val grouped: RDD[(String, Iterable[(String, Long, (String, String))])] = phoneAndTimeAndXY.groupBy(_._1)

    // 按照使用者訪問基站的總時長進行降序排列
    val res: RDD[(String, List[(String, Long, (String, String))])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))

    res.foreach(f => println(f))
    sc.stop()
  }
}

兩個案例的資料檔案下載:點選下載

六、案例二:學科模組網站訪問排名

​ 需求:求每個學科各個模組訪問量後取topN

普通實現
import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 需求:求每個學科各個模組訪問量後取topN
  * 思路:
  *   1.每個學科各個模組的訪問量
  *   2.以學科進行分組並在組內排序取topN
  */
object SubjectCount_1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 獲取資料
    val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
      val fields = line.split("\t")
      val url = fields(1) // 使用者請求的url
      (url,1)
    })

    // 按照url進行聚合,得到每個學科各個模組的訪問量
    val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)

    // 獲取學科資訊並返回所有資料
    val subjectAndUrlAndCount: RDD[(String, String, Int)] = sumedLogInfo.map(tup => {
      val url = tup._1
      val count = tup._2
      val subject = new URL(url).getHost

      (subject, url, count)
    })

    // 按照學科資訊進行分組
    val groupedLogInfo: RDD[(String, Iterable[(String, String, Int)])] = subjectAndUrlAndCount.groupBy(_._1)

    // 在學科資訊組內進行降序排序並取top3
    val res: RDD[(String, List[(String, String, Int)])] = groupedLogInfo.mapValues(_.toList.sortBy(_._3).reverse.take(3))

    res.foreach(f => println(f))

    sc.stop()
  }
}
使用快取
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 需求:求每個學科各個模組訪問量後取topN
  * 快取應用
  */
object SubjectCount_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 模擬從mysql中獲取的學科資訊
    val subjects = Array("http://java.learn.com", "http://ui.learn.com", "http://bigdata.learn.com", "http://android.learn.com", "http://h5.learn.com")

    // 獲取資料
    val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
      val fields = line.split("\t")
      val url = fields(1) // 使用者請求的url
      (url,1)
    })

    // 按照url進行聚合,得到每個學科各個模組的訪問量
    val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
    for(subject <- subjects){
      // 過濾出屬於該學科的各個模組對應的訪問量
      val filteredSubjectInfo: RDD[(String, Int)] = sumedLogInfo.filter(_._1.startsWith(subject))
      // 開始降序排序並取top3
      val res: Array[(String, Int)] = filteredSubjectInfo.sortBy(_._2,false).take(3)
      res.foreach(f => println(f))
    }

    sc.stop()
  }
}
自定義分割槽實現

​ 能解決部分資料傾斜問題

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * 需求:求每個學科各個模組訪問量後取topN
  * 實現自定義分割槽器,按照學科把不同的學科資訊放到不同的分割槽中
  */
object SubjectCount_3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 獲取資料
    val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
      val fields = line.split("\t")
      val url = fields(1) // 使用者請求的url
      (url,1)
    })

    // 按照url進行聚合,得到每個學科各個模組的訪問量
    val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)

    // 獲取學科資訊並返回所有資料
    val subjectAndUrlAndCount: RDD[(String, (String, Int))] = sumedLogInfo.map(tup => {
      val url = tup._1
      val count = tup._2
      val subject = new URL(url).getHost

      (subject, (url, count))
    }).cache()

    // 呼叫預設的分割槽器進行分割槽,雜湊碰撞導致會出現資料傾斜問題,此時需要自定義分割槽
      //  val partitioned = subjectAndUrlAndCount.partitionBy(new HashPartitioner(3))
     //   partitioned.saveAsTextFile("f:/sparkdata/out-20181120-1")

    // 獲取所有的學科資訊,需要去重
    val subjects: Array[String] = subjectAndUrlAndCount.keys.distinct.collect

    // 呼叫自定義分割槽器
    val partitioner = new SubjectPartitioner(subjects)
    val partitioned = subjectAndUrlAndCount.partitionBy(partitioner)
    val res: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(3).iterator
    })
    res.saveAsTextFile("f:/sparkdata/out-20181120-2")

    sc.stop()
  }
}

/**
  * 自定義分割槽器
  */
class SubjectPartitioner(subjects:Array[String]) extends Partitioner{
  // 用於儲存學科資訊和對應的分割槽號
  val subjectAndNum = new mutable.HashMap[String,Int]()

  // 計數器,用於指定分割槽號
  var i = 0
  for(subject <- subjects){
    subjectAndNum += (subject -> i)
    i += 1
  }

  // 獲取分割槽數
  override def numPartitions: Int = subjects.size
  // 獲取分割槽號
  override def getPartition(key: Any): Int = subjectAndNum.getOrElse(key.toString, 0)
}