Hands-On Integration of the Python Stack with Spark for Cross Data Analysis -- Big Data ML Sample-Set Case Study
Copyright notice: This column distills the author's (Qin Kaixin) day-to-day work, drawing its cases from real production environments and sharing tuning advice and cluster capacity-planning guidance for commercial deployments. Please keep following this blog series. QQ email: [email protected]; feel free to get in touch for any academic exchange.
-
Reading a Parquet file into a DataFrame:

```python
df = spark.read.parquet('/sql/users.parquet')
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
```
-
Python Spark DataFrame Aggregation Statistics
The first lines of customers.csv:

```
CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
0001,Male,19,15,39
0002,Male,21,15,81
0003,Female,20,16,6
0004,Female,23,16,77
0005,Female,31,17,40
0006,Female,22,17,76
```

```python
df = spark.read.csv('/sql/customers.csv', header=True)
df.printSchema()
df.show()

root
 |-- CustomerID: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Annual Income (k$): string (nullable = true)
 |-- Spending Score (1-100): string (nullable = true)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|      0001|  Male| 19|                15|                    39|
|      0002|  Male| 21|                15|                    81|
|      0003|Female| 20|                16|                     6|
|      0004|Female| 23|                16|                    77|
|      0005|Female| 31|                17|                    40|
|      0006|Female| 22|                17|                    76|
|      0007|Female| 35|                18|                     6|
|      0008|Female| 23|                18|                    94|
|      0009|  Male| 64|                19|                     3|
|      0010|Female| 30|                19|                    72|
|      0011|  Male| 67|                19|                    14|
|      0012|Female| 35|                19|                    99|
|      0013|Female| 58|                20|                    15|
|      0014|Female| 24|                20|                    77|
|      0015|  Male| 37|                20|                    13|
|      0016|  Male| 22|                20|                    79|
|      0017|Female| 35|                21|                    35|
|      0018|  Male| 20|                21|                    66|
|      0019|  Male| 52|                23|                    29|
|      0020|Female| 35|                23|                    98|
+----------+------+---+------------------+----------------------+
only showing top 20 rows

df.agg({"Age": "max", "Annual Income (k$)": "mean", "Spending Score (1-100)": "mean"}).show()

+---------------------------+-----------------------+--------+
|avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
+---------------------------+-----------------------+--------+
|                       50.2|                  60.56|      70|
+---------------------------+-----------------------+--------+
```
-
alias(alias) assigns an alias to a DataFrame; the alias can then be used in later operations, for example a self-join:
```python
from pyspark.sql.functions import col

df1 = df.alias('cus1')
type(df1)
df2 = df.alias('cus2')
df3 = df1.join(df2, col('cus1.CustomerId') == col('cus2.CustomerId'), 'inner')
df3.count()

200

df3.show(10)

+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
|      0001|  Male| 19|                15|                    39|      0001|  Male| 19|                15|                    39|
|      0002|  Male| 21|                15|                    81|      0002|  Male| 21|                15|                    81|
|      0003|Female| 20|                16|                     6|      0003|Female| 20|                16|                     6|
|      0004|Female| 23|                16|                    77|      0004|Female| 23|                16|                    77|
|      0005|Female| 31|                17|                    40|      0005|Female| 31|                17|                    40|
|      0006|Female| 22|                17|                    76|      0006|Female| 22|                17|                    76|
|      0007|Female| 35|                18|                     6|      0007|Female| 35|                18|                     6|
|      0008|Female| 23|                18|                    94|      0008|Female| 23|                18|                    94|
|      0009|  Male| 64|                19|                     3|      0009|  Male| 64|                19|                     3|
|      0010|Female| 30|                19|                    72|      0010|Female| 30|                19|                    72|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
only showing top 10 rows
```
-
cache() caches the DataFrame at the storage level given by StorageLevel; the default is MEMORY_AND_DISK.
```python
df = spark.read.csv('/sql/customers.csv', header=True)
a = df.cache()
a.show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|      0001|  Male| 19|                15|                    39|
|      0002|  Male| 21|                15|                    81|
|      0003|Female| 20|                16|                     6|
|      0004|Female| 23|                16|                    77|
|      0005|Female| 31|                17|                    40|
|      0006|Female| 22|                17|                    76|
|      0007|Female| 35|                18|                     6|
|      0008|Female| 23|                18|                    94|
|      0009|  Male| 64|                19|                     3|
|      0010|Female| 30|                19|                    72|
|      0011|  Male| 67|                19|                    14|
|      0012|Female| 35|                19|                    99|
+----------+------+---+------------------+----------------------+
only showing top 20 rows
```
-
checkpoint(eager=True) sets a checkpoint on the DataFrame. Introduced in Spark 2.1, this method truncates the DataFrame's logical execution plan, persisting its state to checkpoint files so that downstream computations no longer depend on the upstream lineage.
```python
sc.setCheckpointDir('/datas/checkpoint')
a.checkpoint()
a.show()
```
-
coalesce(numPartitions) repartitions the DataFrame; the argument is the desired number of partitions.
Note that a DataFrame created via a `read` method defaults to one partition per input file. Because `coalesce` can only reduce the number of partitions, calling it with a number larger than the current partition count has no effect:

