
SparkSQL: merging JSON data

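Both versions below do the same thing: load a students.json file (name, score), keep the students whose score is above 80, join that result with a second JSON dataset (name, age), and write the merged rows back out as JSON. The students.json file itself is not shown in the original post; a plausible input, with one JSON object per line as Spark's JSON reader expects (the scores here are made up for illustration), might look like:

    {"name":"Leo","score":85}
    {"name":"Marry","score":99}
    {"name":"Jack","score":74}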

Java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class Demo {
    private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);
    private static SparkSession session = new SparkSession(jsc.sc());

    public static void main(String[] args) {
        // load students.json: name, score
        Dataset<Row> score = session.read().json("./src/main/java/cn/tele/spark_sql/json/students.json");

        score.createOrReplaceTempView("scoreView");

        // name, score
        JavaRDD<Row> scoreRDD = session.sql("select * from scoreView where score > 80").javaRDD();

        // create the info JSON: name, age
        JavaRDD<String> infoRDD = jsc.parallelize(Arrays.asList("{\"name\":\"Leo\",\"age\":18}",
                "{\"name\":\"Marry\",\"age\":19}", "{\"name\":\"Jack\",\"age\":20}"));

        Dataset<Row> info = session.read().json(infoRDD);

        info.createOrReplaceTempView("infoView");

        // build the SQL string
        List<Row> scoreList = scoreRDD.collect();

        String sql = "select * from infoView where name in (";
        for (int i = 0; i < scoreList.size(); i++) {
            sql += "'" + scoreList.get(i).getAs("name") + "'";
            if (i < scoreList.size() - 1) {
                sql += ",";
            }
        }

        sql += ")";

        // query the name and age of students whose score > 80,
        // then transform both sides into pair RDDs keyed by name
        JavaPairRDD<String, Integer> tempRDD = session.sql(sql).javaRDD()
                .mapToPair(new PairFunction<Row, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row t) throws Exception {
                        return new Tuple2<String, Integer>(t.getAs("name"), Integer.valueOf(t.getAs("age").toString()));
                    }
                });

        JavaPairRDD<String, Integer> scoreRDD2 = scoreRDD.mapToPair(new PairFunction<Row, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Row t) throws Exception {
                return new Tuple2<String, Integer>(t.getAs("name"), Integer.valueOf(t.getAs("score").toString()));
            }
        });

        // join
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = tempRDD.join(scoreRDD2);

        // print the joined result
        resultRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
                System.out.println("name:" + t._1 + "," + "age:" + t._2._1 + ",score:" + t._2._2);
            }
        });

        // save as JSON
        StructType schema = DataTypes
                .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                        DataTypes.createStructField("age", DataTypes.IntegerType, false),
                        DataTypes.createStructField("score", DataTypes.IntegerType, false)));

        JavaRDD<Row> rowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
                return RowFactory.create(v1._1, Integer.valueOf(v1._2._1), Integer.valueOf(v1._2._2));
            }
        });

        Dataset<Row> resultDS = session.createDataFrame(rowRDD, schema);

        resultDS.write().format("json").mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/json/result");

        session.stop();
        jsc.close();
    }
}
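Two details of the Java version are worth calling out. First, scoreRDD.collect() pulls the filtered rows back to the driver so their names can be spliced into an IN (...) clause; this only works while the filtered result is small, and a name containing a quote character would break the generated SQL. Second, Spark's JSON reader infers numeric fields as long, which is why age and score are read back via Integer.valueOf(t.getAs("age").toString()) rather than cast directly. A join-based alternative that avoids the hand-built SQL is sketched after the Scala version below.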

Scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{DataTypes, StructField}

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // load the score data
    val scoreDF = sqlContext.read.json("./src/main/scala/cn/tele/spark_sql/json/students.json")

    scoreDF.createOrReplaceTempView("scoreView")

    val arr = sqlContext.sql("select * from scoreView where score > 80").rdd.collect()

    // create the student info
    val infoRDD = sc.parallelize(Array(
      "{\"name\":\"Leo\",\"age\":20}",
      "{\"name\":\"Marry\",\"age\":30}",
      "{\"name\":\"Jack\",\"age\":21}"), 2)

    val infoDS = sqlContext.read.json(infoRDD)

    infoDS.createOrReplaceTempView("infoView")

    var sql = "select * from infoView where name in ("
    // build the SQL string
    for (i <- 0 to arr.length - 1) {
      sql += "'" + arr(i).getAs[String]("name") + "'"
      if (i < arr.length - 1) {
        sql += ","
      }
    }

    sql += ")"

    val tempRDD = sqlContext.sql(sql).rdd.map(row => {
      (row.getAs[String]("name"), row.getAs[Long]("age").toInt)
    })

    val tempRDD2 = scoreDF.rdd.map(row => {
      (row.getAs[String]("name"), row.getAs[Long]("score").toInt)
    })

    // join
    val resultRDD = tempRDD.join(tempRDD2)

    // print the joined result
    resultRDD.foreach(t => {
      println("name:" + t._1 + ",age:" + t._2._1 + ",score:" + t._2._2)
    })

    val rowRDD = resultRDD.map(t => Row(t._1, t._2._1, t._2._2))

    // save as a JSON file
    val schema = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false),
      StructField("score", DataTypes.IntegerType, false)))

    val df = sqlContext.createDataFrame(rowRDD, schema)

    df.write.format("json").mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/json/result")
  }
}
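The collect-and-splice pattern can also be avoided entirely: since both datasets are registered as temp views, a single SQL join expresses the whole merge inside Spark, so no rows round-trip through the driver and no IN (...) string has to be assembled by hand. A minimal alternative sketch, assuming the same scoreView and infoView as above:

    // one join instead of collect() plus a hand-built IN (...) clause
    val joined = sqlContext.sql(
      "select i.name, i.age, s.score " +
        "from infoView i join scoreView s on i.name = s.name " +
        "where s.score > 80")

    joined.show()

The joined DataFrame already carries the merged schema (name, age, score), so it can be written out with the same joined.write.format("json") call used above, without rebuilding a StructType by hand.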
