1. 程式人生 > >【總結】PySpark的DataFrame處理方法:增刪改差

【總結】PySpark的DataFrame處理方法:增刪改差

基本操作: 執行時獲取spark版本號(以spark 2.0.0為例):
sparksn = SparkSession.builder.appName("PythonSQL").getOrCreate() print sparksn.version
建立和轉換格式: Pandas和Spark的DataFrame兩者互相轉換:
pandas_df = spark_df.toPandas() spark_df = sqlContext.createDataFrame(pandas_df)
與Spark RDD的相互轉換:
rdd_df = df.rdd df = rdd_df.toDF()
注:rdd轉df前提是每個rdd的型別都是Row型別 增: 新增列:
df.withColumn(“xx”, 0).show() 會報錯,因為原來沒有xx列

from pyspark.sql import functions df = df.withColumn(“xx”, functions.lit(0)).show()
fillna函式:
df.na.fill()
以原有列為基礎新增列:
df = df.withColumn('count20', df["count"] - 20)  # 新列為原有列的資料減去20
刪: 刪除一列:
df.drop('age').collect() df.drop(df.age).collect()
dropna函式:
df = df.na.drop()  # 扔掉任何列包含na的行
df = df.dropna(subset=['col_name1', 'col_name2'])  # 扔掉col1或col2中任一一列包含na的行
改: 修改原有df[“xx”]列的所有值:
df = df.withColumn(“xx”, 1)
修改列的型別(型別投射):
df = df.withColumn("year2", df["year1"].cast("Int"))
合併2個表的join方法:
 df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")
