
Hands-On Bidirectional Integration of the Python Stack and Spark for Cross Data Analysis

Copyright notice: this technical column is a summary and distillation of the author's (Qin Kaixin, 秦凱新) day-to-day work. The cases are drawn from real business environments and are shared together with tuning advice for commercial applications and cluster capacity planning; please keep following this blog. For any academic exchange, feel free to get in touch.

  • Python Spark DataFrame basics
df = spark.read.parquet('/sql/users.parquet')
 df.show()
 
 +------+--------------+----------------+
 |  name|favorite_color|favorite_numbers|
 +------+--------------+----------------+
 |Alyssa|          null|  [3, 9, 15, 20]|
 |   Ben|           red|              []|
 +------+--------------+----------------+
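To round out the basics, a DataFrame can be written back out as Parquet as well. A minimal sketch, assuming an output path of '/sql/users_out.parquet':

df.write.mode('overwrite').parquet('/sql/users_out.parquet')  # overwrite if the path already exists
 spark.read.parquet('/sql/users_out.parquet').show()           # read it back to verify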
  • Python Spark DataFrame aggregation statistics
CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
 0001,Male,19,15,39
 0002,Male,21,15,81
 0003,Female,20,16,6
 0004,Female,23,16,77
 0005,Female,31,17,40
 0006,Female,22,17,76
 
 df = spark.read.csv('/sql/customers.csv',header=True)
 df.printSchema()
 df.show()
 
 root
 |-- CustomerID: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Annual Income (k$): string (nullable = true)
 |-- Spending Score (1-100): string (nullable = true)
 
 +----------+------+---+------------------+----------------------+
 |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
 +----------+------+---+------------------+----------------------+
 | 0001| Male| 19| 15| 39|
 | 0002| Male| 21| 15| 81|
 | 0003|Female| 20| 16| 6|
 | 0004|Female| 23| 16| 77|
 | 0005|Female| 31| 17| 40|
 | 0006|Female| 22| 17| 76|
 | 0007|Female| 35| 18| 6|
 | 0008|Female| 23| 18| 94|
 | 0009| Male| 64| 19| 3|
 | 0010|Female| 30| 19| 72|
 | 0011| Male| 67| 19| 14|
 | 0012|Female| 35| 19| 99|
 | 0013|Female| 58| 20| 15|
 | 0014|Female| 24| 20| 77|
 | 0015| Male| 37| 20| 13|
 | 0016| Male| 22| 20| 79|
 | 0017|Female| 35| 21| 35|
 | 0018| Male| 20| 21| 66|
 | 0019| Male| 52| 23| 29|
 | 0020|Female| 35| 23| 98|
 +----------+------+---+------------------+----------------------+
 
 df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()
 
 +---------------------------+-----------------------+--------+
 |avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
 +---------------------------+-----------------------+--------+
 |                       50.2|                  60.56|      70|
 +---------------------------+-----------------------+--------+
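Besides aggregating over the whole DataFrame, the same statistics are often wanted per group. A minimal sketch of a grouped aggregation over the customers data (column names as above):

from pyspark.sql import functions as F
 df.groupBy('Genre').agg(
     F.avg('Spending Score (1-100)').alias('avg_score'),   # average spending score per Genre
     F.avg('Annual Income (k$)').alias('avg_income'),      # average income per Genre
     F.count('CustomerID').alias('customers')).show()      # group size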
  • alias(alias) defines an alias for the DataFrame, which later operations can use to refer to it, for example in a self-join:
from pyspark.sql.functions import col   # col() is needed to reference alias-qualified columns
 df1 = df.alias('cus1')
 type(df1)
 df2 = df.alias('cus2')
 df3 = df1.join(df2, col('cus1.CustomerID') == col('cus2.CustomerID'), 'inner')
 df3.count()
 df3.show(10)
 
 200
 
 +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
 |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
 +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
 | 0001| Male| 19| 15| 39| 0001| Male| 19| 15| 39|
 | 0002| Male| 21| 15| 81| 0002| Male| 21| 15| 81|
 | 0003|Female| 20| 16| 6| 0003|Female| 20| 16| 6|
 | 0004|Female| 23| 16| 77| 0004|Female| 23| 16| 77|
 | 0005|Female| 31| 17| 40| 0005|Female| 31| 17| 40|
 | 0006|Female| 22| 17| 76| 0006|Female| 22| 17| 76|
 | 0007|Female| 35| 18| 6| 0007|Female| 35| 18| 6|
 | 0008|Female| 23| 18| 94| 0008|Female| 23| 18| 94|
 | 0009| Male| 64| 19| 3| 0009| Male| 64| 19| 3|
 | 0010|Female| 30| 19| 72| 0010|Female| 30| 19| 72|
 +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
 only showing top 10 rows
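Once the aliases are in place, columns from either side of the self-join can be referenced by prefix, which is the point of alias(). A minimal sketch:

from pyspark.sql.functions import col
 df3.select(col('cus1.CustomerID'), col('cus1.Age'), col('cus2.Age')).show(5)  # pick columns from each side by alias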
  • cache() caches the DataFrame at the level given by StorageLevel; the default is MEMORY_AND_DISK (a persist()/unpersist() sketch follows the example below).
df = spark.read.csv('/sql/customers.csv',header=True)
 a = df.cache()
 a.show()
 
 +----------+------+---+------------------+----------------------+
 |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
 +----------+------+---+------------------+----------------------+
 | 0001| Male| 19| 15| 39|
 | 0002| Male| 21| 15| 81|
 | 0003|Female| 20| 16| 6|
 | 0004|Female| 23| 16| 77|
 | 0005|Female| 31| 17| 40|
 | 0006|Female| 22| 17| 76|
 | 0007|Female| 35| 18| 6|
 | 0008|Female| 23| 18| 94|
 | 0009| Male| 64| 19| 3|
 | 0010|Female| 30| 19| 72|
 | 0011| Male| 67| 19| 14|
 | 0012|Female| 35| 19| 99|
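cache() is shorthand for persist() with the default storage level; an explicit level can be chosen and the cached data released with unpersist(). A minimal sketch:

from pyspark import StorageLevel
 df.persist(StorageLevel.MEMORY_ONLY)  # keep the data in executor memory only
 df.count()                            # an action materializes the cache
 df.unpersist()                        # release the cached data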
  • checkpoint(eager=True) checkpoints the DataFrame. Introduced in Spark 2.1, this call truncates the logical plan of the DataFrame and persists the data to checkpoint files, cutting the lineage dependencies that came before.
sc
 sc.setCheckpointDir('/datas/checkpoint')   # checkpoint files are written under this directory
 a = a.checkpoint()                         # checkpoint() returns the checkpointed DataFrame
 a.show()
  • coalesce(numPartitions) repartitions the DataFrame; the argument is the target number of partitions.
Note that a DataFrame created by reading files with the read methods defaults to one partition per
 input file; when the current partition count is lower than the number passed to coalesce, the call
 has no effect, because coalesce can only reduce the number of partitions.
 
 df = spark.read.csv('/sql/customers.csv',header=True)
 df.rdd.getNumPartitions()
 1
 
 spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
 1
 
 df = spark.range(0,20,2,3)
 df.rdd.getNumPartitions()
 df.coalesce(2).rdd.getNumPartitions()
 2
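A common use of coalesce is collapsing the result to a single partition before writing, so that the output directory contains a single data file. A sketch, assuming an output path of '/sql/customers_out':

df = spark.read.csv('/sql/customers.csv',header=True)
 df.coalesce(1).write.mode('overwrite').csv('/sql/customers_out',header=True)  # one partition -> one output file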
  • repartition(numPartitions, *cols), like coalesce(numPartitions), repartitions the DataFrame, but repartition hash-partitions the rows and shuffles them across the whole cluster, so it is more expensive. Besides the number of partitions, repartition can also be given the columns to partition by (a columns-only variant is sketched after the example below).
df = spark.read.csv('/sql/customers.csv',header=True)
 df.rdd.getNumPartitions()
 1
 
 df2 = df.repartition(3)
 df2.rdd.getNumPartitions()
 3
 
 df2.columns
 df3 = df2.repartition(6,'Genre')
 df3.show(20)
 
 +----------+------+---+------------------+----------------------+
 |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
 +----------+------+---+------------------+----------------------+
 | 0003|Female| 20| 16| 6|
 | 0004|Female| 23| 16| 77|
 | 0005|Female| 31| 17| 40|
 | 0006|Female| 22| 17| 76|
 | 0007|Female| 35| 18| 6|
 | 0008|Female| 23| 18| 94|
 | 0010|Female| 30| 19| 72|
 | 0012|Female| 35| 19| 99|
 | 0013|Female| 58| 20| 15|
 
 df3.rdd.getNumPartitions()
 6
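repartition can also be called with column names only; the number of partitions then falls back to spark.sql.shuffle.partitions, which defaults to 200. A minimal sketch:

df4 = df2.repartition('Genre')        # partition by column, no explicit count
 df4.rdd.getNumPartitions()            # spark.sql.shuffle.partitions (200 by default)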
  • colRegex(colName) returns the columns whose names match the given regular expression.
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
 df.select(df.colRegex("`(Col1)?+.+`")).show()
 +---+
 |  a|
 +---+
 |  1|
 |  2|
 |  3|
 +---+
  • collect() returns all the data in the DataFrame to the driver; beware that with large data sets this can easily cause an out-of-memory error on the driver node (a bounded alternative is sketched after the example below).
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
 df.collect()
 [Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]
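When only a sample of rows is needed, take(n) or limit(n) keeps the amount of data pulled to the driver bounded. A minimal sketch:

df.take(2)              # brings only the first 2 rows to the driver
 df.limit(2).collect()   # the same idea as a transformation followed by collect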
  • columns returns all the column names of the DataFrame as a list.
df = spark.read.csv('/sql/customers.csv',header=True)
 df.columns
 ['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
  • Converting a Spark SQL DataFrame to a pandas DataFrame (a variant that infers column types at read time is sketched after the example).
df = spark.read.csv('/sql/customers.csv',header=True)
 pdf = df.toPandas()
 pdf.info()
 
 <class 'pandas.core.frame.DataFrame'>
 RangeIndex: 200 entries, 0 to 199
 Data columns (total 5 columns):
 CustomerID 200 non-null object
 Genre 200 non-null object
 Age 200 non-null object
 Annual Income (k$) 200 non-null object
 Spending Score (1-100) 200 non-null object
 dtypes: object(5)
 memory usage: 7.9+ KB
 
 pdf['Age'] = pdf['Age'].astype('int')
 pdf["Annual Income (k$)"]=pdf["Annual Income (k$)"].astype('int')
 pdf["Spending Score (1-100)"]=pdf["Spending Score (1-100)"].astype('int')
 pdf.info()
 
 <class 'pandas.core.frame.DataFrame'>
 RangeIndex: 200 entries, 0 to 199
 Data columns (total 5 columns):
 CustomerID 200 non-null object
 Genre 200 non-null object
 Age 200 non-null int64
 Annual Income (k$) 200 non-null int64
 Spending Score (1-100) 200 non-null int64
 dtypes: int64(3), object(2)
 memory usage: 7.9+ KB
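The astype() conversions above are only needed because every CSV column was read as a string; asking Spark to infer the types at read time avoids that step. A minimal sketch (df_typed and pdf_typed are illustrative names):

df_typed = spark.read.csv('/sql/customers.csv',header=True,inferSchema=True)
 df_typed.printSchema()           # Age, income and score columns come back as numeric types
 pdf_typed = df_typed.toPandas()  # the pandas frame then carries numeric dtypes directly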
  • Converting a pandas DataFrame to a Spark SQL DataFrame
df1 = spark.createDataFrame(pdf)
 df1.corr("Age","Annual Income (k$)")
 df1.corr("Spending Score (1-100)","Annual Income (k$)")
 
 0.009902848094037492
  • count() returns the number of Rows in the DataFrame
df = spark.read.csv('/sql/customers.csv',header=True)
 df.count()
 
 200
  • createGlobalTempView(name) creates a global temporary view from the DataFrame. Its lifetime is tied to the Spark application that created it: the view stays accessible as long as the application is running, until SparkContext's stop() method is called. The view is stored in the global_temp database.
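A minimal sketch, assuming the view is named customers; the view is queried through the global_temp database and remains visible to other sessions of the same application:

df = spark.read.csv('/sql/customers.csv',header=True)
 df.createGlobalTempView('customers')
 spark.sql('SELECT Genre, count(*) AS cnt FROM global_temp.customers GROUP BY Genre').show()
 spark.newSession().sql('SELECT count(*) FROM global_temp.customers').show()  # visible from a new session of the same app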