1. 程式人生 > >spark學習記錄(十二、Spark UDF&UDAF&開窗函式)

spark學習記錄(十二、Spark UDF&UDAF&開窗函式)

一、UDF&UDAF 

public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("udf");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("Sam", "Tom", "Jetty", "Tom", "Jetty"));
        JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
            public Row call(String s) throws Exception {
                return RowFactory.create(s);
            }
        });

        /**
         * 動態建立Schema方式載入DF
         */
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> dataFrame = sqlContext.createDataFrame(rowRDD, schema);
        dataFrame.registerTempTable("user");

        /**
         * 根據UDF函式引數的個數決定是實現哪個UDF   UDF1,UDF2...
         * UDF1表示傳一個引數,StrLen(name)
         */
        sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
            public Integer call(String s) throws Exception {
                return s.length();
            }
        }, DataTypes.IntegerType);

        sqlContext.sql("select name,StrLen(name) as length from user").show();

        /**
         * 註冊一個UDAF函式,實現統計相同的值得個數
         */
        sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
            /**
             * 指定輸入欄位的欄位及型別
             */
            @Override
            public StructType inputSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("name",
                                DataTypes.StringType, true)));
            }
            /**
             * 指定UDAF函式計算後返回的結果型別
             */
            @Override
            public DataType dataType() {
                return DataTypes.IntegerType;
            }
            /**
             * 確保一致性一般用true,用以標記針對給定的一組輸入,UDAF總是生成相同的結果
             */
            @Override
            public boolean deterministic() {
                return true;
            }
            /**
             * 可以認為一個一個的將組內的欄位值傳遞出來顯現拼接的邏輯
             * buffer.update(0)獲取的是上一次聚合的值
             * 相當於map的combiner,combiner就是對每一個map task的處理結果進行一次小聚合
             * 大聚合發生在reduce端
             * 這裡既是:在進行聚合的時候,每當有新的值進來,對分組後的聚合
             */
            @Override
            public void update(MutableAggregationBuffer buffer, Row input) {
                buffer.update(0,buffer.getInt(0)+1);
            }
            /**
             * 在進行聚合操作的時候所要處理的資料的結果的型別
             */
            @Override
            public StructType bufferSchema() {
                return DataTypes.createStructType(
                                Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));
            }
            /**
             * 合併 update操作,可能是針對一個分組內的部分資料,在某個節點上發生的 但是可能一個分組內的資料,
             * 會分佈在多個節點上處理
             * 此時就要用merge操作,將各個節點上分散式拼接好的串,合併起來
             * buffer1.getInt(0) : 大聚和的時候 上一次聚合後的值
             * buffer2.getInt(0) : 這次計算傳入進來的update的結果
             * 這裡即是:最後在分散式節點完成後需要進行全域性級別的Merge操作
             */
            @Override
            public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
            }
            /**
             * 初始化一個內部的自己定義的值,在Aggregate之前每組資料的初始化結果
             */
            @Override
            public void initialize(MutableAggregationBuffer buffer) {
                buffer.update(0, 0);
            }
            /**
             * 最後返回一個和DataType的型別要一致的型別,返回UDAF最後的計算結果
             */
            @Override
            public Object evaluate(Row buffer) {
                return buffer.getInt(0);
            }
        });

        sqlContext.sql("select name ,StringCount(name) as number from user group by name").show();

        sc.stop();
    }
}

二、開窗函式

例:駕駛第一列為日期,第二類為類別,第三類為價格。統計每個類別賺的最多的三次。

資料:https://download.csdn.net/download/qq_33283652/10904792

/**
 * row_number() 開窗函式是按照某個欄位分組,然後取另一欄位的前幾個的值,相當於 分組取topN
 * 如果SQL語句裡面使用到了開窗函式,那麼這個SQL語句必須使用HiveContext來執行,HiveContext預設情況下在本地無法建立。
 * 開窗函式格式:
 * row_number() over (partitin by XXX order by XXX desc)
 */
public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("windowfun");
        conf.set("spark.sql.shuffle.partitions","1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("use spark");
        hiveContext.sql("drop table if exists sales");
        hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
                + "row format delimited fields terminated by '\t'");
        hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
        /**
         * 開窗函式格式:
         * 【 rou_number() over (partitin by XXX order by XXX) 】
         */
        Dataset<Row> result = hiveContext.sql("select riqi,leibie,jine "
                + "from ("
                + "select riqi,leibie,jine,"
                + "row_number() over (partition by leibie order by jine desc) rank "
                + "from sales) t "
                + "where t.rank<=3");
        result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
        sc.stop();
    }
}