1. 程式人生 > >Spark RDD基礎操作

Spark RDD基礎操作

標題 舉例
解釋 Spark的基本資訊
Spark 1個driver(膝上型電腦或者叢集閘道器機器上)和若干個executor(在各個節點上)組成。通過SparkContext(簡稱sc)連線Spark叢集、建立RDD、累加器(accumlator)、廣播變數(broadcast variables),簡單可以認為SparkContext是Spark程式的根本。
Driver 把計算任務分成一系列小的task,然後送到executor執行。executor之間可以通訊,在每個executor完成自己的task以後,所有的資訊會被傳回。
RDD Resilient Distributed Dataset(彈性分散式資料集,既可以儲存在本地,也可以儲存在叢集上,簡稱RDD).是一個包含諸多元素、被劃分到不同節點上進行並行處理的資料集合。在節點發生錯誤時RDD也可以自動恢復。RDD就像一個NumPy array或者一個Pandas Series,可以視作一個有序的item集合。只不過這些item並不存在driver端的記憶體裡,而是被分割成很多個partitions,每個partition的資料存在叢集的executor的記憶體中。
   
初始化RDD 呼叫庫--設定路徑--初始化檔案為RDD檔案
呼叫Spark庫 from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
#sc = SparkContext('spark://ha-nn-001:7077', 'pyspark')
a.從本地記憶體中構造 1:makeRDD方法
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
2:parallelize方法
val rdd01 = sc.parallelize(List(1,2,3,4,5,6))
b.通過檔案系統構造,即將已有檔案初始化為RDD  
b.1檔案路徑處理 #使用os.path.join()拼接路徑
import os
path=os.path.join('user','home','nave.txt')
#path會自動返回user/home/nave.txt,好處是會根據不同的系統選擇連線符為/  或  \

#直接拼路徑
import os
cwd = os.getcwd()   #記錄當前的python路徑
rdd = sc.textFile("file://" + cwd + "/names/yob1880.txt")  #file:// 是告訴spark到本地去找文件
b.2 初始化本地檔案,初始化後每一行會被看成一個item rdd = sc.textFile("file://" + cwd + "/names/yob1880.txt")  #file:// 是告訴spark到本地去找文件
匯入整個文件,整個文件作為一個item
rdd = sc.wholeTextFiles("file://" + cwd + "/names")
b.2 初始化叢集HDFS上的檔案 rdd = sc.textFile(cwd + "/names/yob1880.txt") 
   
檢視RDD資訊 檢視RDD檔案的內容
rdd :檢視RDD的開啟地址 直接輸入rdd檔名
rdd.first():顯示rdd的第一條item rdd檔名.first()
rdd.count():檢視rdd中的記錄數 rdd檔名.count()
   
transformation:轉化操作 僅僅是對RDD下達操作指令,Spark僅僅會記錄要進行的操作,並不執行操作,直到需要執行action指令時才會執行操作。
rdd.map(func):
對rdd中的每一條item執行func,並返回一個新的rdd檔案
quaresRDD = numbersRDD.map(lambda x: x**2) 
rdd.flatMap():
對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list類似append
sentencesRDD = sc.parallelize(['Hello world', 'My name is Patrick'])
wordsRDD = sentencesRDD.flatMap(lambda sentence: sentence.split(" "))
結果:['Hello', 'world', 'My', 'name', 'is', 'Patrick']
若使用map:
wordsRDD = sentencesRDD.map(lambda sentence: sentence.split(" "))
結果:['Hello', 'world', 'My', 'name', 'is', 'Patrick']
rdd.fiiter(func):
過濾功能,將所有符合函式條件的item組成一個新的list輸出
rddM = (rdd.filter(lambda x: x is not None and x.startswith('M')))
rdd.distinct():
 對RDD中的item去重
rdd檔名.distinct()
rdd.sample(withReplacement, fraction, seed):
取樣函式
withReplacement:這個值如果是true時,採用PoissonSampler抽樣器(Poisson分佈),否則使用BernoulliSampler的抽樣器.
Fraction:一個大於0,小於或等於1的小數值,用於控制要讀取的資料所佔整個資料集的概率.
Seed:這個值如果沒有傳入,預設值是一個0~Long.maxvalue之間的整數.
rdd1.union(rdd2): 
所有rdd1和rdd2中的item組合
numbersRDD.union(moreNumbersRDD).collect()
rdd1.intersection(rdd2):
 rdd1 和 rdd2的交集
numbersRDD.intersection(moreNumbersRDD).collect()
rdd1.substract(rdd2): 
所有在rdd1中但不在rdd2中的item(差集)
numbersRDD.subtract(moreNumbersRDD).collect()
rdd1.cartesian(rdd2): 
rdd1 和 rdd2中所有的元素笛卡爾乘積
numbersRDD.cartesian(moreNumbersRDD).collect()
   
action:執行操作 輸出transformation的執行結果
rdd.collect()
計算所有的items並返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果
rdd.collect()
rdd.first()
和上面是類似的,不過只返回第1item
rdd.first()
rdd.take(n):
 
類似,但是返回nitem
rdd.take(n)
rdd.count()
計算RDDitem的個數
rdd.count()
rdd.top(n):
 
返回頭nitems,按照自然結果排序
rdd.top(n)
rdd.reduce()
RDD中的items做聚合
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y)  
 #實際上是對rdd裡面所有的元素進行求和,reduce 可以設定兩個未知數,並對兩個未知數進行處理
#處理方式,元素1=x,元素2=y,xy計算的結果作為x,元素3作為y,直到對所有的結果執行了操作
   
pair RDDs transformation操作 以元組形式組織的k-v對(key, value),叫做pair RDDs
生成pair Tdd rdd = sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD = (
    rdd
    .flatMap(lambda sentence: sentence.split(" "))  # split into words
    .map(lambda word: word.lower())                 # lowercase
    .map(lambda word: (word, 1))                    # count each appearance
    .reduceByKey(lambda x, y: x + y)                # add counts for each word
    .sortByKey()
)
resultRDD.collect()
reduceByKey()
對所有有著相同keyitems的value執行reduce操作
參照上例
groupByKey()
返回類似(key, listOfValues)元組的RDD,後面的value List 是同一個key下面的
resultRDD.groupByKey().collect()
sortByKey()
按照key排序
參照上例
countByKey():
 
按照key去對item個數進行統計
RDD.countByKey()
collectAsMap():
 
collect有些類似,但是返回的是k-v的字典
 
join:
只合並具有相同鍵值的項,沒有相同的不顯示
homesRDD.join(lifeQualityRDD).collect()