1. 程式人生 > >Pyspark 官方介面文件翻譯

Pyspark 官方介面文件翻譯

由於網上關於Pyspark的資料太過於零散,官方文件也沒有中文版,所以只能自己嘗試來翻譯,第一次翻譯文件,肯定會有很多謬誤,希望大家能多評論指正,共同學習spark!

核心內容:

    SparkContext:

             Spark功能主要介面

    RDD

             彈性分散式資料集,是Spark的基礎概念

   streaming.StramingContext:

             Spark Streaming功能的主要接入口

    streaming.DStream:

             離散流,是Spark Streaming中的基本概念

   sql.SQLContext:

             DataFrame和sql功能的主要介面

    sql.DataFrame:

             已分組資料指定列的分散式資料集合

SparkConf

一個Spark應用的配置,用於設定key-value形式的Spark變數

大多數時候,你建立SparkConf時候用 SparkConf()的形式從spark.*Java系統屬性中載入,在這種情況下

任何你直接設定的引數都會優先於系統屬性。

對於單元測試,你可以直接呼叫SparkConf來跳過設定來得到相同的配置而不需要關心繫統屬性。

所有的設定方法都支援鏈式連線,如下:

conf.setMaster(“local”).setAppName(“My app”).

注意:一旦一個SparkConf已經傳遞給Spark,它就被複制且不能再被使用者更改

contains(key)

檢視配置是否有key

get(key, defaultValue=None)

拿到一個已經配置好的key的值,否則返回一個預設值

getAll()

拿到所有配置好的屬性值

set(key, value)

設定屬性

setAll(pairs)

批量設定引數,外部以列表形式,內部是key-value形式

setAppName(value)

設定應用名字

setExecutorEnv(key=None,value=None,pairs=None)

設定環境變數傳送給執行緒池

setIfMissing(key,value)

若不存在則設定配置屬性

setMaster(value)

設定主節點URL去連線

setSparkHome(value)

設定Spark在工作節點上的安裝路徑

setDebugString()

返回並列印配置資訊

測試部分介面如下:

SparkContext

SparkContext 可以連線一個Spark叢集,能夠建立RDD和傳送變數到叢集

包擴充套件 = ('.zip', '.egg', '.jar')

accumulator(value,accum_param=None)

建立一個給定初始變數的累加器,用一個給定的累加器引數幫助物件(AccumulatorParam helper object)來

定義如何新增資料型別值。預設累加器引數只適用於整形和浮點型,如果需要其他型別的可以自己定義

addFile(path, recursive=False)

新增一個檔案供Spark任務的每個節點下載使用,path可以是本地檔案路徑也可以是HDFS中檔案(或者是

其他Hadoop支援的檔案系統),也可以是http,https,ftp 地址。

為了將檔案新增到到Spark任務中,使用檔名來尋找它的下載位置:

L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}

如果引數recusive(遞迴)設定為True,可以使用資料夾,但目前只支援Hadoop類的檔案系統

操作示例如下:

addPyFile(path)

新增要給.py或者.zip檔案給所有在SparkContext中將要執行的任務執行,Path可以是本地路徑,HDFS或者是其他Hadoop支援的檔案系統,http,https,ftp連結。

applicationId

一個Spark應用的唯一認證,格式依賴於排程器實現。

>>> sc.applicationId  
'local-...'

binaryFiles(path,minPartitions=None)

讀取二進位制檔案,可以是所有節點的本地檔案系統,HDFS...每個檔案作為單獨記錄,並返回鍵值對,keys是檔案路徑,

value是每個檔案的內容

注意:推薦是小檔案,大檔案也可以,但是可能會效率較差

binaryRecords(path, recordLength)

從二進位制檔案中讀取資料,假設每個記錄都是一串指定數字格式的數字,每個記錄的數字不變,path是檔案路徑

而recordLength是分割記錄的長度。

broadcast(value)

向叢集群發要給只讀變數,返回一個用於在分散式物件中讀取的物件,這個變數只會給每個叢集發一次

cancelAllJobs()

取消所有已新增的或者正在執行的任務

setJobGroup(groupId,descriptiom,interruptOnCancel=False)

給所有在一個執行緒中任務分配一個groupId,經常會有單個應用的一個執行單元由多個Spark任務組成,應用開發者能用

這個方法來給所有任務分組和新增描述資訊,一旦設定好,Spark UI會將任務與組聯絡起來

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

cancelJobGroup(groupId)

取消組內所有任務執行

defaultMinPartitions

