1. 程式人生 > >10天Hadoop快速突擊(5)——Hadoop I/O操作

10天Hadoop快速突擊(5)——Hadoop I/O操作

Hadoop IO操作

意義

Hadoop自帶一套用於I/O的原子性的操作
(不會被執行緒排程機制打斷,一直到結束,中間不會有任何context switch)
特點
基於保障海量資料集的完整性和壓縮性 
Hadoop提供了一些用於開發分散式系統的API(一些序列化操作+基於磁碟的底層資料結構)

一、資料完整性

hdfs寫入的時候計算出校驗和,然後每次讀的時候再計算校驗和。要注意的一點是,hdfs每固定長度就會計算一次校驗和,這個值由io.bytes.per.checksum指定,預設是512位元組。因為CRC32是32位即4個位元組,這樣校驗和佔用的空間就會少於原資料的1%。1%這個數字在hadoop中會經常看到。以後有時間會整理一份hadoop和1%不得不說的故事。

datanode在儲存收到的資料前會校驗資料的校驗和,比如收到客戶端的資料或者其他副本傳過來的資料。hdfs資料流中客戶端寫入資料到hdfs時的資料流,在管道的最後一個datanode會去檢查這個校驗和,如果發現錯誤,就會丟擲ChecksumException到客戶端。客戶端從datanode讀資料的時候一樣要檢查校驗和,而且每個datanode還儲存了檢查校驗和的日誌,客戶端的每一次校驗都會記錄到日誌中。除了讀寫操作會檢查校驗和以外,datanode還跑著一個後臺程序(DataBlockScanner)來定期校驗存在在它上面的block,因為除了讀寫過程中會產生資料錯誤以外,硬體本身也會產生資料錯誤,比如說位衰減(bit rot)。
如果客戶端發現有block壞掉呢,會怎麼恢復這個壞的塊,主要分幾步:1.客戶端在丟擲ChecksumException之前會把壞的block和block所在的datanode報告給namenode2.namenode把這個block標記為已損壞,這樣namenode就不會把客戶端指向這個block,也不會複製這個block到其他的datanode。3.namenode會把一個好的block複製到另外一個datanode4.namenode把壞的block刪除掉如果出於一些原因在操作的時候不想讓hdfs檢查校驗碼,在呼叫FileSystem的open方法前呼叫setVerityCheckSum方法,並設為為false即可,命令列下可以使用-ignoreCrc引數。

實現

LocalFileSystem繼承自ChecksumFileSystem,已經實現了checksum功能,checksum的資訊儲存在與檔名同名的crc檔案中,發現錯誤的檔案放在bad_files資料夾中。如果你確認頂層系統已經實現了checksum功能,那麼你就沒必要使用LocalFileSystem,改為用RowLocalFileSystem。可以通過更改fs.file.impl=org.apache.hadoop.fs.RawLoacalFileSystem全域性指定,也可以通過程式碼直接例項化
  1. Configuration conf=...  
  2.        FileSystem fs=new RawLocalFileSystem();  
  3.        fs.initialize(null, conf);  
如果其他的FileSystem想擁有checksum功能的話,只需要用ChecksumFileSystem包裝一層即可:
  1. FileSystem rawFs=...  
  2.         FileSystem checksummedFs=new ChecksumFileSystem(fs){} ;  

二、檔案格式

Hadoop中的檔案格式大致上分為面向行和麵向列兩類:

  • 面向行:同一行的資料儲存在一起,即連續儲存。SequenceFile,MapFile,Avro Datafile。採用這種方式,如果只需要訪問行的一小部分資料,亦需要將整行讀入記憶體,推遲序列化一定程度上可以緩解這個問題,但是從磁碟讀取整行資料的開銷卻無法避免。面向行的儲存適合於整行資料需要同時處理的情況。

  • 面向列:整個檔案被切割為若干列資料,每一列資料一起儲存。Parquet , RCFile,ORCFile。面向列的格式使得讀取資料時,可以跳過不需要的列,適合於只處於行的一小部分欄位的情況。但是這種格式的讀寫需要更多的記憶體空間,因為需要快取行在記憶體中(為了獲取多行中的某一列)。同時不適合流式寫入,因為一旦寫入失敗,當前檔案無法恢復,而面向行的資料在寫入失敗時可以重新同步到最後一個同步點,所以Flume採用的是面向行的儲存格式。

1. SequenceFile

SequenceFile的檔案結構如下:

Sequence.png-272.5kB

根據是否壓縮,以及採用記錄壓縮還是塊壓縮,儲存格式有所不同:

  • 不壓縮: 
    按照記錄長度、Key長度、Value程度、Key值、Value值依次儲存。長度是指位元組數。採用指定的Serialization進行序列化。

  • Record壓縮: 
    只有value被壓縮,壓縮的codec儲存在Header中。

  • Block壓縮: 
    多條記錄被壓縮在一起,可以利用記錄之間的相似性,更節省空間。Block前後都加入了同步標識。Block的最小值由io.seqfile.compress.blocksize屬性設定。 
    block-compression.png-217.8kB

