1. 程式人生 > >Apache Spark 2.4 內建的 Avro 資料來源實戰

Apache Spark 2.4 內建的 Avro 資料來源實戰

文章目錄


Apache Avro 是一種流行的資料序列化格式。它廣泛用於 Apache Spark 和 Apache Hadoop 生態系統,尤其適用於基於 Kafka 的資料管道。從 Apache Spark 2.4 版本開始(參見 Apache Spark 2.4 正式釋出,重要功能詳細介紹),Spark 為讀取和寫入 Avro 資料提供內建支援。新的內建 spark-avro 模組最初來自 Databricks 的開源專案
Avro Data Source for Apache Spark
。除此之外,它還提供以下功能:

  • 新函式 from_avro() 和 to_avro() 用於在 DataFrame 中讀取和寫入 Avro 資料,而不僅僅是檔案。

  • 支援 Avro 邏輯型別(logical types),包括 Decimal,Timestamp 和 Date型別。

  • 2倍讀取吞吐量提高和10%寫入吞吐量提升。

本文將通過示例介紹上面的每個功能。

載入和儲存函式

在 Apache Spark 2.4 中,為了讀寫 Avro 格式的資料,你只需在 DataFrameReader 和 DataFrameWriter 中將檔案格式指定為“avro”即可。其用法和其他資料來源用法很類似。如下所示:

val iteblogDF = spark.read.format("avro").load("examples/src/main/resources/iteblog.avro")
iteblogDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

from_avro() 和 to_avro() 的使用

為了進一步簡化資料轉換流程(transformation pipeline),社群引入了兩個新的內建函式:from_avro() 和 to_avro()。Avro 通常用於序列化/反序列化基於 Apache Kafka 的資料管道中的訊息或資料,在讀取或寫入 Kafka 時,將 Avro records 作為列將非常有用。每個 Kafka 鍵值記錄都會增加一些元資料,例如 Kafka 的攝取時間戳,Kafka 的偏移量等。

在以下三種場景,from_avro() 和 to_avro() 函式將非常有用:

  • 當使用 Spark 從 Kafka 中讀取 Avro 格式的資料,可以使用 from_avro() 函式來抽取你要的資料,清理資料並對其進行轉換。
  • 當你想要將 structs 格式的資料轉換為 Avro 二進位制記錄,然後將它們傳送到 Kafka 或寫入到檔案,你可以使用 to_avro()。
  • 如果你需要將多個列重新編碼為單個列,請使用to_avro().

目前這兩個函式僅在 Scala 和 Java 語言中可用。from_avro 和 to_avro 函式的使用除了需要人為指定 Avro schema,其他的和使用 from_json 和 to_json 函式一樣,下面是這兩個函式的使用示例。

在程式碼裡面指定 Avro 模式

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder
 
val servers = "www.iteblog.com:9092"
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
 
// Convert structured data to binary from string (key column) and
// int (value column) and save them to a Kafka topic.
iteblogDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

通過 Schema Registry 服務提供 Avro 模式

如果我們有 Schema Registry 服務,那麼我們就不需要在程式碼裡面指定 Avro 模式了,如下:

import org.apache.spark.sql.avro._
 
// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://www.iteblog.com"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
 
  // Given that key and value columns are registered in Schema Registry, convert
  // structured data of key and value columns to Avro binary data by reading the schema
  // info from the Schema Registry. The converted data is saved to Kafka as a Kafka topic "t".
  iteblogDF
    .select(
      to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
      to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

通過檔案設定 Avro 模式

我們還可以將 Avro 模式寫入到檔案裡面,然後在程式碼裡面讀取模式檔案:

import org.apache.spark.sql.avro._
 
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
 
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "iteblog1")
  .load()
 
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)
 
val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "iteblog2")
  .start()

與 Databricks spark-avro的相容性

因為 Spark 內建對讀寫 Avro 資料的支援是從 Spark 2.4 才引入的,所以在這些版本之前,可能有使用者已經使用了 Databricks 開源的 spark-avro。但是不用急,內建的 spark-avro 模組和這個是完全相容的。我們僅僅需要將之前引入的 com.databricks.spark.avro 修改成 org.apache.spark.sql.avro._ 即可。

效能測試

基於 SPARK-24800 的優化,內建 Avro 資料來源讀寫 Avro 檔案的效能得到很大提升。社群在這方面進行了相關的基準測試,結果表明,在1百萬行的資料(包含 Int/Double/String/Map/Array/Struct 等各種資料格式)測試中,讀取的效能提升了2倍,寫的效能提升了8%。基準測試的程式碼可參見 這裡,測試比較如下:
在這裡插入圖片描述

結論

內建的 spark-avro 模組為 Spark SQL 和 Structured Streaming 提供了更好的使用者體驗以及 IO 效能。

轉載自過往記憶(https://www.iteblog.com/)
本文連結: 【Apache Spark 2.4 內建的 Avro 資料來源介紹】(https://www.iteblog.com/archives/2476.html)