1. 程式人生 > >Spark DataFrame 新增索引列的三種方法

Spark DataFrame 新增索引列的三種方法

Spark DataFrame 新增索引列的三種方法

剛開始用Spark,操作dataframe不是很熟練,遇到的第一個問題是給dataframe新增索引列,查閱了網上的一些教程,大都是用Scala語言編寫的程式碼,下面給出自己用python寫的三種方法。

  • 方法一:先建立Pandas版本的dataframe,然後帶索引儲存為本地檔案,再用SparkSession來建立DataFrame。
  • 方法二:先建立Pandas版本的dataframe,新增一個索引列後,再直接轉換成Spark版本的dataframe。
  • 方法三:直接利用Spark的select方法,新增列的過程包含在自定義的udf函式中。

方法一

1.先建立一個Pandas版本的dataframe,從本地csv檔案匯入資料。

import pandas as pd
from pyspark.sql import *
# build a dataframe
pandas_df = pd.read_csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data.csv')
print(pandas_df)

顯示結果如下
在這裡插入圖片描述
2.然後新增索引列,從1開始編號,並儲存為本地檔案。

pandas_df.index = range(1, len(pandas_df)
+ 1) # 索引值設定為從1開始 print(pandas_df) pandas_df.to_csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data01.csv', index=True, index_label='index')

顯示結果
在這裡插入圖片描述
3.再用SparkSession來建立DataFrame。

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data01.csv'
, header=True) df.show()

結果如下
在這裡插入圖片描述


方法二

1.第一步同方法一。
2.先新增一個索引列,insert方法可以設定列的插入位置。

pandas_df.index = range(1, len(pandas_df) + 1)  # set index from 1
pandas_index = pandas_df.index    # 將dataframe的索引賦給一個變數
pandas_df.insert(0, 'index', pandas_index)   # 第一個引數是列插入的位置
print(pandas_df)

結果如下
在這裡插入圖片描述
3. 再直接轉換成Spark版本的dataframe

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

結果如下
在這裡插入圖片描述


方法三

1.用SparkSession來建立dataframe,從本地直接讀取資料。

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
spark_df = spark.read.csv('/home/hadoop/PycharmProjects/wjw/ftdd/ftdd/data/net_data.csv', header=True)
spark_df.show()

結果如下
在這裡插入圖片描述
2. 定義自己的udf函式,來建立索引列。

index_list = [x for x in range(1, spark_df.count()+1)]  # 構造一個列表儲存索引值,用生成器會出錯
idx = 0
# 定義一個函式
def set_index(x):  
    global idx    # 將idx設定為全域性變數
    if x is not None:
        idx += 1
        return index_list[idx-1]

3.呼叫select方法來新增索引列。

index = udf(set_index, IntegerType())    # udf的註冊,這裡需要定義其返回值型別
spark_df.select(col("*"), index("cab_id").alias("index")).show()   # udf的註冊的使用,alias方法用於修改列名

結果如下
在這裡插入圖片描述