對部分內容有修改,恕本人水平有限,如有錯誤,在所難免。

PySpark程式設計指南(譯):

1.  概述:

a)  從高層次上來看,每一個Spark應用都包含一個驅動程式,用於執行使用者的main函式以及在叢集上執行各種並行操作。Spark提供的主要抽象是彈性分散式資料集(RDD),這是一個包含諸多被劃分到不同節點上進行並行處理的元素集合。RDD通過開啟HDFS(或其他hadoop支援的檔案系統)上的一個檔案或轉換一個已經存在於驅動程式中的Scala集合(已存在的RDD)來得到。使用者可以要求Spark將RDD持久化到記憶體中,這樣就可以在並行操作中高效複用。另外,在節點發生錯誤時RDD可以自動恢復。

b)  Spark提供的另一個抽象是可以在並行操作中使用的共享變數。在預設情況下,當Spark以任務集合(分佈於不同節點)的方式並行執行一個函式時,對於所有在函式中使用的變數,每一個任務都會得到一個副本。有時,某一個變數需要在任務間或任務與驅動程式之間共享。Spark支援兩種共享變數:broadcast variables,將一個值快取到所有節點的記憶體中;accumulators,只能用於累加的變數,比如用於計數和求和。

2.  Spark連結

a)  Spark1.6.1支援Python2.6或更高的版本(支援Python3.4+)。它使用了標準的CPython直譯器,所以諸如NumPy一類的C庫也是可以使用的,PyPy2.3+也是適用的。

b)  通過Spark目錄下的bin/spark-submit指令碼你可以在Python下執行Spark應用。這個指令碼會載入Spark的Java/Scala庫然後讓你將應用提交到叢集中。你可以執行bin/pyspark來開啟Python的互動命令列。

c)  如果你希望訪問HDFS上的資料,你需要為你使用的HDFS版本建立一個PySpark連結。Prebuilt Package(即spark-1.6.0-bin-without-hadoop.tgz)在Spark主頁已經可以找到,適合於常見的HDFS版本。

d)  最後,你需要將一些Spark的類import到你的程式中。加入如下這行:

from pyspark import SparkContext,SparkConf

e)  PySpark需要driver與workers之間具有相同的Python版本,使用了PATH路徑下的預設版本。你可以明確你想使用的Python版本通過使用PYSPARK_PYTHON:

e.g.

$PYSPARK_PYTHON=python3.4 bin/pyspark
$PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submitexamples/src/main/python/pi.py

3.  Spark初始化

Spark程式中要做的第一件事就是建立一個SparkContext物件來告訴Spark如何連線到叢集。為了建立SparkContext,你首先需要建立一個SparkConf物件,這個物件會包含你的應用的一些相關資訊。

conf =SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName引數是在叢集UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN叢集的URL,如果你在本地執行,那麼這個引數應該是特殊的”local”字串。在實際使用中,當你在叢集中執行你的程式,你一般不會把master引數寫死在程式碼中,而是通過用spark-submit執行程式來獲得這個引數。不過,在本地測試以及單元測試時,你仍然可以通過傳入”local”來執行Spark程式。

4.  使用Shell

a)  在PySpark命令列中,一個特殊的整合在直譯器裡的SparkContext變數已經建立好了,變數名叫做sc。建立你自己的SparkContext將會失效。你可以通過使用--master命令列引數來設定與SparkContext連線的master主機,你也可以通--py-files引數傳遞一個用逗號隔開的列表將Python的.zip、.egg或.py檔案新增到執行時路徑中。你還可以通過—package引數傳遞一個用逗號隔開的maven列表來給這個Shell會話新增依賴(如Spark的包)。任何可能存在依賴的倉儲(比如SonaType)都可以通過傳給—repositories引數來新增進去。Spark包的所有Python依賴(列在這個包的requirements.txt檔案中)在必要時都必須通過pip手動安裝。

比如,使用四核來執行bin/pyspark應當輸入這個命令:

