1. 程式人生 > >spark讀取mongodb並解決迴圈巢狀array的拆分,屬性不存在整個物件丟失問題。

spark讀取mongodb並解決迴圈巢狀array的拆分,屬性不存在整個物件丟失問題。

1、建立SQLContext

SQLContext sqlContext = new SQLContext(sc)

2、拼接mongodb連線字串

if(UserName!=null && !"".equals(UserName)){
            if(PassWord!=null && !"".equals(PassWord)){
                url="mongodb://"+UserName+":"+PassWord+"@"+IP+":"+Port+"/"+DataBase+"."+Table;
            }else
{ url="mongodb://"+IP+":"+Port+"/"+DataBase+"."+Table; } }else{ url="mongodb://"+IP+":"+Port+"/"+DataBase+"."+Table; }

3、//定義並組裝連線條件

Map<String,String> options=new HashMap<String,String>();
        options.put("spark.mongodb.input.uri", url);
        options.put("spark.mongodb.input.partitioner"
, "MongoPaginateBySizePartitioner"); options.put("spark.mongodb.input.partitionerOptions.partitionKey" , "_id"); options.put("spark.mongodb.input.partitionerOptions.partitionSizeMB",Partition);

4、讀取所有資料資料

DataFrame df = sqlContext.read().format("com.mongodb.spark.sql").options( options).load
(); df.printSchema();

5、迴圈schema並拆分資料。

for(StructField sf:df.schema().fields()){
            this.li.add(sf.name());
             sname=StringUtils.join(li.toArray(),"_");
            if(sf.dataType().typeName()=="array"||"array".equals(sf.dataType().typeName())){

                ArrayType at=(ArrayType)sf.dataType();
                df=df.withColumn(sname, functions.explode(functions.when(df.col(sname).isNull(), functions.array(functions.lit(null).cast(at.elementType())))
                                                              .when(functions.size(df.col(sname)).equalTo(0), functions.array(functions.lit(null).cast(at.elementType())))
                                                              .otherwise(df.col(sname))));
                df=array_loop((ArrayType)sf.dataType(),df);
            }else if(sf.dataType().typeName()=="struct"||"struct".equals(sf.dataType().typeName())){
                df=struct_loop((StructType)sf.dataType(),df);
                df=df.drop(sname);
            }
            cols.add(StringUtils.join(li.toArray(),"."));
            this.li.remove(sf.name());
        }
public DataFrame array_loop(ArrayType arr,DataFrame df){
        if(arr.elementType().typeName()=="struct"||"struct".equals(arr.elementType().typeName())){
            df=struct_loop((StructType)arr.elementType(),df);
        } 
        df=df.drop(sname);
        return df;
    }
    public DataFrame struct_loop(StructType st,DataFrame df){
        for(StructField sf:st.fields()){
            this.li.add(sf.name());
            String sname=StringUtils.join(li.toArray(),"_");
            String sname1=StringUtils.join(li.toArray(),".");
            if(sf.dataType().typeName()=="array"||"array".equals(sf.dataType().typeName())){

                ArrayType at=(ArrayType)sf.dataType();
                df=df.withColumn(sname, functions.explode(functions.when(df.col(sname).isNull(), functions.array(functions.lit(null).cast(at.elementType())))
                                                              .when(functions.size(df.col(sname)).equalTo(0), functions.array(functions.lit(null).cast(at.elementType())))
                                                              .otherwise(df.col(sname))));
                df=array_loop((ArrayType)sf.dataType(),df);
            }else if(sf.dataType().typeName()=="struct"||"struct".equals(sf.dataType().typeName())){
                df=struct_loop((StructType)sf.dataType(),df);
            }
            df=df.selectExpr(sname1+" as "+sname,"*");
            cols.add(StringUtils.join(li.toArray(),"."));
            this.li.remove(sf.name());
        }
        return df;
    }