Spark 2.0 Programming Guide 翻譯(PySpark)
阿新 • • 發佈:2019-01-09
1、spark2.0 工作依靠python2.6+或python3.4+ ,他可以使用標準的cpython直譯器,所以說C libraries 例如numpy可以使用,它工作依靠pypy2.3+
在python下執行spark應用,利用 設定你喜歡的python版本。
2、初始化Spark
Spark程式第一個必須要做的是:建立一個SparkContexr物件,他會告訴Spark如何訪問叢集。在建立一個SparkContext物件之前你需要建立一個SparkConf物件,sparkConf物件包括了你的應用的資訊。
在實際中,當執行程式在叢集上時,不會設定master
,但當在本地測試或者單元測試時,你可以運用local
3、利用shell
4、彈性分散式資料集
RDDs
4.1並行集合(parallelize
colleactions)
在程式中,並行的集合有SparkCinext的parallelize方法來實現,通過一個現存的集合或者可迭代資料,集合中的元素被複制去構建一個可以並行執行的分散式的資料集。eg:
(3)、textfile方法有一個可選的第二引數設定檔案的分割槽數量。 除了textflie方法,pyspark還支援其他幾種data formats: SparkContext.wholeTextFiles:可以讀取一個目錄下的多個檔案
bin/spark-submit
指令碼(在spark
目錄下),可以load spark的java/scala的libraaies並且允許你提交應用到叢集。可以利用bin/pyspark
去執行一個互動式的python
shell。
若果你想獲得HDFS的資料,你需要建立spark與你的hdfs對應版本的聯絡,
最後,你需要import一些spark的包到你的程式中,例如:
frompysparkimportSparkContext,SparkConf
PySpark
需要driver 和 workers 有相同的python版本,你可以利用PYSPARK_PYTHONconf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName ====> 是你顯示在叢集使用者介面的應用程式名稱
master
=====> spark、mesos、yarn、或者local 執行在本地模型data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
一旦被建立,這個分散式的資料集就可以並行執行。eg:我們可以利用distData.reduce(lambda
a, b: a + b) 對list中的元素累加求和。通常情況下,你希望你的叢集上的cpu有2-4個劃分。通常情況下,spark會基於你的叢集自動設定劃分的數量。然而你也可以自己設定(e.g.sc.parallelize(data,
10)
).,通過parallelize的第二個引數。
4.2外部資料集
PySpark可以建立通過hadoop建立分散式資料集,不分其來源是什麼,包括你的本地檔案系統,HDFS,Cassandra, HBase,Amazon
S3 等,Spark 支援 text files,SequenceFiles,和
any other HadoopInputFormat.
text file 的RDDs建立通過SparkContext的textFile的方法,這個方法利用檔案的URI,以行的方式讀取為一個集合,eg:
>>> distFile = sc.textFile("data.txt")
利用spark讀取檔案的一些筆記:
(1)、如果利用本地檔案系統的一個路徑,這個檔案必須在worker節點上可以以同樣的路徑訪問。不論是拷貝檔案到多有的files或者是利用共享檔案系統。
(2)、所有的Spark檔案輸入方法,包括textfile,支援目錄,壓縮檔案或者萬用字元。rg:
textFile("/my/directory")
,textFile("/my/directory/*.txt")
,
andtextFile("/my/directory/*.gz")
.(3)、textfile方法有一個可選的第二引數設定檔案的分割槽數量。 除了textflie方法,pyspark還支援其他幾種data formats: SparkContext.wholeTextFiles:可以讀取一個目錄下的多個檔案
RDD.saveAsPickleFile
and SparkContext.pickleFile:支援儲存一個RDD
序列化為一個python物件
SequenceFile and Hadoop Input/Output Formats
注意:這些方式目前還是實驗性的,將來或許會被SparkSQL所代替,因為SparkSQL是首選方法。
4.3 儲存和載入SequenceFiles
SequenceFiles 可以通過指定的路勁save和載入, 可以指定鍵值,但是對於標準的 Writables 是不需要的。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
4.4儲存和載入其他Hadoop輸入/輸出格式:
針對新的或老的hadoop MR API,pySpark 可以讀取任何hadoop輸入格式或者寫任何hadoop輸出格式,
5、RDD操作
5.1 RDDs支援兩種不同型別的操作
transformations:通過已經存在的資料集建立一個新的資料集
actions:在資料集上執行完一個計算後,返回驅動程式一個值。
eg:
map:一個transformation
對資料集的每個元素執行一個函式,返回一個新的資料集
reduce:一個action,彙集RDD的所有元素利用一個函式,返回一個結果給驅動程式
reduceByKey
: 返回一個分散式的資料集
在Spark中所有的tansformation都是lazy的,並不是立刻計算它們的結果,而是記住這些轉換。當一個action需要返回驅動程式一個結果時候,這些transfomaton才會執行。
預設情況下,對於每一個transformation,在其上執行一個action,可以使其執行。然而可以利用persist(或cache)方法,將元素防到叢集上,當下一次請求時,可以快速的獲得他。
5.2 Spark 中的傳遞函式
Spark的驅動程式在機上上執行時非常依賴與傳遞函式,有三種推薦方式去執行它:
(1)、lambda表示式
(2)、區域性def 函式
(3)、頂級功能模組
5.3理解閉包
Spark的一個難點是:在叢集上執行程式碼時,理解變數和函式的範圍和作用域。在變數的作用範圍之外修改變數時混亂的源泉。
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
Local
VS Cluster
上述程式碼的表現是不確定的,或許可以按照你預想的方式執行。為了執行jobs,Spark將RDD的操作處理分解成任務,每一個都是一個執行者。在執行之前,Spark會計算任務task的閉包。閉包即在執行者執行RDD操作時,其執行所需的變數和方法對其是可見的。這些閉包會被序列化然後送到沒一個執行者上邊。
在傳送到執行者的閉包中的變數,現在是原始的副本,當counter 被引用時,他不再是driver上的counter。在driver的記憶體中還有一個counter ,但是其對executer(執行者)是不可見的。所以,counter最後的值還是0,因為所有對counter執行的操作都是在序列化的閉包中執行的。
在local模式中,在某些場景中,foreach函式的執行會在相同的JVM中,或許會引用到相同的原始的counter,或許會真的uodate它。
確保定義的行為在這些場景中,應該利用accumulator。在Spark中,Accumulator提供一種安全的更新變數的方式,擋在叢集中執行時。
通常情況下,在利用loops或local結構定義方法時,閉包不應該改變全域性變數的值。Spark,沒有定義或者保證在閉包之外修改變數的行為。有一些程式碼,活血在local模式下可以執行,但是那只是意外,這些低嗎在分散式的情況下不會跟期待的那樣執行。
5.5 輸出RDD的元素
輸出RDD中的元素:rdd.foreach(println)
orrdd.map(println)
.,在單機模式下,這有可能會會按照所期待的那樣輸出。但是在叢集下,並不行。
輸出driver中的所有元素利用collect(),可能會導致記憶體溢位,因為collect()會將所有的Rdd放到單機上。如果只是想輸出RDD的少數部分元素,一種安全的方式是利用take()。