$./bin/pyspark --master local[4]

又比如,把code.py檔案新增到搜尋路徑中(為了能夠在程式中import code),應當使用這條命令:

$./bin/pyspark --master local[4] --py-files code.py

想要了解命令列選項的完整資訊請執行pyspark --help命令。在這些場景下,pyspark會觸發一個更通用的spark-submit指令碼

在IPython這個加強的Python直譯器中執行PySpark也是可行的。PySpark可以在1.0.0或更高版本的IPython上執行。為了使用IPython,必須在執行bin/pyspark時將PYSPARK_DRIVER_PYTHON變數設定為ipython,就像這樣:

$PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

你還可以通過設定PYSPARK_DRIVER_PYTHON_OPTS來自定義ipython。比如,在執行IPython Notebook時開啟PyLab圖形支援應該使用這條命令:

$ PYSPARK_DRIVER_PYTHON=ipythonPYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab inline" ./bin/pyspark

在IPython NoteBook 伺服器被啟動後,你可以通過”Files”欄建立一個新的”Python 2”notebook。在你試圖通過IPython notebook來嘗試Spark之前,你可以通過在notebook中,鍵入命令

%pylabinlines

來擴充你的notebook。

5.  彈性分散式資料集(RDDs)

Spark是以RDD概念為中心執行的。RDD是一個容錯的、可以被並行操作的元素集合。建立一個RDD有兩個方法:在你的驅動程式中並行化一個已經存在的集合;從外部儲存系統中引用一個數據集。這個儲存系統可以是一個共享檔案系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的資料來源。

6.  並行化集合

並行化集合是通過在驅動程式中一個現有的迭代器或集合上呼叫SparkContext的parallelize方法建立的。為了建立一個能夠並行操作的分佈資料集,集合中的元素都會被拷貝。舉例,以下語句建立了一個包含1到5的並行化集合:

data = [1, 2, 3, 4,5]
distData= sc.parallelize(data)

分散式資料集(distData)被建立後,就可以進行並行操作了。比如,我們可以呼叫disData.reduce(lambda a, b: a+b)來對元素進行疊加。在後文中我們會描述分佈資料集上支援的操作。

並行集合的一個重要引數是將資料集劃分成分片的數量。對每一個分片,Spark會在叢集中執行一個對應的任務。典型情況下,叢集中的每一個CPU將對應執行2-4個分片。一般情況下,Spark會根據當前叢集的情況自行設定分片數量。但是,你也可以通過將數目以第二個引數形式,傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動確定分片數量。注意:有些程式碼中會使用切片(slice,分片的同義詞)這個術語來保持向下相容性。

7.  外部資料集

a)  PySpark可以通過Hadoop支援的外部資料來源(包括本地檔案系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分佈資料集。Spark支援文字檔案、序列檔案以及其他任何Hadoop輸入格式檔案。

b)  通過文字檔案建立RDD可以使用SparkContexttextFile方法。這個方法會使用一個檔案的URI(或本地檔案路徑,hdfs://、s3n://這樣的URI等等)然後以文字行的集合的形式讀入。e.g.檔案調取示例:

>>>distFile = sc.textFile("data.txt")

建立完成後distFile上就可以呼叫資料集操作了。比如,我們可以呼叫mapreduce操作來迭加所有文字行的長度,程式碼如下:

distFile.map(lambdas: len(s)).reduce(lambda a, b: a + b)

c)  在Spark中讀入檔案時有幾點要注意:

         i.     如果使用了本地檔案路徑時,要保證在worker節點上這個檔案也能夠通過這個路徑訪問。將這個檔案拷貝到所有worker上或者使用網路掛載的共享檔案系統來解決此問題。

       ii.     包括textFile在內的所有基於檔案的Spark讀入方法,都支援將資料夾、壓縮檔案、包含萬用字元的路徑作為引數。比如,以下程式碼都是合法的:

textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")

      iii.     textFile方法也可以傳入第二個可選引數來控制檔案的分片數量。預設情況下,Spark會為檔案的每一個塊(在HDFS中塊的大小預設是64MB)建立一個分片。但是你      也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小於檔案塊的數量。

d)  除了文字檔案之外,Spark的Python API還支援多種其他資料格式:

         i.     SparkContext.wholeTextFiles能夠讀入包含多個小文字檔案的目錄,然後為每一個檔案返回一個(檔名,內容)對。這與textFile方法為每一個文字行返回一條記錄相對應。

       ii.     RDD.saveAsPickleFileSparkContext.pickleFile支援將RDD以簡單的序列化的Python物件格式儲存起來。序列化的過程中會以預設10個一批的數量批量處理。

      iii.     序列檔案和其他Hadoop輸入輸出格式。

e)  注意:

這個特性目前仍處於試驗階段,被標記為Experimental,目前只適用於高階使用者。這個特性在未來可能會被基於Spark SQL的讀寫支援所取代,因為Spark SQL是更好的方式。

f)  可寫型別支援

PySpark序列檔案支援利用Java載入一個鍵值對RDD,將可寫型別轉化成Java的基本型別,然後使用Pyrolite將Java結果物件序列化。當將一個鍵值對RDD儲存到一個序列檔案中時PySpark將會執行上述過程的相反過程。首先將Python物件反序列化成Java物件,然後轉化成可寫型別。以下可寫型別會自動轉換:

可寫型別

Python 型別

Text

Unicode str

IntWritable

Int

FloatWritable

Float

DoubleWritable

Float

BooleanWritable

Bool

BytesWritable

Bytearray

NullWritable

None

MapWritable

Dict

陣列是不能自動轉換的。使用者需要在讀寫時指定ArrayWritable的子型別.在讀入的時候,預設的轉換器會把自定義的ArrayWritable子型別轉化成Java的Object[],之後序列化成Python的元組。為了獲得Python的array.array型別來使用主要型別的陣列,使用者需要自行指定轉換器。

g)  儲存和讀取序列檔案

         i.     和文字檔案類似,序列檔案可以通過指定路徑來儲存與讀取。鍵值型別都可以自行指定,但是對於標準可寫型別可以不指定。

>>>rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>>rdd.saveAsSequenceFile("path/to/file")
>>>sorted(sc.sequenceFile("path/to/file").collect())
[(1,u'a'), (2, u'aa'), (3, u'aaa')]

h)  儲存和讀取其他Hadoop輸入輸出格式

         i.     PySpark同樣支援寫入和讀出其他Hadoop輸入輸出格式,包括’new’和’old’兩種HadoopMapReduce API。如果有必要,一個Hadoop配置可以使用Python字典的形式傳入。以下是一個例子,使用了ElasticsearchESInputFormat

$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar./bin/pyspark
>>> conf = {"es.resource" :"index/type"}   # assumeElasticsearch is running on localhost defaults
>>> rdd =sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
   "org.apache.hadoop.io.NullWritable","org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that isconverted to a Python dict(u'Elasticsearch ID',
 {u'field1': True,u'field2':u'Some Text',u'field3': 12345})

       ii.     注意,如果一個輸入格式僅僅依賴於一個Hadoop配置和/或輸入路徑,並且鍵值型別都可以簡單地根據前面的表格直接轉換,那麼剛才提到的這種方法將非常適用於此輸入格式。

      iii.     如果你有一些自定義的序列化二進位制資料(比如從Cassandra/HBase中讀取資料),那麼你需要首先在Scala/Java端將這些資料轉化成可以被Pyrolite的序列化器處理的資料型別。轉換器就是為此準備的。簡單地拓展這個轉換器的特點同時在convert方法中實現你的轉換程式碼即可。注意,要確保這個類以及訪問你的輸入格式所需的依賴都被加入到Spark job jar中,並且確保這個包已經包含到了PySpark的classpath中。

       iv.     這裡有一些通過自定義轉換器來使用Cassandra/HBase輸入輸出格式的Python樣例和轉換器樣例。

