1. 程式人生 > >《Spark官方文件》Spark操作指南

《Spark官方文件》Spark操作指南

原文連結   譯者:小村長

Spark–Quick Start

本專案是 Apache Spark官方文件的中文翻譯版,致力於打造一個全新的大資料處理平臺來滿足大資料處理和分析的各個使用場景,本次翻譯主要針對對Spark感興趣和致力於從事大資料方法開發的人員提供有價值的中文資料,希望能夠對大家的工作和學習有所幫助。

Spark最近幾年在國內外都比較火,在淘寶、百度、騰訊、高偉達等一些公司有比較成熟的應用,做大資料方面的開發人員或多或少都與其有接觸。Spark的中文資料相對前幾年相對較多,但是我認為官方文件才是最好最完美的學習資料,今天讓小村長為你揭開Spark的神祕面紗,一同走進Spark的精神世界。

本嚮導提供了Spark的簡單介紹和快速入門,我第一次通過Spark shell來介紹Python 和 Scala API的使用,然後向你們展示怎麼通過Java、Python和Scala開發一個Spark應用程式,你可以通過檢視開發嚮導瞭解更多開發細節。

為了更快地學習這篇使用指南,首先你需要下載一個Spark安裝包,由於我們並沒有使用HDFS,所以你可以下載任何相容Hadoop版本的安裝包。

Spark Shell的使用

基礎部分

Spark’s shell 提供了一種學習API的簡單方法, 也是一個強大的資料分析工具.它可以通過Scala(它是一種執行在Java虛擬機器上並且能夠使用Java庫的程式語言 )或者Python來操作. 在Spark 目錄下執行以下指令碼:
./bin/spark-shell (Scala)


./bin/pyspark (Python)
Spark主要的抽象是一個分散式資料集(RDD). RDDs 能夠通過Hadoop的InputFormats (例如HDFS檔案)或者通過其他的RDD轉換生成.現在讓我們用Spark原始碼目錄中README 檔案的文字內容生成一個新的RDD:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs 有很多actions(我翻譯成操作因子), 通過轉換操作能生成一個新的RDD,讓我們來進行一些actions操作:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

現在讓我們使用transformation(翻譯成轉換因子).我們將使用 filter 操作來返回一個新的RDD資料集.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我們能夠通過鏈式程式設計把transformations 操作和 actions操作連線一起:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

RDD更多操作

RDD的 actions 和transformations 能夠使用更多複雜的操作. 我們能夠獲取更多操作通過下面連結:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一個map函式統計一行單詞個數,建立一個先的RDD. reduce函式是尋找單詞最多的某一行所含單詞個數 .map和reduce函式裡面的引數稱為函式自變數(閉包),它能夠使用Scala或者Java的任何特性. 例如,我們很輕鬆的在任何地方呼叫和定義函式 . 我們可以使用 Math.max()函式是程式碼看起來更簡潔更加容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一個普通的資料處理流程是MapReduce,這也是Hadoop中很普通的資料處理方式. Spark 繼承MapReduce資料處理流程並是它更加容易:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

現在,我們通過  map函式來進行合併操作 並且通過reduceByKey 轉換函式來統計每個單詞的個數並生成一個RDD鍵值對. 統計單詞的數量可以通過 collect 函式來實行:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

快取

Spark也支援把一個數據集放到一個叢集的快取裡面.當資料多次訪問它很有用,例如當你查詢一個常用的資料集或者執行一個 PageRank演算法的時候. 舉一個簡單的例子, 讓我們把 linesWithSpark 資料集快取起來:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

它可能是不明智的用Spark瀏覽和快取一個100行的資料. 這個部分通常運用在大的資料集上, 當叢集有十個甚至成千上百個節點的時候. 你也能夠通過 bin/spark-shell來連線叢集, 更多的描述請看 programming guide.

獨立應用開發

假如我希望開發一個獨立的應用通過Spark API. 我們可以通過 Scala (with sbt), Java (with Maven), and Python來呼叫開發Spark API.

現在我們建立一個簡單的Spark應用,實際上, 它的名字是 SimpleApp.scala:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意一個應用應該定義在一個 main()函式裡面或者通過繼承scala.Appscala.App的子類可能不能正確的工作.

這個程式僅僅是統計README檔案中包含‘a’和‘b’的數量.注意你需要替換你的本地的YOUR_SPARK_HOME 環境變數. 不同於前面的Spark shell例子, 它需要初始化SparkContext, 我們需要初始化 SparkContext作為程式執行的一部分.

我們需要建立一個 SparkConf 物件,它包含了我們應用所包含的資訊.

我們的應用依賴Spark API, 所以我們也包含sbt 配置檔案, simple.sbt, 它包含了Spark的一些依賴. 這個檔案增加一個倉庫,而這些倉庫是Spark執行不可缺少的一部分:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"

為了使 sbt能夠正確的工作, 我們需要佈局SimpleApp.scalasimple.sbt按計劃執行這種型別結構.一旦配置正確, 我們能夠建立一個JAR包包含這個應用的程式碼, 然後使用 spark-submit 來執行我們的程式.

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

下一章

配置執行你的第一個Spark程式!

  • 想深入的瞭解SparkAPI, 可以開啟 Spark programming guide, 或者看“Programming Guides” 選單瞭解其他元件.
  • 最後, Spark包含幾個簡單的例項 (Scala, Java, Python, R). 你可以按照下面的例項執行他們:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R