1. 程式人生 > >Spark 2.0 Programming Guide 翻譯(PySpark)

Spark 2.0 Programming Guide 翻譯(PySpark)

1、spark2.0 工作依靠python2.6+或python3.4+ ,他可以使用標準的cpython直譯器,所以說C libraries 例如numpy可以使用,它工作依靠pypy2.3+        在python下執行spark應用,利用bin/spark-submit 指令碼(在spark 目錄下),可以load  spark的java/scala的libraaies並且允許你提交應用到叢集。可以利用bin/pyspark 去執行一個互動式的python shell。         若果你想獲得HDFS的資料,你需要建立spark與你的hdfs對應版本的聯絡,     最後,你需要import一些spark的包到你的程式中,例如: frompysparkimportSparkContext,SparkConf PySpark 需要driver 和 workers 有相同的python版本,你可以利用PYSPARK_PYTHON
 設定你喜歡的python版本。
2、初始化Spark Spark程式第一個必須要做的是:建立一個SparkContexr物件,他會告訴Spark如何訪問叢集。在建立一個SparkContext物件之前你需要建立一個SparkConf物件,sparkConf物件包括了你的應用的資訊。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName ====> 是你顯示在叢集使用者介面的應用程式名稱 master =====> spark、mesos、yarn、或者local  執行在本地模型
在實際中,當執行程式在叢集上時,不會設定master ,但當在本地測試或者單元測試時,你可以運用local 3、利用shell 4、彈性分散式資料集 RDDs    4.1並行集合(parallelize colleactions)     在程式中,並行的集合有SparkCinext的parallelize方法來實現,通過一個現存的集合或者可迭代資料,集合中的元素被複制去構建一個可以並行執行的分散式的資料集。eg:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
一旦被建立,這個分散式的資料集就可以並行執行。eg:我們可以利用distData.reduce(lambda a, b: a + b) 對list中的元素累加求和。通常情況下,你希望你的叢集上的cpu有2-4個劃分。通常情況下,spark會基於你的叢集自動設定劃分的數量。然而你也可以自己設定(e.g.
sc.parallelize(data, 10)).,通過parallelize的第二個引數。
    4.2外部資料集     PySpark可以建立通過hadoop建立分散式資料集,不分其來源是什麼,包括你的本地檔案系統,HDFS,Cassandra, HBase,Amazon S3 等,Spark 支援 text files,SequenceFiles,和 any other HadoopInputFormat.        text file 的RDDs建立通過SparkContext的textFile的方法,這個方法利用檔案的URI,以行的方式讀取為一個集合,eg:
>>> distFile = sc.textFile("data.txt")
利用spark讀取檔案的一些筆記:             (1)、如果利用本地檔案系統的一個路徑,這個檔案必須在worker節點上可以以同樣的路徑訪問。不論是拷貝檔案到多有的files或者是利用共享檔案系統。             (2)、所有的Spark檔案輸入方法,包括textfile,支援目錄,壓縮檔案或者萬用字元。rg: textFile("/my/directory"),textFile("/my/directory/*.txt"), andtextFile("/my/directory/*.gz").
            (3)、textfile方法有一個可選的第二引數設定檔案的分割槽數量。 除了textflie方法,pyspark還支援其他幾種data formats:         SparkContext.wholeTextFiles:可以讀取一個目錄下的多個檔案 RDD.saveAsPickleFile and SparkContext.pickleFile:支援儲存一個RDD 序列化為一個python物件  SequenceFile and Hadoop Input/Output Formats 注意:這些方式目前還是實驗性的,將來或許會被SparkSQL所代替,因為SparkSQL是首選方法。         4.3 儲存和載入SequenceFiles            SequenceFiles 可以通過指定的路勁save和載入,       可以指定鍵值,但是對於標準的 Writables 是不需要的。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
        4.4儲存和載入其他Hadoop輸入/輸出格式:         針對新的或老的hadoop MR API,pySpark 可以讀取任何hadoop輸入格式或者寫任何hadoop輸出格式, 5、RDD操作     5.1 RDDs支援兩種不同型別的操作     transformations:通過已經存在的資料集建立一個新的資料集     actions:在資料集上執行完一個計算後,返回驅動程式一個值。 eg: map:一個transformation  對資料集的每個元素執行一個函式,返回一個新的資料集 reduce:一個action,彙集RDD的所有元素利用一個函式,返回一個結果給驅動程式 reduceByKey : 返回一個分散式的資料集     在Spark中所有的tansformation都是lazy的,並不是立刻計算它們的結果,而是記住這些轉換。當一個action需要返回驅動程式一個結果時候,這些transfomaton才會執行。 預設情況下,對於每一個transformation,在其上執行一個action,可以使其執行。然而可以利用persist(或cache)方法,將元素防到叢集上,當下一次請求時,可以快速的獲得他。     5.2 Spark 中的傳遞函式     Spark的驅動程式在機上上執行時非常依賴與傳遞函式,有三種推薦方式去執行它:     (1)、lambda表示式     (2)、區域性def 函式     (3)、頂級功能模組     5.3理解閉包     Spark的一個難點是:在叢集上執行程式碼時,理解變數和函式的範圍和作用域。在變數的作用範圍之外修改變數時混亂的源泉。
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
Local    VS     Cluster     上述程式碼的表現是不確定的,或許可以按照你預想的方式執行。為了執行jobs,Spark將RDD的操作處理分解成任務,每一個都是一個執行者。在執行之前,Spark會計算任務task的閉包。閉包即在執行者執行RDD操作時,其執行所需的變數和方法對其是可見的。這些閉包會被序列化然後送到沒一個執行者上邊。     在傳送到執行者的閉包中的變數,現在是原始的副本,當counter 被引用時,他不再是driver上的counter。在driver的記憶體中還有一個counter ,但是其對executer(執行者)是不可見的。所以,counter最後的值還是0,因為所有對counter執行的操作都是在序列化的閉包中執行的。     在local模式中,在某些場景中,foreach函式的執行會在相同的JVM中,或許會引用到相同的原始的counter,或許會真的uodate它。     確保定義的行為在這些場景中,應該利用accumulator。在Spark中,Accumulator提供一種安全的更新變數的方式,擋在叢集中執行時。     通常情況下,在利用loops或local結構定義方法時,閉包不應該改變全域性變數的值。Spark,沒有定義或者保證在閉包之外修改變數的行為。有一些程式碼,活血在local模式下可以執行,但是那只是意外,這些低嗎在分散式的情況下不會跟期待的那樣執行。     5.5 輸出RDD的元素     輸出RDD中的元素:rdd.foreach(println)orrdd.map(println).,在單機模式下,這有可能會會按照所期待的那樣輸出。但是在叢集下,並不行。     輸出driver中的所有元素利用collect(),可能會導致記憶體溢位,因為collect()會將所有的Rdd放到單機上。如果只是想輸出RDD的少數部分元素,一種安全的方式是利用take()。