1. 程式人生 > >spark使用scala讀取Avro資料

spark使用scala讀取Avro資料

為什麼使用 Avro ?

最基本的格式是 CSV ,其廉價並且不需要頂一個一個 schema 和資料關聯。

隨後流行起來的一個通用的格式是 XML,其有一個 schema 和 資料關聯,XML 廣泛的使用於 Web Services 和 SOA 架構中。不幸的是,其非常冗長,並且解析 XML 需要消耗記憶體。

另外一種格式是 JSON,其非常流行易於使用因為它非常方便易於理解。

這些格式在 Big Data 環境中都是不可拆分的,這使得他們難於使用。在他們之上使用一個壓縮機制(Snappy,Gzip)並不能解決這個問題。

因此不同的資料格式出現了。Avro 作為一種序列化平臺被廣泛使用,因為它能跨語言,提供了一個小巧緊湊的快速的二進位制格式,支援動態 schema 發現(通過它的泛型)和 schema 演變,並且是可壓縮和拆分的。它還提供了複雜的資料結構,例如巢狀型別。

例子

讓我們來看一個例子,建立一個 Avro schema 並生成一些資料。在一個真實案例的例子中,組織機構通常有一些更加普通的格式,例如 XML,的資料,並且他們需要通過一些工具例如 JAXB 將他們的資料轉換成 Avro。我們來使用這個例子,其中 twitter.avsc 如下:

{"type":"record","name":"twitter_schema","namespace":"com.miguno.avro","fields":[{"name":"username","type":"string","doc":"Name of the user account on Twitter.com"
},{"name":"tweet","type":"string","doc":"The content of the user‘s Twitter message"},{"name":"timestamp","type":"long","doc":"Unix epoch time in seconds"}],"doc:":"A basic schema for storing Twitter messages"}

twitter.json 中有一些資料:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine."
,"timestamp":1366150681}{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp":1366154481}

我們將這些資料轉換成二進位制的 Avro 格式:

$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro

然後,我們將 Avro 資料轉換為 Java:

$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/

現在,我們編譯這些類並將其打包:

$ CLASSPATH=/app/avro/avro-1.7.7-javadoc.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar
$ javac -classpath $CLASSPATH /app/avro/data/com/miguno/avro/twitter_schema.java
$ jar cvf Twitter.jar com/miguno/avro/*.class

我們啟動 Spark,並將上面建立的 Jar 和一些需要的庫(Hadoop 和 Avro)傳遞給 Spark 程式:

$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/avro/avro-1.7.7.jar,/app/avro/data/Twitter.jar

在 REPL 中,我們獲取資料並建立一個 RDD:

scala>import com.miguno.avro.twitter_schema
import org.apache.avro.file.DataFileReader;import org.apache.avro.file.DataFileWriter;import org.apache.avro.io.DatumReader;import org.apache.avro.io.DatumWriter;import org.apache.avro.specific.SpecificDatumReader;import org.apache.avro.mapreduce.AvroKeyInputFormatimport org.apache.avro.mapred.AvroKeyimport