
Converting between JSON strings and DataFrames in Spark

This article covers converting between JSON strings and DataFrames, based on Spark 2.0+.

JSON string to DataFrame

Spark provides an API for parsing JSON strings into a DataFrame. If you do not specify a schema for the resulting DataFrame, Spark first scans the given JSON strings and infers one:

  • columns whose values are all null become String
  • integers default to Long
  • floating-point numbers default to Double
val json1 = """{"a":null, "b": 23.1, "c": 1}"""
val json2 = """{"a":null, "b": "hello", "d": 1.2}"""

val ds = spark.createDataset(Seq(json1, json2))
val df = spark.read.json(ds)
df.show
df.printSchema

+----+-----+----+----+
|   a|    b|   c|   d|
+----+-----+----+----+
|null| 23.1|   1|null|
|null|hello|null| 1.2|
+----+-----+----+----+

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: long (nullable = true)
 |-- d: double (nullable = true)

If a schema is specified, the DataFrame is generated according to it:

  • columns not present in the schema are dropped
  • the schema can be specified in two ways, as a StructType or as a String; the exact correspondence between the two is listed in the data-type tables at the end of this article
  • if a value cannot be converted to the type declared in the schema: when the column is nullable it becomes null; when it is not nullable it becomes the type's default "empty" value (e.g. 0.0 for Double); if no value can be produced at all, an exception is thrown
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("a", ByteType, true),
  StructField("b", FloatType, false),
  StructField("c", ShortType, true)
))
// or
val schema = "b float, c short"

val df = spark.read.schema(schema).json(ds)
df.show
df.printSchema

+----+----+----+
|   a|   b|   c|
+----+----+----+
|null|23.1|   1|
|null|   0|null|
+----+----+----+

root
 |-- a: byte (nullable = true)
 |-- b: float (nullable = true)
 |-- c: short (nullable = true)

JSON parsing options

primitivesAsString (default false): treat all primitive values as string type
prefersDecimal (default false): infer floating-point values as decimal; if they do not fit a decimal, infer them as doubles
allowComments (default false): ignore Java/C++ style comments in JSON records
allowUnquotedFieldNames (default false): allow unquoted field names
allowSingleQuotes (default true): allow single quotes in addition to double quotes
allowNumericLeadingZeros (default false): allow extra leading zeros in numbers (e.g. 0012)
allowBackslashEscapingAnyCharacter (default false): allow the backslash quoting mechanism to accept any character
allowUnquotedControlChars (default false): allow JSON strings to contain unquoted control characters (ASCII characters with a value below 32, including tab and line-feed)
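Several of these flags can be combined on the same reader. The following is only a minimal sketch, assuming a spark-shell session where spark is in scope; the record and field names are made up:

val messyJson = """{name: 'foo', /* a comment */ "n": 0012}"""
val messyDs = spark.createDataset(Seq(messyJson))

val relaxed = spark.read
  .option("allowUnquotedFieldNames", "true")
  .option("allowSingleQuotes", "true")      // true is already the default
  .option("allowComments", "true")
  .option("allowNumericLeadingZeros", "true")
  .json(messyDs)

relaxed.show
relaxed.printSchema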

mode (default PERMISSIVE): controls how corrupt records are handled during parsing.

PERMISSIVE: when a corrupt record is encountered, all other fields are set to null and the malformed string is put into the field configured by columnNameOfCorruptRecord. When a schema is specified, add a string-typed field named columnNameOfCorruptRecord to it; if the schema lacks that field, corrupt records are dropped during parsing. When no schema is given (inference mode), a columnNameOfCorruptRecord field is implicitly added to the output schema.
DROPMALFORMED: ignores the whole corrupt record
FAILFAST: throws an exception when a corrupt record is encountered
columnNameOfCorruptRecord (default: the value of spark.sql.columnNameOfCorruptRecord): name of the field added in PERMISSIVE mode; overrides spark.sql.columnNameOfCorruptRecord
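A minimal sketch of the three modes, assuming a spark-shell session; the corrupt-record column name _bad is only an example:

val good = """{"a": 1}"""
val bad  = """{"a": 1"""   // missing closing brace, so this record is corrupt
val records = spark.createDataset(Seq(good, bad))

// PERMISSIVE (default): other fields become null and the malformed text
// is kept in the column named by columnNameOfCorruptRecord
val permissive = spark.read
  .schema("a long, _bad string")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_bad")
  .json(records)
permissive.show(false)

// DROPMALFORMED: the corrupt record simply disappears from the result
spark.read.option("mode", "DROPMALFORMED").json(records).show

// FAILFAST: the first corrupt record aborts parsing with an exception
// spark.read.option("mode", "FAILFAST").json(records).show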

dateFormat (default yyyy-MM-dd): custom date format, following java.text.SimpleDateFormat; applies to date values (no time part)
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): custom timestamp format, following java.text.SimpleDateFormat; applies to values with a time part (down to microseconds)
multiLine (default false): parse one record, which may span multiple lines, per file
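For example, non-default date and timestamp formats can be parsed like this (a sketch assuming a spark-shell session; the field names and patterns are only illustrative):

val dated = """{"d": "01/02/2010", "t": "2012-12-12 11:22:22"}"""
val datedDs = spark.createDataset(Seq(dated))

val datedDF = spark.read
  .schema("d date, t timestamp")
  .option("dateFormat", "dd/MM/yyyy")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .json(datedDs)

datedDF.show
datedDF.printSchema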

All of the options above are passed with the option method, e.g. primitivesAsString:

val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show
stringDF.printSchema

+----+-----+----+----+
|   a|    b|   c|   d|
+----+-----+----+----+
|null| 23.1|   1|null|
|null|hello|null| 1.2|
+----+-----+----+----+

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)

Binary-type values are automatically represented using base64 encoding

'Man' (ASCII) base64-encodes to "TWFu":


import org.apache.spark.sql.functions.{base64, col}

val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte)
val binaryDs = spark.createDataset(Seq(byteArr))
val dsWithB64 = binaryDs.withColumn("b64", base64(col("value")))

dsWithB64.show(false)
dsWithB64.printSchema

+----------+----+
|value     |b64 |
+----------+----+
|[4D 61 6E]|TWFu|
+----------+----+

root
 |-- value: binary (nullable = true)
 |-- b64: string (nullable = true)

//=================================================

dsWithB64.toJSON.show(false)
+-----------------------------+
|value                        |
+-----------------------------+
|{"value":"TWFu","b64":"TWFu"}|
+-----------------------------+

//=================================================

val json = """{"value":"TWFu"}"""
val jsonDs = spark.createDataset(Seq(json))
val binaryDF = spark.read.schema("value binary").json(jsonDs)

binaryDF.show
binaryDF.printSchema

+----------+
|     value|
+----------+
|[4D 61 6E]|
+----------+

root
 |-- value: binary (nullable = true)

Examples of specifying a schema:

The following covers all basic types supported by Spark SQL:

val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3, "floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23, "binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12 11:22:22.123123"}"""
val ds = spark.createDataset(Seq(json))
val schema = "stringc string, shortc short, integerc int, longc long, floatc float, doublec double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary, datec date, timestampc timestamp"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc   |datec     |timestampc             |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc    |1     |null    |3    |4.5   |6.7    |8.900   |true    |23   |[4D 61 6E]|2010-01-01|2012-12-12 11:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

root
 |-- stringc: string (nullable = true)
 |-- shortc: short (nullable = true)
 |-- integerc: integer (nullable = true)
 |-- longc: long (nullable = true)
 |-- floatc: float (nullable = true)
 |-- doublec: double (nullable = true)
 |-- decimalc: decimal(10,3) (nullable = true)
 |-- booleanc: boolean (nullable = true)
 |-- bytec: byte (nullable = true)
 |-- binaryc: binary (nullable = true)
 |-- datec: date (nullable = true)
 |-- timestampc: timestamp (nullable = true)

Complex types:

val json = """
{
  "arrayc" : [ 1, 2, 3 ],
  "structc" : {
    "strc" : "efg",
    "decimalc" : 1.1
  },
  "mapc" : {
    "key1" : 1.2,
    "key2" : 1.1
  }
}
"""
val ds = spark.createDataset(Seq(json))
val schema = "arrayc array<short>, structc struct<strc:string, decimalc:decimal>, mapc map<string, float>"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema

+---------+--------+--------------------------+
|arrayc   |structc |mapc                      |
+---------+--------+--------------------------+
|[1, 2, 3]|[efg, 1]|[key1 -> 1.2, key2 -> 1.1]|
+---------+--------+--------------------------+

root
 |-- arrayc: array (nullable = true)
 |    |-- element: short (containsNull = true)
 |-- structc: struct (nullable = true)
 |    |-- strc: string (nullable = true)
 |    |-- decimalc: decimal(10,0) (nullable = true)
 |-- mapc: map (nullable = true)
 |    |-- key: string
 |    |-- value: float (valueContainsNull = true)

Spark SQL data types

Basic types:

| DataType | simpleString | typeName | sql | defaultSize | catalogString | json |
|---|---|---|---|---|---|---|
| StringType | string | string | STRING | 20 | string | "string" |
| ShortType | smallint | short | SMALLINT | 2 | smallint | "short" |
| IntegerType | int | integer | INT | 4 | int | "integer" |
| LongType | bigint | long | BIGINT | 8 | bigint | "long" |
| FloatType | float | float | FLOAT | 4 | float | "float" |
| DoubleType | double | double | DOUBLE | 8 | double | "double" |
| DecimalType(10,3) | decimal(10,3) | decimal(10,3) | DECIMAL(10,3) | 8 | decimal(10,3) | "decimal(10,3)" |
| BooleanType | boolean | boolean | BOOLEAN | 1 | boolean | "boolean" |
| ByteType | tinyint | byte | TINYINT | 1 | tinyint | "byte" |
| BinaryType | binary | binary | BINARY | 100 | binary | "binary" |
| DateType | date | date | DATE | 4 | date | "date" |
| TimestampType | timestamp | timestamp | TIMESTAMP | 8 | timestamp | "timestamp" |

The three complex types:

| DataType | simpleString | typeName | sql | defaultSize | catalogString | json |
|---|---|---|---|---|---|---|
| ArrayType(IntegerType, true) | array<int> | array | ARRAY<INT> | 4 | array<int> | {"type":"array","elementType":"integer","containsNull":true} |
| MapType(StringType, LongType, true) | map<string,bigint> | map | MAP<STRING, BIGINT> | 28 | map<string,bigint> | {"type":"map","keyType":"string","valueType":"long","valueContainsNull":true} |
| StructType(StructField("sf", DoubleType)::Nil) | struct<sf:double> | struct | STRUCT<`sf`: DOUBLE> | 8 | struct<sf:double> | {"type":"struct","fields":[{"name":"sf","type":"double","nullable":true,"metadata":{}}]} |
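For reference, every column in the two tables above comes straight from members of the DataType objects themselves; a small sketch that prints them (runnable in spark-shell or any project with spark-sql on the classpath):

import org.apache.spark.sql.types._

val types = Seq(
  StringType,
  DecimalType(10, 3),
  ArrayType(IntegerType, true),
  MapType(StringType, LongType, true),
  StructType(StructField("sf", DoubleType) :: Nil)
)

// Print simpleString, typeName, sql, defaultSize, catalogString and json for each type
types.foreach { t =>
  println(s"${t.simpleString} | ${t.typeName} | ${t.sql} | ${t.defaultSize} | ${t.catalogString} | ${t.json}")
}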