2. MapFile

MapFile是SequenceFile的變種,在SequenceFile中加入索引並排序後就是MapFile。索引作為一個單獨的檔案儲存,一般每個128個記錄儲存一個索引。索引可以被載入記憶體,用於快速查詢。存放資料的檔案根據Key定義的順序排列。 
MapFile的記錄必須按照順序寫入,否則丟擲IOException。

MapFile的衍生型別:

  • SetFile:特殊的MapFile,用於儲存一序列Writable型別的Key。Key按照順序寫入。
  • ArrayFile:Key為整數,代表在陣列中的位置,value為Writable型別。
  • BloomMapFile:針對MapFile的get()方法,使用動態Bloom過濾器進行優化。過濾器儲存在記憶體中,只有帶key值存在的時候,才會呼叫常規的get()方法,真正進行讀操作。

Hadoop體系下面向列的檔案包括RCFile,ORCFile,Parquet的。Avro的面向列版本為Trevni。

三、壓縮

hadoop對於壓縮格式的是透明識別,我們的MapReduce任務的執行是透明的,hadoop能夠自動為我們 將壓縮的檔案解壓,而不用我們去關心。

如果我們壓縮的檔案有相應壓縮格式的副檔名(比如lzo,gz,bzip2等),hadoop就會根據副檔名去選擇解碼器解壓。hadoop對每個壓縮格式的支援,詳細見下表:
壓縮格式工具演算法副檔名多檔案可分割性
DEFLATEDEFLATE.deflate
gzipgzipDEFLATE.gz
ZIPzipDEFLATE.zip是,在檔案範圍內
bzip2bzip2bzip2.bz2
LZOlzopLZO.lzo
如果壓縮的檔案沒有副檔名,則需 要在執行mapreduce任務的時候指定輸入格式.hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper /usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 -jobconf mapred.reduce.tasks=1 *-inputformat org.apache.hadoop.mapred.LzoTextInputFormat* 
hadoop下各種壓縮演算法的壓縮比,壓縮時間,解壓時間見下表:
壓縮演算法原始檔案大小壓縮後的檔案大小壓縮速度解壓縮速度
gzip8.3GB1.8GB17.5MB/s58MB/s
bzip28.3GB1.1GB2.4MB/s9.5MB/s
LZO-bset8.3GB2GB4MB/s60.6MB/s
LZO8.3GB2.9GB49.3MB/S74.6MB/s

四、序列化

1、什麼是序列化? 

將結構化物件轉換成位元組流以便於進行網路傳輸或寫入持久儲存的過程。 
2、什麼是反序列化? 
將位元組流轉換為一系列結構化物件的過程。

序列化用途

1、作為一種持久化格式。 
2、作為一種通訊的資料格式。 
3、作為一種資料拷貝、克隆機制。

Java序列化和反序列化

1、建立一個物件實現了Serializable 
2、序列化:ObjectOutputStream.writeObject(序列化物件) 
反序列化:ObjectInputStream.readObject()返回序列化物件 
具體實現,可參考如下文章: 
http://blog.csdn.net/scgaliguodong123_/article/details/45938555

為什麼Hadoop不直接使用java序列化?

Hadoop的序列化機制與Java的序列化機制不同,它將物件序列化到流中,值得一提的是java的序列化機制是不斷的建立物件,但在hadoop的序列化機制中,使用者可以複用物件,這樣就減少了java物件的分配和回收,提高了應用效率。

Hadoop序列化

Hadoop的序列化不採用java的序列化,而是實現了自己的序列化機制。 
Hadoop通過Writable介面實現的序列化機制,不過沒有提供比較功能,所以和java中的Comparable介面合併,提供一個介面WritableComparable。(自定義比較)

Writable介面提供兩個方法(write和readFields)。

  1. package org.apache.hadoop.io;
  2. public interface Writable {
  3. void write(DataOutput out) throws IOException;
  4. void readFields(DataInput in) throws IOException;
  5. }
  • 需要進行比較的話,要實現WritableComparable介面。
  1. public interfaceWritableComparable<T> extendsWritable, Comparable<T>{
  2. }

Hadoop提供了幾個重要的序列化介面與實現類:

外部集合的比較器

RawComparator<T>、WritableComparator

  1. package org.apache.hadoop.io;
  2. public interfaceRawComparator<T> extendsComparator<T> {
  3. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
  4. }
  5. public classWritableComparatorimplementsRawComparator{
  6. private final Class<? extendsWritableComparable> keyClass;
  7. privatefinalWritableComparablekey1;
  8. privatefinalWritableComparablekey2;
  9. }