```python
df = spark.read.csv('/sql/customers.csv', header=True)
df.rdd.getNumPartitions()
1

spark.read.csv('/sql/customers.csv', header=True).coalesce(3).rdd.getNumPartitions()
1

df = spark.range(0, 20, 2, 3)
df.rdd.getNumPartitions()
3

df.coalesce(2).rdd.getNumPartitions()
2
```
-
repartition(numPartitions, *cols), like coalesce(numPartitions), redistributes the DataFrame's partitions, but repartition hash-partitions the data and shuffles it across the whole cluster, which is more expensive. Besides a partition count, repartition can also be given the columns to partition by.
```python
df = spark.read.csv('/sql/customers.csv', header=True)
df.rdd.getNumPartitions()
1

df2 = df.repartition(3)
df2.rdd.getNumPartitions()
3

df2.columns
df3 = df2.repartition(6, 'Genre')
df3.show(20)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|      0003|Female| 20|                16|                     6|
|      0004|Female| 23|                16|                    77|
|      0005|Female| 31|                17|                    40|
|      0006|Female| 22|                17|                    76|
|      0007|Female| 35|                18|                     6|
|      0008|Female| 23|                18|                    94|
|      0010|Female| 30|                19|                    72|
|      0012|Female| 35|                19|                    99|
|      0013|Female| 58|                20|                    15|

df3.rdd.getNumPartitions()
6
```
-
colRegex(colName) selects the columns whose names match a regular expression.
```python
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
df.select(df.colRegex("`(Col1)?+.+`")).show()

+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+
```
-
collect() returns all of the DataFrame's rows to the driver. Beware: with a large dataset this can easily cause an out-of-memory error on the driver node!
```python
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
df.collect()

[Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]
```
-
columns returns all of the DataFrame's column names as a list.
```python
df = spark.read.csv('/sql/customers.csv', header=True)
df.columns

['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
```
-
Converting a Spark SQL DataFrame to a pandas DataFrame
```python
df = spark.read.csv('/sql/customers.csv', header=True)
pdf = df.toPandas()
pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
CustomerID                200 non-null object
Genre                     200 non-null object
Age                       200 non-null object
Annual Income (k$)        200 non-null object
Spending Score (1-100)    200 non-null object
dtypes: object(5)
memory usage: 7.9+ KB

pdf['Age'] = pdf['Age'].astype('int')
pdf["Annual Income (k$)"] = pdf["Annual Income (k$)"].astype('int')
pdf["Spending Score (1-100)"] = pdf["Spending Score (1-100)"].astype('int')
pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
CustomerID                200 non-null object
Genre                     200 non-null object
Age                       200 non-null int64
Annual Income (k$)        200 non-null int64
Spending Score (1-100)    200 non-null int64
dtypes: int64(3), object(2)
memory usage: 7.9+ KB
```
-
Converting a pandas DataFrame to a Spark SQL DataFrame
```python
df1 = spark.createDataFrame(pdf)
df1.corr("Age", "Annual Income (k$)")
df1.corr("Spending Score (1-100)", "Annual Income (k$)")

0.009902848094037492
```
-
count() returns the number of Rows in the DataFrame.
```python
df = spark.read.csv('/sql/customers.csv', header=True)
df.count()

200
```
-
createGlobalTempView(name) creates a global temporary view from the DataFrame. Its lifetime is tied to the Spark application: the view remains accessible for as long as the application runs, until SparkContext's stop method is called and the application exits. Global temporary views are stored in the global_temp database.