1. 程式人生 > >pyspark dataframe列的合併與拆分

pyspark dataframe列的合併與拆分

使用Spark SQL在對資料進行處理的過程中,可能會遇到對一列資料拆分為多列,或者把多列資料合併為一列。這裡記錄一下目前想到的對DataFrame列資料進行合併和拆分的幾種方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("dataframe_split") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext
df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True)
df.show(3)

原始資料如下所示

  • dataframe列資料的分割

from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))
df_split.show()

 

  • dataframe列資料的拆分

zipWithIndex:給每個元素生成一個索引 

排序首先基於分割槽索引,然後是每個分割槽內的專案順序.因此,第一個分割槽中的第一個item索引為0,最後一個分割槽中的最後一個item的索引最大.當RDD包含多個分割槽時此方法需要觸發spark作業.

first_row = df.first()
numAttrs = len(first_row['score'].split(" "))
print("新增列的個數", numAttrs)
attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
print("列名:", attrs)
for name, index in attrs:
    df_split = df_split.withColumn(name, df_split['s'].getItem(index))
df_split.show()

 

  • dataframe將一行分成多行

df_explode = df.withColumn("e", explode(split(df['score'], " ")))
df_explode.show()

 

  • dataframe列資料的合併

列的合併有兩個函式:一個不新增分隔符concat(),一個新增分隔符concat_ws()

concat

df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_concat.show()

 

caoncat_ws

df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_ws.show()

  • dataframe多行轉多列

pivot: 旋轉當前[[dataframe]]列並執行指定的聚合 

#DataFrame 資料格式:每個使用者對每部電影的評分 userID 使用者ID,movieID 電影ID,rating評分
df=spark.sparkContext.parallelize([[15,399,2], \
                                   [15,1401,5], \
                                   [15,1608,4], \
                                   [15,20,4], \
                                   [18,100,3], \
                                   [18,1401,3], \
                                   [18,399,1]])\
                    .toDF(["userID","movieID","rating"])
#pivot 多行轉多列
resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
#結果
resultDF.show()

參考文獻:

Spark DataFrame 列的合併與拆分

Spark DataFrame 多行轉多列