1. 程式人生 > Spark Sql 程式設計式結構DataType轉換 程式碼類小結

Spark Sql 程式設計式結構DataType轉換 程式碼類小結

基本型別資料轉DataType

/**
 * Maps a primitive type name to the matching Spark SQL {@code DataType}.
 *
 * <p>The name is matched case-insensitively. "char", "string", "varchar" and any
 * name without a dedicated case resolve to {@code DataTypes.StringType}.
 *
 * @param attrstr type name such as "int", "double" or "date"
 * @return the Spark SQL type for the name; {@code StringType} when the name is not recognized
 * @throws NullPointerException if {@code attrstr} is null
 */
public DataType attrToDataType(String attrstr){
    // Locale.ROOT keeps the match stable under locales with special casing rules
    // (for example the Turkish dotless i would turn "INT" into a non-matching string).
    switch (attrstr.toLowerCase(java.util.Locale.ROOT)) {
    case "short":
        // arrtToObject yields a Short for this name, so the schema has to declare ShortType.
        return DataTypes.ShortType;
    case "int":
    case "integer":
        return DataTypes.IntegerType;
    case "long":
        return DataTypes.LongType;
    case "float":
        return DataTypes.FloatType;
    case "double":
        return DataTypes.DoubleType;
    case "boolean":
        return DataTypes.BooleanType;
    case "date":
        return DataTypes.DateType;
    case "char":
    case "string":
    case "varchar":
    default:
        return DataTypes.StringType;
    }
}

基本型別轉封裝類

    /**
     * Converts the text of one field into the boxed Java value for its declared type.
     *
     * <p>The type name is matched case-insensitively. Names without a dedicated case
     * (including "char", "string" and "varchar") return the text unchanged.
     *
     * @param fields raw text of the field value
     * @param attrstr type name of the field, e.g. "int", "double", "date"
     * @return a Short, Integer, Long, Float, Double, Boolean, java.sql.Date or the original String
     * @throws NumberFormatException if a numeric type is requested and the text is not a valid number
     * @throws IllegalArgumentException if "date" is requested and the text cannot be parsed
     * @throws NullPointerException if {@code attrstr} is null
     */
    public Object arrtToObject(String fields,String attrstr){
        // Locale.ROOT keeps the match stable under locales with special casing rules.
        switch (attrstr.toLowerCase(java.util.Locale.ROOT)) {
        case "short":
            return Short.parseShort(fields);
        case "int":
        case "integer":
            return Integer.parseInt(fields);
        case "long":
            return Long.parseLong(fields);
        case "float":
            return Float.parseFloat(fields);
        case "double":
            return Double.parseDouble(fields);
        case "boolean":
            return Boolean.parseBoolean(fields);
        case "date":
            // Date.parse only yields epoch milliseconds; wrap them so the value is a
            // java.sql.Date rather than a boxed Long. The accepted text formats are
            // unchanged because the same (deprecated) parser is still used.
            return new java.sql.Date(java.util.Date.parse(fields));
        default:
            return fields;
        }
    }

當欄位列表和屬性列表都生成後,用 for 迴圈呼叫 createStructField 建立各個欄位;欄位名稱與屬性型別必須依索引一一對應。

   for(int i=0;i<schemaList.length;i++){
            fields.add(DataTypes.createStructField(schemaList[i], dataTypesList[i], true));
        }
        StructType schema = DataTypes.createStructType(fields);

生成 rowRDD 的時候,每筆記錄的欄位值與屬性列表也要依索引一一對應。

// Turn every text record into a Row: split it into raw values and convert each
// value with the type name stored at the same index in fieldAttyList.
JavaRDD<Row> rowRDD = people.map(
    new Function<String, Row>() {
      @Override
      public Row call(String line) throws Exception {
        final String[] rawValues = line.split(FieldSplitSign);
        // TODO (carried over from the original author): this per-record array was
        // suspected of causing a memory problem -- unverified.
        final Object[] converted = new Object[rawValues.length];
        int col = 0;
        for (String rawValue : rawValues) {
          converted[col] =
              dataTypUtils.arrtToObject(rawValue.trim(), fieldAttyList[col].trim());
          col++;
        }
        return RowFactory.create(converted);
      }
    });