spark學習記錄(十二、Spark UDF&UDAF&開窗函式)
阿新 • • 發佈:2019-01-13
一、UDF&UDAF
public class JavaExample { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("udf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("Sam", "Tom", "Jetty", "Tom", "Jetty")); JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() { public Row call(String s) throws Exception { return RowFactory.create(s); } }); /** * 動態建立Schema方式載入DF */ List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); Dataset<Row> dataFrame = sqlContext.createDataFrame(rowRDD, schema); dataFrame.registerTempTable("user"); /** * 根據UDF函式引數的個數決定是實現哪個UDF UDF1,UDF2... * UDF1表示傳一個引數,StrLen(name) */ sqlContext.udf().register("StrLen", new UDF1<String, Integer>() { public Integer call(String s) throws Exception { return s.length(); } }, DataTypes.IntegerType); sqlContext.sql("select name,StrLen(name) as length from user").show(); /** * 註冊一個UDAF函式,實現統計相同的值得個數 */ sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() { /** * 指定輸入欄位的欄位及型別 */ @Override public StructType inputSchema() { return DataTypes.createStructType( Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true))); } /** * 指定UDAF函式計算後返回的結果型別 */ @Override public DataType dataType() { return DataTypes.IntegerType; } /** * 確保一致性一般用true,用以標記針對給定的一組輸入,UDAF總是生成相同的結果 */ @Override public boolean deterministic() { return true; } /** * 可以認為一個一個的將組內的欄位值傳遞出來顯現拼接的邏輯 * buffer.update(0)獲取的是上一次聚合的值 * 相當於map的combiner,combiner就是對每一個map task的處理結果進行一次小聚合 * 大聚合發生在reduce端 * 這裡既是:在進行聚合的時候,每當有新的值進來,對分組後的聚合 */ @Override public void update(MutableAggregationBuffer buffer, Row input) { buffer.update(0,buffer.getInt(0)+1); } /** * 在進行聚合操作的時候所要處理的資料的結果的型別 */ @Override public StructType bufferSchema() { return DataTypes.createStructType( Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true))); } /** * 合併 update操作,可能是針對一個分組內的部分資料,在某個節點上發生的 但是可能一個分組內的資料, * 會分佈在多個節點上處理 * 此時就要用merge操作,將各個節點上分散式拼接好的串,合併起來 * buffer1.getInt(0) : 大聚和的時候 上一次聚合後的值 * buffer2.getInt(0) : 這次計算傳入進來的update的結果 * 這裡即是:最後在分散式節點完成後需要進行全域性級別的Merge操作 */ @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0)); } /** * 初始化一個內部的自己定義的值,在Aggregate之前每組資料的初始化結果 */ @Override public void initialize(MutableAggregationBuffer buffer) { buffer.update(0, 0); } /** * 最後返回一個和DataType的型別要一致的型別,返回UDAF最後的計算結果 */ @Override public Object evaluate(Row buffer) { return buffer.getInt(0); } }); sqlContext.sql("select name ,StringCount(name) as number from user group by name").show(); sc.stop(); } }
二、開窗函式
例:駕駛第一列為日期,第二類為類別,第三類為價格。統計每個類別賺的最多的三次。
資料:https://download.csdn.net/download/qq_33283652/10904792
/** * row_number() 開窗函式是按照某個欄位分組,然後取另一欄位的前幾個的值,相當於 分組取topN * 如果SQL語句裡面使用到了開窗函式,那麼這個SQL語句必須使用HiveContext來執行,HiveContext預設情況下在本地無法建立。 * 開窗函式格式: * row_number() over (partitin by XXX order by XXX desc) */ public class JavaExample { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("windowfun"); conf.set("spark.sql.shuffle.partitions","1"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("use spark"); hiveContext.sql("drop table if exists sales"); hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath '/root/test/sales' into table sales"); /** * 開窗函式格式: * 【 rou_number() over (partitin by XXX order by XXX) 】 */ Dataset<Row> result = hiveContext.sql("select riqi,leibie,jine " + "from (" + "select riqi,leibie,jine," + "row_number() over (partition by leibie order by jine desc) rank " + "from sales) t " + "where t.rank<=3"); result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result"); sc.stop(); } }