1. 程式人生 > >PySpark中RDD與DataFrame

PySpark中RDD與DataFrame

1. 彈性資料集RDD

RDD是一個抽象的分散式資料集合,它提供了一系列轉化操作(例如基本的map()flatMap()filter(),類集合操作union()intersection()subtract())和行動操作(例如collect()count()take()top()reduce()foreach())。可以說,RDD是非常靈活的資料集合,其中可以存放型別相同或者互異的資料,同時可以指定任何自己期望的函式對其中的資料進行處理。
建立一個RDD:

# 從list中建立
rdd = sc.parallelize([1, '2', (3, 4), ['5', '6'
]]) # 從檔案中讀取 rdd = sc.textFile('\path\to\file')

還有一類RDD是key-value Pair RDD,即規定RDD每個元素都是一個二元組,其中第一個值是key,第二個值為value,key一般選取RDD中每個元素的一個欄位。
建立一個Pair RDD:

# 建立一個普通RDD
rdd = sc.parallelize([('a', 1, 2), ('b', 3, 4), ('c', 5, 6)])
# 提取每個元素的第一個元素作為key剩餘元素作為value建立Pair RDD
pair_rdd = rdd.map(lambda x: (x[0
], x[1:]))

可以看到Pair RDD實質上仍然是一個普通的RDD,那為什麼它要單獨拿出來講呢?
這是因為,Pair RDD由於有key的存在,與普通的RDD相比更加格式化,這種特性就會給Pair RDD賦予一些特殊的操作,例如groupByKey()可以將具有相同key進行分組,其結果仍然得到Pair RDD,然後利用mapValues()對相同key的value進行函式計算;reduceByKey()countByKey()sortByKey()等一系列“ByKey()”操作同理。
另外,兩個Pair RDD具有像SQL一樣的連線操作,例如兩個Pair RDD進行join()

後,具有相同key的元素的value會被放在一個元組裡,key不相同的元素會被捨棄。leftOuterJoin()rightOuterJoin()fullOuterJoin()等操作同理。

2. Spark SQL中的DataFrame

Pair RDD已經被一定程度的格式化了,它的每個元素會具有key,但是value仍然具有很大的靈活性。DataFrame是一種完全格式化的資料集合,和資料庫中的的概念比較接近,它每列資料必須具有相同的資料型別。也正是由於DataFrame知道資料集合所有的型別資訊,DataFrame可以進行列處理優化而獲得比RDD更優的效能。
在內部實現上,DataFrame是由Row物件為元素組成的集合,每個Row物件儲存DataFrame的一行,Row物件中記錄每個域=>值的對映,因而Row可以被看做是一個結構體型別。可以通過建立多個tuple/listdictRow然後構建DataFrame。
注:用dict構建DataFrame已經廢棄了,推薦用Row

# 建立list的list
lists = [['a', 1], ['b', 2]]
# 構建具有預設生成的列_1、_2的DataFrame
dataframe = spark.createDataFrame(lists)

# 建立dict的list
dicts = [{'col1':'a', 'col2':1}, {'col1':'b', 'col2':2}]
# 構建具有列col1、col2的DataFrame
dataframe = spark.createDataFrame(dicts)

# 建立Row的list
rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
# 構建具有列col1、col2的DataFrame
dataframe = spark.createDataFrame(rows)

雖然DataFrame被完全格式化了,但是其中每列可以儲存的型別仍然是非常豐富的,包括基本的資料型別、list、tuple、dict和Row,這也就意味著所有的複雜資料型別都可以相互巢狀,從而解除了完全格式化的限制。例如,你可以在一列中儲存list型別,而每行list按需儲存不定長的資料。
那麼,RDD和DataFrame還有哪些使用上的區別呢?

  • RDD:沒有列名稱,只能使用數字來索引;具有map()reduce()等方法並可指定任意函式進行計算;
  • DataFrame:一定有列名稱(即使是預設生成的),可以通過.col_name或者['col_name']來索引列;具有表的相關操作(例如select()filter()where()join),但是沒有map()reduce()等方法。

3. RDD轉換為DataFrame

什麼樣的RDD可以轉換為DataFrame?
RDD靈活性很大,並不是所有RDD都能轉換為DataFrame,而那些每個元素具有一定相似格式的時候才可以。

為什麼RDD需要轉換為DataFrame?
當RDD進行類似的相應操作時,都需要指定相應的函式,轉換為DataFrame書寫更簡單,並且執行效率高。

怎麼樣將RDD轉換為DataFrame?
就像之前的例子一樣,可以利用

dataframe = spark.createDataFrame(rdd, schema=None, samplingRatio=None)

來將RDD轉換為DataFrame,其中的引數設定需要注意:
schema:DataFrame各列型別資訊,在提前知道RDD所有型別資訊時設定。例如

schema = StructType([StructField('col1', StringType()),
         StructField('col2', IntegerType())])

samplingRatio:推測各列型別資訊的取樣比例,在未知RDD所有型別資訊時,spark需要根據一定的資料量進行型別推測;預設情況下,spark會抽取前100的RDD進行推測,之後在真正將RDD轉換為DataFrame時如果遇到型別資訊不符會報錯 Some of types cannot be determined by the first 100 rows, please try again with sampling 。同理取樣比例較低,推測型別資訊也可能錯誤。

4. DataFrame轉換為RDD

有時候DataFrame的相關操作不能處理一些問題,例如需要對一些資料利用指定的函式進行計算時,就需要將DataFrame轉換為RDD。DataFrame可以直接利用.rdd獲取對應的RDD物件,此RDD物件的每個元素使用Row物件來表示,每列值會成為Row物件的一個域=>值對映。例如

dataframe = spark.createDataFrame([Row(col1='a', col2=1), Row(col1='b', col2=2)])
>>> 
+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   b|   2|
+----+----+

rdd = dataframe.rdd
>>> [Row(col1=u'a', col2=1), Row(col1=u'b', col2=2)]

DataFrame轉化後的RDD如果需要和一般形式的RDD進行操作(例如join),還需要做索引將數值從Row中取出,比如轉化為Pair RDD可以這樣操作

rdd = rdd.map(lambda x: [x[0], x[1:]])
>>> [[u'a', (1,)], [u'b', (2,)]]

注意:DataFrame轉化的RDD可能包含Row(col1='a'),它和'a'是不同的物件,所以如果與一般的RDD進行join,還需要索引Row取出數值。