Parquet Study Notes
Apache Parquet is a columnar storage format for the Hadoop ecosystem. It is compatible with most of the ecosystem's computation frameworks (Hadoop, Spark, etc.), is supported by multiple query engines (Hive, Impala, Drill, etc.), and is language- and platform-independent.
Definition level & Repetition level
Definitions
Definition level
The definition level indicates how many optional fields along the column's path are actually defined.
More precisely, it is the number of repeated and optional fields along the path that are defined. Required fields are excluded, because a required field is always defined.
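For example, take a hypothetical path a.b.c where a is optional, b is repeated, and c is required: the maximum definition level is 2 (a and b count, c does not). A stored definition level of 0 then means a is null, 1 means a is defined but b is empty, and 2 means an actual value of c is present.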
Repetition level
The repetition level indicates at which repeated field in the path the value repeats.
Computing DL and RL
We use depth 0 for the start of a record (an imaginary root node), and non-repeated fields are ignored when computing depth (any field whose label is not repeated does not count toward it). The path Name.Language.Code therefore contains two repeated fields, Name and Language: a value that repeats at Name has repetition level 1 (the imaginary root is 0, so the next level down is 1), and one that repeats at Language has repetition level 2. It cannot repeat at Code, which is required and therefore occurs exactly once. Similarly, in the path Links.Forward, Links is optional and does not count toward the depth (it cannot repeat), while Forward is repeated, so a value that repeats at Forward has repetition level 1.
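A quick worked example, using the sample record from Google's Dremel paper (where these path names come from), trimmed to the fields on the Name.Language.Code path:

Name
  Language
    Code: 'en-us'
  Language
    Code: 'en'
Name
Name
  Language
    Code: 'en-gb'

The column Name.Language.Code then stores the following (value, repetition level, definition level) triples:

value   R  D
en-us   0  2   (first value in the record, so R=0)
en      2  2   (repeats at Language, depth 2)
NULL    1  1   (the second Name has no Language; only Name is defined, so D=1)
en-gb   1  2   (a new Name, so the repetition is at depth 1)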
Parquet Java example
pom file
<dependencies>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <!-- the Hadoop artifacts should share the same version -->
    <version>2.7.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
  </dependency>
</dependencies>
WriteParquet.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class WriteParquet {

    public static void main(String[] args) throws IllegalArgumentException, IOException {
        // Build an Avro record schema with two int fields, x and y.
        List<Field> fields = new ArrayList<Field>();
        Object defaultValue = null;
        fields.add(new Field("x", Schema.create(Type.INT), "x", defaultValue));
        fields.add(new Field("y", Schema.create(Type.INT), "y", defaultValue));
        Schema schema = Schema.createRecord("name", "doc", "namespace", false, fields);

        // The writer is AutoCloseable; try-with-resources flushes and closes the file.
        try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
                new Path("my-file.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            // Simulate 10,000 rows of data.
            for (int r = 0; r < 10000; ++r) {
                Record record = new Record(schema);
                record.put(0, r);
                record.put(1, r * 3);
                writer.write(record);
            }
        }
    }
}
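Note that the Path-based builder above is deprecated in current Parquet releases. A minimal sketch of the alternative, assuming the OutputFile-based builder that parquet-avro 1.10.0 ships; only the builder call changes, the rest of the program stays the same:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

// Wrap the destination in an OutputFile instead of passing a raw Path.
try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
        HadoopOutputFile.fromPath(new Path("my-file.parquet"), new Configuration()))
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()) {
    // ... write records exactly as above ...
}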
ReadParquet.java
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadParquet {

    public static void main(String[] args) throws IllegalArgumentException, IOException {
        // The reader should be closed when done; try-with-resources handles it.
        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(
                new Path("my-file.parquet"))
                .build()) {
            GenericRecord record;
            // read() returns null once the file is exhausted.
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }
    }
}
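As with the writer, the Path-based reader builder is deprecated. A sketch of the InputFile variant, again assuming the HadoopInputFile helper from parquet-hadoop 1.10.0:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Wrap the source in an InputFile instead of passing a raw Path.
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(
        HadoopInputFile.fromPath(new Path("my-file.parquet"), new Configuration()))
        .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
}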