1. 程式人生 > >spark dataframe操作集錦(提取前幾行,合併,入庫等)

spark dataframe操作集錦(提取前幾行,合併,入庫等)

spark dataframe派生於RDD類,但是提供了非常強大的資料操作功能。當然主要對類SQL的支援。

在實際工作中會遇到這樣的情況,主要是會進行兩個資料集的篩選、合併,重新入庫。

首先載入資料集,然後在提取資料集的前幾行過程中,才找到limit的函式。

而合併就用到union函式,重新入庫,就是registerTemple註冊成表,再進行寫入到HIVE中。

不得不讚嘆dataframe的強大。

具體示例:為了得到樣本均衡的訓練集,需要對兩個資料集中各取相同的訓練樣本數目來組成,因此用到了這個功能。

scala> val fes = hiveContext.sql(sqlss)
fes: org.apache.spark.sql.DataFrame = [caller_num: string, is_sr: int, call_count: int, avg_talk_time: double, max_talk_time: int, min_talk_time: int, called_num_count: int, called_lsd: double, null_called_count: int]


scala> val fcount = fes.count()
fcount: Long = 4371029

scala> val zcfea = hiveContext.sql(sqls2)
zcfea: org.apache.spark.sql.DataFrame = [caller_num: string, is_sr: int, call_count: int, avg_talk_time: double, max_talk_time: int, min_talk_time: int, called_num_count: int, called_lsd: double, null_called_count: int]


scala> val zcount = zcfea.count()
zcount: Long = 14208117


scala> val f01 = fes.limit(25000)
f01: org.apache.spark.sql.DataFrame = [caller_num: string, is_sr: int, call_count: int, avg_talk_time: double, max_talk_time: int, min_talk_time: int, called_num_count: int, called_lsd: double, null_called_count: int]


scala> val f02 = zcfea.limit(25000)
f02: org.apache.spark.sql.DataFrame = [caller_num: string, is_sr: int, call_count: int, avg_talk_time: double, max_talk_time: int, min_talk_time: int, called_num_count: int, called_lsd: double, null_called_count: int]


scala> val ff=f01.unionAll(f02)
ff: org.apache.spark.sql.DataFrame = [caller_num: string, is_sr: int, call_count: int, avg_talk_time: double, max_talk_time: int, min_talk_time: int, called_num_count: int, called_lsd: double, null_called_count: int]


scala> ff.registerTempTable("ftable01")


scala> hiveContext.sql("create table shtrainfeature as select * from ftable01")
res1: org.apache.spark.sql.DataFrame = []

最後附上dataframe的一些操作及用法:

DataFrame 的函式
Action 操作
1、 collect() ,返回值是一個數組,返回dataframe集合所有的行
2、 collectAsList() 返回值是一個java型別的陣列,返回dataframe集合所有的行
3、 count() 返回一個number型別的,返回dataframe集合的行數
4、 describe(cols: String*) 返回一個通過數學計算的類表值(count, mean, stddev, min, and max),這個可以傳多個引數,中間用逗號分隔,如果有欄位為空,那麼不參與運算,只這對數值型別的欄位。例如df.describe("age", "height").show()
5、 first() 返回第一行 ,型別是row型別
6、 head() 返回第一行 ,型別是row型別
7、 head(n:Int)返回n行  ,型別是row 型別
8、 show()返回dataframe集合的值 預設是20行,返回型別是unit
9、 show(n:Int)返回n行,,返回值型別是unit
10、 table(n:Int) 返回n行  ,型別是row 型別

dataframe的基本操作
1、 cache()同步資料的記憶體
2、 columns 返回一個string型別的陣列,返回值是所有列的名字
3、 dtypes返回一個string型別的二維陣列,返回值是所有列的名字以及型別
4、 explan()列印執行計劃  物理的
5、 explain(n:Boolean) 輸入值為 false 或者true ,返回值是unit  預設是false ,如果輸入true 將會列印 邏輯的和物理的
6、 isLocal 返回值是Boolean型別,如果允許模式是local返回true 否則返回false
7、 persist(newlevel:StorageLevel) 返回一個dataframe.this.type 輸入儲存模型型別
8、 printSchema() 打印出欄位名稱和型別 按照樹狀結構來列印
9、 registerTempTable(tablename:String) 返回Unit ,將df的物件只放在一張表裡面,這個表隨著物件的刪除而刪除了
10、 schema 返回structType 型別,將欄位名稱和型別按照結構體型別返回
11、 toDF()返回一個新的dataframe型別的
12、 toDF(colnames:String*)將引數中的幾個欄位返回一個新的dataframe型別的,
13、 unpersist() 返回dataframe.this.type 型別,去除模式中的資料
14、 unpersist(blocking:Boolean)返回dataframe.this.type型別 true 和unpersist是一樣的作用false 是去除RDD

整合查詢:
1、 agg(expers:column*) 返回dataframe型別 ,同數學計算求值
df.agg(max("age"), avg("salary"))
df.groupBy().agg(max("age"), avg("salary"))
2、 agg(exprs: Map[String, String])  返回dataframe型別 ,同數學計算求值 map型別的
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe型別 ,同數學計算求值
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
4、 apply(colName: String) 返回column型別,捕獲輸入進去列的物件
5、 as(alias: String) 返回一個新的dataframe型別,就是原來的一個別名
6、 col(colName: String)  返回column型別,捕獲輸入進去列的物件
7、 cube(col1: String, cols: String*) 返回一個GroupedData型別,根據某些欄位來彙總
8、 distinct 去重 返回一個dataframe型別
9、 drop(col: Column) 刪除某列 返回dataframe型別
10、 dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個dataframe
11、 except(other: DataFrame) 返回一個dataframe,返回在當前集合存在的在其他集合不存在的
12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe型別,這個 將一個欄位進行更多行的拆分
df.explode("name","names") {name :String=> name.split(" ")}.show();
將name欄位根據空格來拆分,拆分的欄位放在names裡面
13、 filter(conditionExpr: String): 刷選部分資料,返回dataframe型別 df.filter("age>10").show();  df.filter(df("age")>10).show();   df.where(df("age")>10).show(); 都可以
14、 groupBy(col1: String, cols: String*) 根據某寫欄位來彙總返回groupedate型別   df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以
15、 intersect(other: DataFrame) 返回一個dataframe,在2個dataframe都存在的元素
16、 join(right: DataFrame, joinExprs: Column, joinType: String)
一個是關聯的dataframe,第二個關聯的條件,第三個關聯的型別:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
17、 limit(n: Int) 返回dataframe型別  去n 條資料出來
18、 na: DataFrameNaFunctions ,可以呼叫dataframenafunctions的功能區做過濾 df.na.drop().show(); 刪除為空的行
19、 orderBy(sortExprs: Column*) 做alise排序
20、 select(cols:string*) dataframe 做欄位的刷選 df.select($"colA", $"colB" + 1)
21、 selectExpr(exprs: String*) 做欄位的刷選 df.selectExpr("name","name as names","upper(name)","age+1").show();
22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 預設是asc
23、 unionAll(other:Dataframe) 合併 df.unionAll(ds).show();
24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
25、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();