1. 程式人生 > >spark:將csv檔案讀取為DataFrame

spark:將csv檔案讀取為DataFrame

以下內容在spark2.2和spark2.3中測試都通過

通用轉換形式:

spark.read.schema(sch).option("header", true).csv("/path/file.csv")

注意以下幾點:

  • csv會完全按照指定的schema結構進行轉換,若不指定schema預設都解析為StringType(若指定了option("inferSchema", true)會遍歷資料推斷型別)。

  • 列的順序和指定schema中列的順序是一致的,這點不像json,json會進行列名對應,但是csv不會,只會根據順序判斷(即使指定了option("header", true)

    也無效,會將header中列名進行覆蓋)。

  • 若schema列數多於原始資料列數,schema最後面多出的列會為null。

  • 若schema列數少於原始資料列數,只會取原始資料中前面的列,原始資料多出的列的資料將被忽略。

看下面的例子:

import org.apache.spark.sql.types._
val sch = StructType(
StructField("col1", LongType)::
StructField("age", StringType)::
Nil
)

val s1 = "\"id\""
val s2 = "\"1\""
val ds = spark.createDataset(Seq(s1, s2))
ds.show(false
) +-----+ |value| +-----+ |"id" | |"1" | +-----+ val df = spark.read.schema(sch).option("header", true).csv(ds) df.show(false) +----+----+ |col1|age | +----+----+ |1 |null| +----+----+ val s1 = "\"id\",\"name\",\"age\",\"text\"" val s2 = "\"1\",\"張三\",\"23\",\"你好\"" val ds = spark.createDataset(Seq(s1, s2)) ds.show(false
) +------------------------+ |value | +------------------------+ |"id","name","age","text"| |"1","張三","23","你好" | +------------------------+ val df = spark.read.schema(sch).option("header", true).csv(ds) df.show(false) +----+---+ |col1|age| +----+---+ |1 |張三| +----+---+