SparkSQL建立RDD:UDF(User Defined Function)使用者建立自定義函式(包含動態建立schema,使用者自定義函式,查詢字元的個數)【Java版純程式碼】
阿新 • • 發佈:2019-01-08
Java版程式碼:
package com.bjsxt;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Spark SQL user-defined function (UDF) demo, Spark 1.x API.
 *
 * Shows how to:
 *  - build a DataFrame from an RDD with a dynamically created schema,
 *  - register a one-argument UDF (UDF1) that returns a string's length,
 *  - re-register the same name as a two-argument UDF (UDF2) that adds an offset.
 *
 * @author Administrator
 */
public class UDF {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("udf").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> parallelize =
                sc.parallelize(Arrays.asList("zhangsan", "lisi", "wangwu", "maliu"));

        // Wrap each name in a Row so a schema can be attached below.
        JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                return RowFactory.create(s);
            }
        });

        /**
         * Dynamically build the schema: a single string column "name",
         * declared nullable (third argument = true).
         */
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        df.registerTempTable("user");

        /**
         * Pick UDF1, UDF2, ... according to the number of arguments.
         * The "name" column is declared nullable above, so guard against a
         * null input instead of throwing a NullPointerException at query time.
         */
        sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
            @Override
            public Integer call(String t1) throws Exception {
                // Treat a missing name as length 0 rather than crashing.
                return t1 == null ? 0 : t1.length();
            }
        }, DataTypes.IntegerType);
        sqlContext.sql("select name,StrLen(name) as Length from user").show();

        System.out.println("********************************************8");

        // Registering under the same name replaces the previous UDF.
        sqlContext.udf().register("StrLen", new UDF2<String, Integer, Integer>() {
            @Override
            public Integer call(String t1, Integer t2) throws Exception {
                // Same null guard: length of the (possibly null) name plus the offset.
                return (t1 == null ? 0 : t1.length()) + t2;
            }
        }, DataTypes.IntegerType);
        sqlContext.sql("select name , StrLen(name,10) as length from user").show();

        sc.stop();
    }
}
Scala版程式碼:
// Spark SQL UDF demo (Scala, Spark 1.x API): build a DataFrame from an RDD
// with a dynamically created schema, then register and use a custom function.
val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Wrap every name in a Row so a schema can be attached.
val rdd = sc.makeRDD(Array("zhansan", "lisi", "wangwu"))
val rowRDD = rdd.map(RowFactory.create(_))

// Schema with a single nullable string column called "name".
val schema = DataTypes.createStructType(Array(StructField("name", StringType, true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("user")

// One-argument variant, kept for reference:
//sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
//sqlContext.sql("select name ,StrLen(name) as length from user").show

// Two-argument UDF: string length plus an integer offset.
sqlContext.udf.register("StrLen", (s: String, i: Int) => s.length() + i)
sqlContext.sql("select name ,StrLen(name,10) as length from user").show

sc.stop()
親,你終於看完了,那就來鼓勵一下我唄,謝謝你。