預設HadoopRDDS的最小分塊數

defaultParallelism

parallelism預設的level(如:reduce 任務)

dump_profiles(path)

將概要統計資訊轉儲到目錄路徑中

emptyRDD()

建立一個沒有分割槽和元素的RDD

getConf()

拿到該SparkContext的SparkConf物件

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, 

valueConverter=None, conf=None, batchSize=0)

以HDFS(本地檔案系統)中任意一對key/value讀取一箇舊的Hadoop輸入格式,原理和sc.sequenceFile

相同。

一個Hadoop配置能被以Python字典的形式傳輸,這將轉化為Java中的配置

Path:Hadoop檔案目錄

inputFormatClass:Hadoop輸入格式的全描述類名(eg:“org.apache.hadoop.mapred.TextInputFormat”)

keyClass:key可寫類的全描述類名(e.g. “org.apache.hadoop.io.Text”)

valueClass:value可寫類的全描述類名(e.g. “org.apache.hadoop.io.LongWritable”)

hadoopRDD(inputFormatClasskeyClassvalueClasskeyConverter=NonevalueConverter=None

conf=NonebatchSize=0)

同上類似

parallelize(c, numSlices=None)

分配本地python集合來生成一個RDD,如果輸入型式是一個範圍可以用xrange

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

pickleFile(name,minPartits=None)

載入之前用RDD.saveAsPickleFile方法儲存的RDD

tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

range(start,end=None,step=1,numSlices=None)

看程式碼自然懂

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

runJob(rdd,partitionFunc,partitions=None,allowLocal=False)

在指定的分割槽裡執行partionFunc函式,以元素陣列的形式返回結果,如果沒有指定,就會一直輪詢執行。

如上程式碼,test是分了3個分割槽的RDD,分別是0,2,4,函式中引數制定了只執行第二分割槽,得到結果是4

文件程式碼如下;

myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, 

minSplits=None, batchSize=0)

...

setLocalProperty(key,value)

設定一個影響從該執行緒提交的作業的本地屬性,例如Spark fair排程器池

setLogLevel(logLevel)

控制日誌級別,級別包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

sparkUser()

獲取當前在執行SparkContext的系統使用者名稱

startTime

返回SparkContext的啟動時間

stop()

中斷停止SparkContext

textFile(name, minPartitions=None, use_unicode=True)

從HDFS,本地檔案系統(所有節點可用),返回一個字串RDD,如果use_unicode是False,字串將是 str型別(utf-8編碼)。

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

union(rdds)

建立RDD列表的合併集,程式碼如下:

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']

wholeTextFiles(dirPath)

每個檔案都被讀為單獨一份記錄,返回一個鍵值對,key是檔案目錄,value是檔案內容

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]

pyspark.RDD

aggreate(zeroValue, seqOp, combOp)

aggregate先對每個分割槽的元素做聚集,然後對所有分割槽的結果做聚集,聚集過程中,使用的是給定的聚集函式以及初始值”zero value”。這個函式能返回一個與原始RDD不同的型別U,因此,需要一個合併RDD型別T到結果型別U的函式,還需要一個合併型別U的函式。這兩個函式都可以修改和返回他們的第一個引數,而不是重新新建一個U型別的引數以避免重新分配記憶體。 
引數zeroValue:seqOp運算子的每個分割槽的累積結果的初始值以及combOp運算子的不同分割槽的組合結果的初始值 - 這通常將是初始元素(例如“Nil”表的列表 連線或“0”表示求和) 
引數seqOp: 每個分割槽累積結果的聚集函式。 
引數combOp: 一個關聯運算子用於組合不同分割槽的結果

>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)
x是(0,0),y是[1,2,3,4]

執行過程:

首先定義要給初始值(0,0)

x[0]+1, x[1]+1

1+2,1+1

3+3,2+1

6+4,3+1

不斷地將第一個計算結果當作第二個的zeroValue(類似reduce的操作形式)

如果分割槽了就各自計算,然後再相加

checkpoint()

標記這個RDD作為檢查標記點, 儲存在

這個函式必須先在這個RDD執行任何任務之前先被喚起,強烈建議存在記憶體中不然存在檔案中會要求重新計算。

coalesce(numPartitions, shuffle=False)

>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]

collect()

返回一個包含RDD所有元素的列表

collectAdMap()

返回一個key-value對。

>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2
>>> m[3]
4

上兩個方法都是隻能建立在目標資料量不大的情況下,因為資料都會被寫入記憶體。