實現了WritableComparable介面的類(自定義比較)

  1. org.apache.hadoop.io
  2. 介面
  3. WritableComparable<T>
  4. 父介面
  5. Comparable<T>, Writable
  6. 基礎實現類
  7. BooleanWritable, ByteWritable, ShortWritable,IntWritable,
  8. VIntWritable,LongWritable, VLongWritable , FloatWritable, DoubleWritable
  9. 高階實現類
  10. MD5Hash, NullWritable,Text, BytesWritable,ObjectWritable,GenericWritab

僅實現了Writable介面的類

  1. org.apache.hadoop.io
  2. Interface(介面) Writable
  3. All Known Subinterfaces(子介面):
  4. Counter, CounterGroup, CounterGroupBase<T>, InputSplit, InputSplitWithLocationInfo, WritableComparable<T>
  5. 僅實現了Writable介面的類
  6. 陣列:AbstractWritableTwoDArrayWritable
  7. 對映:AbstractMapWritableMapWritableSortedMapWritabl

Writable介面

 
Text 
Text是UTF-8的Writable,可以理解為java.lang.String相類似的Writable。Text類替代了UTF-8類。Text是可變的,其值可以通過呼叫set()方法改變。最大可以儲存2GB的大小。

NullWritable 
NullWritable是一種特殊的Writable型別,它的序列化長度為零,可以用作佔位符。

BytesWritable 
BytesWritable是一個二進位制資料陣列封裝,序列化格式是一個int欄位。 
例如:一個長度為2,值為3和5的位元組陣列序列後的結果是:

  1. @Test
  2. publicvoidtestByteWritableSerilizedFromat() throws IOException {
  3. BytesWritable bytesWritable=new BytesWritable(new byte[]{3,5});
  4. byte[] bytes=SerializeUtils.serialize(bytesWritable);
  5. Assert.assertEquals(StringUtils.byteToHexString(bytes),"000000020305"); //true
  6. }

BytesWritable是可變的,其值可以通過呼叫set()方法來改變。

ObjectWritable 
ObjectWritable適用於欄位使用多種型別時。

Writable集合 
1、ArrayWritableTwoDArrayWritable是針對陣列和二維陣列。 
2、MapWritableSortedMapWritable是針對Map和SortMap。

自定義Writable

1、實現WritableComparable介面 
2、實現相應的介面方法: 
A.write() //將物件轉換為位元組流並寫入到輸出流out中。 
B.readFileds() //從輸入流in中讀取位元組流併發序列化為物件。 
C.compareTo(o) //將this物件和物件o進行比較。 
可參考下面的例子,自定義NewK2類: 
http://blog.csdn.net/scgaliguodong123_/article/details/46010947

  1. package Writable;
  2. import java.io.BufferedInputStream;
  3. import java.io.BufferedOutputStream;
  4. import java.io.DataInput;
  5. import java.io.DataInputStream;
  6. import java.io.DataOutput;
  7. import java.io.DataOutputStream;
  8. import java.io.File;
  9. import java.io.FileInputStream;
  10. import java.io.FileNotFoundException;
  11. import java.io.FileOutputStream;
  12. import java.io.IOException;
  13. import org.apache.hadoop.io.IntWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.io.WritableComparable;
  16. public classDefineWritable{
  17. publicstaticvoidmain(String[] args) throws IOException {
  18. Student student = new Student("liguodong", 22, "男");
  19. BufferedOutputStream bos = new BufferedOutputStream(
  20. new FileOutputStream(new File("g:/liguodong.txt")));
  21. DataOutputStream dos = new DataOutputStream(bos);
  22. student.write(dos);
  23. dos.flush();
  24. dos.close();
  25. bos.close();
  26. Student student2 = new Student();
  27. BufferedInputStream bis = new BufferedInputStream(
  28. new FileInputStream(new File("g:/liguodong.txt")));
  29. DataInputStream dis = new DataInputStream(bis);
  30. student2.readFields(dis);
  31. System.out.println("name="+student2.getName()
  32. +",age="+student2.getAge()+",sex="+student2.getSex());
  33. }
  34. }
  35. class Student implements WritableComparable<Student>{
  36. private Text name = new Text();
  37. private IntWritable age = new IntWritable();
  38. private Text sex = new Text();
  39. publicStudent() {
  40. }
  41. publicStudent(String name, int age, String sex) {
  42. super();
  43. this.name = new Text(name);
  44. this.age = new IntWritable(age);
  45. this.sex = new Text(sex);
  46. }
  47. public Text getName() {
  48. return name;
  49. }
  50. publicvoidsetName(Text name) {
  51. this.name = name;
  52. }
  53. public IntWritable getAge