其中,方法可以為:`inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. groupBy方法整合:
GroupedData = df.groupBy(“age”)

應用單個函式(按照A列同名的進行分組,組內對B列進行均值計算來合併):
df.groupBy(“A”).avg(“B”).show() 應用多個函式: from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()
整合後GroupedData型別可用的方法(均返回DataFrame型別): avg(*cols)     ——   計算每組中一列或多列的平均值 count()          ——   計算每組中一共有多少行,返回DataFrame有2列,一列為分組的組名,另一列為行總數 max(*cols)    ——   計算每組中一列或多列的最大值 mean(*cols)  ——  計算每組中一列或多列的平均值 min(*cols)     ——  計算每組中一列或多列的最小值 sum(*cols)    ——   計算每組中一列或多列的總和 【函式應用】將df的每一列應用函式f:
df.foreach(f) 或者 df.rdd.foreach(f)
【函式應用】將df的每一塊應用函式f:
df.foreachPartition(f) 或者 df.rdd.foreachPartition(f)
【Map和Reduce應用】返回型別seqRDDs
df.map(func)
df.reduce(func)
解決toDF()跑出First 100 rows型別無法確定的異常,可以採用將Row內每個元素都統一轉格式,或者判斷格式處理的方法,解決包含None型別時轉換成DataFrame出錯的問題:
    @staticmethod     def map_convert_none_to_str(row):         dict_row = row.asDict()         for key in dict_row:             if key != 'some_column_name':                 value = dict_row[key]                 if value is None:                     value_in = str("")                 else:                     value_in = str(value)                 dict_row[key] = value_in         columns = dict_row.keys()         v = dict_row.values()         row = Row(*columns)         return row(*v)
查: 行元素查詢操作: 像SQL那樣列印列表前20元素(show函式內可用int型別指定要列印的行數):
df.show()
df.show(30)
以樹的形式列印概要
df.printSchema()
獲取頭幾行到本地:
list = df.head(3)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
list = df.take(5)   # Example: [Row(a=1, b=1), Row(a=2, b=2), ... ...]
輸出list型別,list中每個元素是Row類:
list = df.collect()
注:此方法將所有資料全部匯入到本地 查詢總行數:
 int_num = df.count()
查詢某列為null的行:
from pyspark.sql.functions import isnull
df = df.filter(isnull("col_a"))
列元素操作: 獲取Row元素的所有列名:
r = Row(age=11, name='Alice')
print r.__fields__    #  ['age', 'name']
選擇一列或多列:
df.select(“name”) df.select(df[‘name’], df[‘age’]+1) df.select(df.a, df.b, df.c)    # 選擇a、b、c三列 df.select(df["a"], df["b"], df["c"])    # 選擇a、b、c三列
排序:
df = df.sort("age", ascending=False)
過濾資料(filter和where方法相同):
df = df.filter(df['age']>21)
df = df.where(df['age']>21)

# 對null或nan資料進行過濾:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a"))  # 把a列裡面資料為null的篩選出來(代表python的None型別)
df = df.filter(isnan("a"))  # 把a列裡面資料為nan的篩選出來(Not a Number,非數字資料)
SQL操作: DataFrame註冊成SQL的表:
df.createOrReplaceTempView("TBL1")
進行SQL查詢(返回DataFrame):
conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()


df = ss.sql(“SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19″)
時間序列操作: 先按某幾列分組,再按時間段分組:
from pyspark.sql.functions import window

win_monday = window("col1", "1 week", startTime="4 day") GroupedData = df.groupBy([df.col2, df.col3, df.col4, win_monday])
參考資料: 傳統MySQL查詢(執行時間 19 min 16.58 sec):
mysql>  SELECT     MIN(yearD),     MAX(yearD) AS max_year,     Carrier,     COUNT(*) AS cnt,     SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed,     ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate FROM     ontime_part WHERE     DayOfWeek NOT IN (6 , 7)         AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI')         AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI') GROUP BY carrier HAVING cnt > 1000 AND max_year > '1990' ORDER BY rate DESC , cnt DESC LIMIT 10;
使用Scala語言摘寫的Spark查詢(執行時間 2 min 19.628 sec):
scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",                                                    "dbtable" -> "ontime.ontime_sm",                                                         "fetchSize" -> "10000",                                                    "partitionColumn" -> "yeard",                                                    "lowerBound" -> "1988",                                                    "upperBound" -> "2015",                                                    "numPartitions" -> "48")).load() jdbcDF.createOrReplaceTempView("ontime") val sqlDF = sql("SELECT                      MIN(yearD),                      MAX(yearD) AS max_year,                      Carrier,                      COUNT(*) AS cnt,                      SUM(IF(ArrDelayMinutes > 30, 1, 0)) AS flights_delayed,                      ROUND(SUM(IF(ArrDelayMinutes > 30, 1, 0)) / COUNT(*),2) AS rate                  FROM                      ontime_part                  WHERE                      DayOfWeek NOT IN (6 , 7)                          AND OriginState NOT IN ('AK' , 'HI', 'PR', 'VI')                          AND DestState NOT IN ('AK' , 'HI', 'PR', 'VI')                  GROUP BY carrier                  HAVING cnt > 1000 AND max_year > '1990'                  ORDER BY rate DESC , cnt DESC                  LIMIT 10; ") sqlDF.show()
Spark RDD中的map、reduce等操作的概念詳解: map將RDD中的每個元素都經過map內函式處理後返回給原來的RDD,即對每個RDD單獨處理且不影響其它和總量。屬於一對一的關係(這裡一指的是對1個RDD而言)。 flatMap將RDD中的每個元素進行處理,返回一個list,list裡面可以是1個或多個RDD,最終RDD總數會不變或變多。屬於一變多的關係(這裡一指的是對1個RDD而言)。 reduce將RDD中元素前兩個傳給輸入函式,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函式,直到最後只有一個值為止。屬於多變一的關係。 val c = sc.parallelize(1 to 10) c.reduce((x, y) => x + y)//結果55 reduceByKey(binary_function)  reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行binary_function的reduce操作,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。屬於多變少的關係val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6))) a.reduceByKey((x,y) => x + y).collect