1. 程式人生 > >SparkSQL建立RDD:UDF(UserDataFrame)使用者建立自定義函式(包含動態建立schema,使用者自定義函式,查詢字元的個數)【Java版純程式碼】

SparkSQL建立RDD:UDF(UserDataFrame)使用者建立自定義函式(包含動態建立schema,使用者自定義函式,查詢字元的個數)【Java版純程式碼】

Java版程式碼: 

package com.bjsxt;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * 使用者自定義函式
 * @author Administrator
 *
 */

public class UDF {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("udf").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        SQLContext sqlContext=new SQLContext(sc);
        JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu","maliu"));
        JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

            @Override
            public Row call(String s) throws Exception {
                
                return RowFactory.create(s);
            }
        });
        
        /**
         * 動態建立schema的方式
         */
        List<StructField> fields=new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        StructType schema = DataTypes.createStructType(fields);
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        df.registerTempTable("user");
        /**
         * 根據函式的個數來決定實現那個UDF,UDF1,UDF2...
         */
        sqlContext.udf().register("StrLen", new UDF1<String,Integer>(){

            @Override
            public Integer call(String t1) throws Exception {
                
                return t1.length();
            }            
        },DataTypes.IntegerType);
        sqlContext.sql("select name,StrLen(name) as Length from user").show();
        System.out.println("********************************************8");
        sqlContext.udf().register("StrLen",new UDF2<String,Integer,Integer>(){

            @Override
            public Integer call(String t1, Integer t2) throws Exception {
                
                return t1.length()+t2;
            }
            
        }, DataTypes.IntegerType);
        sqlContext.sql("select name , StrLen(name,10) as length from user").show();
        sc.stop();        
    }
}

Scala版程式碼:

val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);
val rdd = sc.makeRDD(Array("zhansan","lisi","wangwu"))
val rowRDD = rdd.map { x => {
  RowFactory.create(x)
} }
val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("user")
//sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
//sqlContext.sql("select name ,StrLen(name) as length from user").show
sqlContext.udf.register("StrLen",(s : String,i:Int)=>{s.length()+i})
sqlContext.sql("select name ,StrLen(name,10) as length from user").show
sc.stop()

親,你終於看完了,那就來鼓勵一下我唄,謝謝你。