Spark Sql 程式設計式結構DataType轉換 程式碼類小結
阿新 • • 發佈:2019-02-18
基本型別資料轉DataType
/**
 * Maps a textual attribute type name to the Spark SQL {@code DataType} used
 * when building a schema.
 *
 * <p>Matching is case-insensitive. Recognised names are {@code short},
 * {@code int}/{@code integer}, {@code float}, {@code double}, {@code long},
 * {@code boolean} and {@code date}. Every other name (including {@code char},
 * {@code string} and {@code varchar}) is treated as a string column.
 *
 * @param attrstr attribute type name; must not be {@code null}
 * @return the matching Spark SQL data type, {@code DataTypes.StringType} for
 *         unrecognised names
 */
public DataType attrToDataType(String attrstr){
    // Locale.ROOT keeps the lookup stable under locales with special casing
    // rules (e.g. Turkish, where "INT".toLowerCase() is not "int").
    switch (attrstr.toLowerCase(java.util.Locale.ROOT)) {
        case "short":
            // Row values for "short" are parsed into java.lang.Short by
            // arrtToObject, so the column must be ShortType, not StringType.
            return DataTypes.ShortType;
        case "int":
        case "integer":
            return DataTypes.IntegerType;
        case "float":
            return DataTypes.FloatType;
        case "double":
            return DataTypes.DoubleType;
        case "long":
            return DataTypes.LongType;
        case "boolean":
            return DataTypes.BooleanType;
        case "date":
            return DataTypes.DateType;
        case "char":
        case "string":
        case "varchar":
        default:
            return DataTypes.StringType;
    }
}
基本型別轉封裝類
/**
 * Converts the raw text of one field into a Java object matching the field's
 * declared attribute type.
 *
 * <p>Matching of {@code attrstr} is case-insensitive. Unrecognised type names
 * (including {@code char}, {@code string} and {@code varchar}) leave the value
 * as the original {@code String}.
 *
 * @param fields  raw field value as text
 * @param attrstr attribute type name of the field; must not be {@code null}
 * @return the parsed value: {@code Short}, {@code Integer}, {@code Float},
 *         {@code Double}, {@code Long}, {@code Boolean}, {@code java.sql.Date},
 *         or the unchanged {@code String}
 * @throws NumberFormatException    if a numeric type is requested and
 *                                  {@code fields} is not a valid number
 * @throws IllegalArgumentException if {@code date} is requested and
 *                                  {@code fields} is not in {@code yyyy-[m]m-[d]d} form
 */
public Object arrtToObject(String fields,String attrstr){
    // Locale.ROOT keeps the lookup stable under locales with special casing
    // rules (e.g. Turkish, where "INT".toLowerCase() is not "int").
    switch (attrstr.toLowerCase(java.util.Locale.ROOT)) {
        case "short":
            return Short.parseShort(fields);
        case "int":
        case "integer":
            return Integer.parseInt(fields);
        case "float":
            return Float.parseFloat(fields);
        case "double":
            return Double.parseDouble(fields);
        case "long":
            return Long.parseLong(fields);
        case "boolean":
            return Boolean.parseBoolean(fields);
        case "date":
            // The deprecated Date.parse(String) returns epoch millis as a long,
            // which would put a Long into a date column. Spark SQL's DateType
            // is backed by java.sql.Date, so build one directly.
            return java.sql.Date.valueOf(fields);
        default:
            return fields;
    }
}
當欄位列表和屬性列表都生成後,用for迴圈去createStructField,欄位和屬性要對應上
for(int i=0;i<schemaList.length;i++){
fields.add(DataTypes.createStructField(schemaList[i], dataTypesList[i], true));
}
StructType schema = DataTypes.createStructType(fields);
生成rowRDD的時候,欄位和屬性也要對應上
// Turn every text record into a Row: split it on the field separator and
// convert each token using the attribute type found at the same position.
JavaRDD<Row> rowRDD = people.map(
    new Function<String, Row>() {
        public Row call(String record) throws Exception {
            final String[] tokens = record.split(FieldSplitSign);
            //TODO maybe cause the memory leak
            final Object[] values = new Object[tokens.length];
            int pos = 0;
            while (pos < tokens.length) {
                final String rawValue = tokens[pos].trim();
                final String typeName = fieldAttyList[pos].trim();
                values[pos] = dataTypUtils.arrtToObject(rawValue, typeName);
                pos++;
            }
            return RowFactory.create(values);
        }
    });