1. 程式人生 > >Hadoop權威指南---I/O操作

Hadoop權威指南---I/O操作

目錄

Hadoop的I/O操作 

1、資料完整性 

資料在進過網路io傳輸或者磁碟io時有可能會損壞,因此一般通過計算校驗和來確定傳輸資料是否被損壞。校驗和checksum也有可能會損壞,但是因為其數量很小,出現損壞的機率很小。

1.1、HDFS的資料完整性 

簡單來說,在客戶端往datanode寫資料的時候也會發生校驗和,和datanode節點接收到的資料計算得到的校驗和對比,如果不一致會報錯給客戶端,如果一致,在datanode節點每個資料塊下也會儲存一個數據塊的校驗和檔案;當客戶端讀取資料塊時也會讀取校驗和檔案,然後和自己通過資料塊計算的校驗和對比,看看資料塊是否被損壞。並且持久化DataNode節點的校驗和檔案會實時更新最後客戶端的訪問時間,好用於統計該檔案塊什麼時候開始損壞的。

1.2、 LocalFileSystem 和 ChecksumFileSystem 

ChecksumFileSystem 繼承自FileSystem

Path getChecksumFile(Path file)方法可以獲取任意一個檔案的校驗和路徑
public abstract class ChecksumFileSystem extends FilterFileSystem {}
public class FilterFileSystem extends FileSystem {}

2、 壓縮 CompressionCodec

壓縮檔案的兩大好處:

  • 減少儲存檔案所需要的磁碟空間;
  • 加速資料在網路和磁碟上的傳輸;

2.1、 codec:Hadoop中對常用壓縮解壓縮演算法的實現

1)、通過CompressionCodec 對資料流進行壓縮和解壓縮

簡單來說就是需要對輸出流進行壓縮的時候呼叫createOutputStream來包裝一個輸出流,解壓縮的時候呼叫createInputStream來包裝一個輸入流

public interface CompressionCodec {
    CompressionOutputStream createOutputStream(OutputStream var1) throws IOException;

    CompressionOutputStream createOutputStream(OutputStream var1, Compressor var2) throws IOException;

    Class<? extends Compressor> getCompressorType();

    Compressor createCompressor();

    CompressionInputStream createInputStream(InputStream var1) throws IOException;

    CompressionInputStream createInputStream(InputStream var1, Decompressor var2) throws IOException;

    Class<? extends Decompressor> getDecompressorType();

    Decompressor createDecompressor();

    String getDefaultExtension();

    public static class Util {
        public Util() {
        }

        static CompressionOutputStream createOutputStreamWithCodecPool(CompressionCodec codec, Configuration conf, OutputStream out) throws IOException {
            Compressor compressor = CodecPool.getCompressor(codec, conf);
            CompressionOutputStream stream = null;

            try {
                stream = codec.createOutputStream(out, compressor);
            } finally {
                if (stream == null) {
                    CodecPool.returnCompressor(compressor);
                } else {
                    stream.setTrackedCompressor(compressor);
                }

            }

            return stream;
        }

        static CompressionInputStream createInputStreamWithCodecPool(CompressionCodec codec, Configuration conf, InputStream in) throws IOException {
            Decompressor decompressor = CodecPool.getDecompressor(codec);
            CompressionInputStream stream = null;

            try {
                stream = codec.createInputStream(in, decompressor);
            } finally {
                if (stream == null) {
                    CodecPool.returnDecompressor(decompressor);
                } else {
                    stream.setTrackedDecompressor(decompressor);
                }

            }

            return stream;
        }
    }
}
public abstract class CompressionOutputStream extends OutputStream {
    protected final OutputStream out;
    private Compressor trackedCompressor;
。。。
}
public abstract class CompressionInputStream extends InputStream implements Seekable {
    protected final InputStream in;
    protected long maxAvailableData = 0L;
    private Decompressor trackedDecompressor;
。。。
}

2)、通過CompressionCodecFactory的getCodec方法推斷檔案使用的壓縮型別類CompressionCodec

其中支援通過檔案的副檔名來判斷使用的壓縮演算法類,通過輸入檔案的路徑path

 3)、壓縮解壓縮的原生類庫

 4)、CodecPool是一個靜態工廠類

類似於資料庫連線池、執行緒池的概念,主要是重用Compressor和Decompressor物件

2.2、 壓縮和輸入分片

支援切分的壓縮演算法bzip2

2.3、 在MapReduce中使用壓縮

3、 序列化 Writable

3.1 Writable介面 

public interface Writable {
    void write(DataOutput var1) throws IOException;

    void readFields(DataInput var1) throws IOException;
}

其中:

  • write方式是執行序列化操作的,把writeable物件自己寫入到輸出流中,然後轉化為位元組陣列;
  • readFields是執行反序列化的,把位元組陣列封裝到輸入流中,然後根據輸入流來構建writeable物件;

Writable介面的序列化 :把一個Writable物件序列化為位元組陣列

Writable介面的反序列化 :把位元組陣列中的資料反序列化為一個Writable物件

WritableComparable和comparator

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

public class WritableComparator implements RawComparator, Configurable {}

3.2 Writable類

1)、Writable的基本型別封裝

2)、text型別(其也實現了序列化介面Writable)

對比Text和IntWritable型別

public class Text extends BinaryComparable implements WritableComparable<BinaryComparable> {}
public class IntWritable implements WritableComparable<IntWritable> {}

text是針對UTF-8序列的Writable類,一般可以認為是java的String的Writable等價(一般作為MapReduce的Key)

3)、BytesWritable

4)、NullWritable

 

5)、ObjectWritable和GenericWritable

6)、Writable的集合類

3.3 實現定製的Writable集合

3.4 序列化框架

 

public class WritableSerialization extends Configured implements Serialization<Writable> {}
public interface Serialization<T> {
    boolean accept(Class<?> var1);

    Serializer<T> getSerializer(Class<T> var1);

    Deserializer<T> getDeserializer(Class<T> var1);
}

4、 基於檔案的資料結構 

4.1 關於SequenceFile

順序檔案:以key:value形式的文字檔案???

4.2 關於MapFile

4.3 其他檔案格式和麵向列的格式

參考:

Hadoop權威指南.大資料的儲存與分析.第4版---第5章 Hadoop的I/O操作