1. 程式人生 > >第15課:RDD建立內幕徹底解密

第15課:RDD建立內幕徹底解密

內容:

1.RDD建立的幾個方式

2.RDD建立實戰

3.RDD內幕

 

第一個RDD:代表了星火應用程式輸入資料的來源

通過轉型來對RDD進行各種運算元的轉換實現演算法

RDD的3種基本的建立方式

1,使用程式中的集合建立RDD;

2,使用本地檔案系統建立RDD;

3,使用HDS建立RDD

其他:

4,基於DB建立RDD

5,基於NoSQL的,例如HBase的

如圖6所示,基於S3建立RDD

如圖7所示,基於資料流建立RDD

 

1.通過集合建立RDD的實際意義:測試

2.使用本地檔案系統建立RDD的作用:測試大量資料檔案

3.使用HDFS建立RDD:生產環境最常用的RDD建立方式

 

hadoop是基礎設施,spark是計算核心

下面以程式碼演示通過集合建立RDD:

Object RDDBasedOnCollections {

  def main(args:Array[String]) {

val conf = new SparkConf()    //建立SparkConf物件

conf.setAppName(“RDDBasedOnCollections”)  //設定應用程式名稱,在程式執行的監控介面可以看到這個名稱

conf.setMaster(“local”)

val sc = new SparkContext(conf)  //建立SparkContext物件,通過傳入SparkConf例項來定製Spark執行的具體引數和配置資訊。

val numbers = 1 to 100  //建立一個scala集合

val rdd = sc.parallelize(numbers)   //建立一個ParallelCollectionRDD

 

val sum = rdd.reduce(_+_)  //1+2=3  3+3=6  6+4=10 ...

println(“1+2+......+99+100=” + sum)

  }

}

 

你可以在再智慧裝置 例如手機 平板 電視 上使用Spark,也可以在PC和Server使用使用Spark。Spark可以執行在一切裝置上,只要有JVM即可。

如果是單臺機,可以通過多執行緒方式模擬分散式

Local模式 預設情況下如果失敗了 就是失敗了。

下面是SparkContext的createTaskScheduler方法的原始碼:

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case "yarn-standalone" "yarn-cluster" =>
        if (master == "yarn-standalone") {
          logWarning(
            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
        }
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {
          // TODO: Enumerate the exact reasons why it can fail
          // But irrespective of it, it means we cannot proceed !
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        scheduler.initialize(backend)
        (backend, scheduler)

      case "yarn-client" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        scheduler.initialize(backend)
        (backend, scheduler)

      case MESOS_REGEX(mesosUrl) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
        } else {
          new MesosSchedulerBackend(scheduler, sc, mesosUrl)
        }
        scheduler.initialize(backend)
        (backend, scheduler)

      case SIMR_REGEX(simrUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
        scheduler.initialize(backend)
        (backend, scheduler)

      case zkUrl if zkUrl.startsWith("zk://") =>
        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
        createTaskScheduler(sc, "mesos://" + zkUrl)

      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
  }
}

通過原始碼可以看出如果使用LOCAL_N_FAILURES_REGEX模式,設定執行緒數和最大失敗次數,如果失敗了可以重試。所以Spark作為一個單機版軟體也是非常強悍的。

 

未指定並行度的情況下,spark看叢集有多少core就用多少個Core(並行度)。

spark會最大化使用計算資源,計算效率非常高。但如果管理不當會更耗資源。

前面的物件RDDBasedOnCollections 執行時只有一個stage。原因是

程式碼中只有一個reduce,reduce是Action,不會產生RDD,所以也沒有Shuffle。

hadoop的mr已沒有任何應用場景了。

ParallelCollectionRDD 的原始碼如下:

private object ParallelCollectionRDD {
  /**
   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
   * is an inclusive Range, we use inclusive range for the last slice.
   */
  def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of slices required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (until numSlices).iterator.map(i => {
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      })
    }
    seq match {
      case r: Range => {
        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
          // If the range is inclusive, use inclusive range for the last slice
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
        }).toSeq.asInstanceOf[Seq[Seq[T]]]
      }
      case nr: NumericRange[_] => {
        // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
        var r = nr
        for ((start, end) <- positions(nr.length, numSlices)) {
          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
      }
      case _ => {
        val array = seq.toArray // To prevent O(n^2) operations for List etc
        positions(array.length, numSlices).map({
          case (start, end) =>
            array.slice(start, end).toSeq
        }).toSeq
      }
    }
  }
}

   可以看出ParallelCollectionRDD可以有兩處引數,seq: Seq[T], numSlices: IntnumSlices如果不指定將會預設利用所有CPU,獲得最高並行度。如果指定numSlices將會按指定的分片(並行度)執行Spark程式

實際上Spark的並行度到底應該設定為多少呢?

最佳實踐:spark並行度:每個core可以承載2-4個partition,

例如:32個core的話可以設為64-128

跟資料規模沒有關係,只跟每個Task計算partition時使用的記憶體使用量和cpu使用時間有關。

blockmanager管理資料的優先位置,在程式啟動時就完成了這個過程。SparkContext在構建DAGScheduler對DAG進行Stage劃分時已經決定好了每一個數據分片的優先位置。

無論資料是放在記憶體還是磁碟還是Tachyon上,都由BlockManager管理。

下面再看一下ParallelCollectionPartition的原始碼:

private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {
  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
  // instead.
  // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray

//讀取資料時呼叫ParallelCollectionRDD.slice並轉換為陣列。
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray

//對陣列分片,將每一片資料變成ParallelCollectionPartition
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)

//獲取資料的優先位置。
  }
}

 

下面通過讀取本地檔案建立RDD:

val rdd = sc.textFile(“D://README.txt”)  //注意是雙斜槓

//計算所有行的長度的總和

val lineLength = rdd.map(line => line.length)

val sum = lineLength.reduce(_+_)

println(“The total character of the file is ” +  sum)

 

下面看一下textFile的原始碼:

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}

可以看出在textFile是讀取HDFS或本地檔案系統或其他hadoop支援的檔案系統上的檔案,並將其轉換為RDD。在textFile內部呼叫了hadoopFile函式。

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[KV](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[KV]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(KV)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

   可以看出在hadoopFile內建立了一個HadoopRDD,HadoopRDD的建立要依賴於Hadoop底層本身。

def hadoopRDD[KV](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[KV]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(KV)] = withScope {
  assertNotStopped()
  // Add necessary security credentials to the JobConf before broadcasting it.
  SparkHadoopUtil.get.addCredentials(conf)
  new HadoopRDD(this,conf,inputFormatClass,keyClass,valueClass,minPartitions)
}

 

FileInputFormat是java的寫的,是org.apache.hadoop。mapred 的包。所以這裡是用星火操作的Hadoop的實現。

有人說星火的缺點是沒有檔案系統,但其實這正是星火的優點,正因為沒有檔案系統,所以才可以跨一切檔案系統。

 

用HBase的/ MySQL的/ ORACLE的話要考慮資料本地性,要認真寫getPreferredLacation.getPreferredLacation決定計算髮生在什麼地方.DAGScheduler在對DAG劃分不同階段時,階段內部具體任務已經決定了資料優先位置。所以MySQL的/預言資料庫機上要安裝Spark.HBase節點上也要安裝的火花。

實際生產環境下,HBase的和火花安裝在同一節點上是可能的,但MySQL的/ oracle的節點上安裝火花的可能性較小,這時就需要用的Tachyon作為中介軟體,匯入資料庫的資料,也可以把資料庫中的資料匯入配置單元中,在蜂巢節點上執行的火花。