i)  RDD操作

         i.     RDD支援兩類操作:transformations,用於從已有的資料集建立新的資料集;actions,用於在資料集上的計算結束後向驅動程式返回一個值。舉個例子,map是一個transformation,可以將資料集中每一個元素傳給一個函式,同時將計算結果以一個新的RDD返回。另一方面,reduce操作是一個action,能夠使用某些函式來聚集RDD中所有的元素,並且向驅動程式返回最終結果(同時也有一個並行的reduceByKey操作可以返回一個分佈資料集)。

       ii.     在Spark中,所有的transformations操作都是惰性求值的,就是說它們並不會立刻開始計算。相反,它們僅僅是記錄下了需要執行transformations操作的操作物件(比如:一個檔案)。只有當一個action操作被執行,需要向驅動程式返回一個結果時,transformations操作才會真的開始計算。這樣的設計使得Spark執行更加高效。 比如,我們會發覺由map操作產生的資料集將會在reduce操作中用到,之後僅僅是返回了reduce的最終的結果而不是map產生的龐大資料集。

      iii.     在預設情況下,每一個由transformations操作得到的RDD都會在每次執行actions操作時被重新計算。但是,你也可以通過呼叫persist(或cache)方法來將RDD駐留在記憶體中,這樣Spark就可以在下次使用這個資料集時快速獲得它。Spark同樣提供了對將RDD記錄到硬碟上或在多個節點間複製的支援。

j)  基本操作

         i.     以下程式是對RDD基本用法的演示說明:

lines =sc.textFile("data.txt")
lineLengths= lines.map(lambda s: len(s))
totalLength= lineLengths.reduce(lambda a, b: a + b)

第一行定義了一個由外部檔案產生的基本RDD。這個資料集不是從記憶體中載入的也不是由其他操作產生的:lines僅僅是一個指向檔案的指標。第二行將lineLengths定義為map這一transformation操作的結果。再強調一次,由於惰性求值的緣故,lineLengths並不會被立即計算出來。最後,我們運行了reduce操作,這是一個action操作。此時,Spark將計算過程劃分成許多tasks並在叢集上執行,每臺機器執行自己的task的map操作和本地的reduce操作,只是將自己的task的運算結果返回給驅動程式。

       ii.     如果我們希望以後重複使用lineLengths,只需在reduce之前追加下面這行程式碼:

lineLengths.persist()

這條程式碼將使得lineLengths在第一次計算生成之後駐留在記憶體中。

k)  向Spark傳遞函式:

         i.     Spark的API嚴重依賴於在驅動程式中傳遞函式作為引數。有三種推薦的方法來傳遞函式作為引數:

1.  Lambda表示式,簡單的函式可以直接寫成一個lambda表示式(lambda表示式不支援多語句函式和無返回值的語句)。

2.  對於程式碼很長的函式,Spark呼叫在本地用def定義的函式。

3.  模組中的頂級函式。

       ii.     比如,傳遞一個無法轉化為lambda表示式的長函式,可以像以下程式碼這樣:

"""MyScript.py"""
if__name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)
 
    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

      iii.     應當注意的是,也可以傳遞類例項中方法的引用(與單例物件相反),這種傳遞方法會將整個物件傳遞過去。比如,考慮以下程式碼:

classMyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        returnrdd.map(self.func)

       iv.     在這裡,如果我們建立了一個新的MyClass物件,然後對它呼叫doStuff方法,map會用到這個物件中func方法的引用,所以整個物件都需要傳遞到叢集中。

         v.     還有另一種相似的寫法,訪問外部物件的資料域需要傳遞整個物件的引用:

classMyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        returnrdd.map(lambda s: self.field + x)

       vi.     此類問題最簡單的避免方法就是,使用一個區域性變數快取一份這個資料域的拷貝,避免外部直接訪問物件屬性:

def doStuff(self,rdd):
    field = self.field
    returnrdd.map(lambda s: field + x)