1. 程式人生 > >HDFS支援資料壓縮的幾種方法探討

HDFS支援資料壓縮的幾種方法探討

HDFS支援資料壓縮存在以下幾種方法:

1、在HDFS之上將資料壓縮好後,再儲存到HDFS
2、在HDFS內部支援資料壓縮,這裡又可以分為幾種方法:
    2.1、壓縮工作在DataNode上完成,這裡又分兩種方法:
           2.1.1、資料接收完後,再壓縮
                     這個方法對HDFS的改動最小,但效果最低,只需要在block檔案close後,呼叫壓縮工具,將block檔案壓縮一下,然後再開啟block檔案時解壓一下即可,幾行程式碼就可以搞定
           2.1.2、邊接收資料邊壓縮,使用第三方提供的壓縮庫
                     效率和複雜度折中方法,Hook住系統的write和read操作,在資料寫入磁碟之前,先壓縮一下,但write和read對外的介面行為不變,比如:原始大小為100KB的資料,壓縮後大小為10KB,當寫入100KB後,仍對呼叫者返回100KB,而不是10KB
    2.2、壓縮工作交給DFSClient做,DataNode只接收和儲存
           這個方法效果最高,壓縮分散地推給了HDFS客戶端,但DataNode需要知道什麼時候一個block塊接收完成了。

推薦最終實現採用2.2這個方法,該方法需要修改的HDFS程式碼量也不大,但效果最高。

這裡舉一個例子:
 先說檔案的壓縮有兩大好處:1、可以減少儲存檔案所需要的磁碟空間;2、可以加速資料在網路和磁碟上的傳輸。尤其是在處理大資料時,這兩大好處是相當重要的。

  下面是一個使用gzip工具壓縮檔案的例子。將檔案/user/hadoop/aa.txt進行壓縮,壓縮後為/user/hadoop/text.gz

  1. package com.hdfs;
  2. import java.io.IOException;
  3. import java.io.InputStream;
  4. import java.io.OutputStream;
  5. import java.net.URI;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FSDataInputStream;
  8. import org.apache.hadoop.fs.FSDataOutputStream;
  9. import org.apache.hadoop.fs.FileSystem;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.io.IOUtils;
  12. import org.apache.hadoop.io.compress.CompressionCodec;
  13. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  14. import org.apache.hadoop.io.compress.CompressionInputStream;
  15. import org.apache.hadoop.io.compress.CompressionOutputStream;
  16. import org.apache.hadoop.util.ReflectionUtils;
  17. public class CodecTest {
  18.     //壓縮檔案
  19.     public static void compress(String codecClassName) throws Exception{
  20.         Class<?> codecClass = Class.forName(codecClassName);
  21.         Configuration conf = new Configuration();
  22.         FileSystem fs = FileSystem.get(conf);
  23.         CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
  24.         //指定壓縮檔案路徑
  25.         FSDataOutputStream outputStream = fs.create(new Path("/user/hadoop/text.gz"));
  26.         //指定要被壓縮的檔案路徑
  27.         FSDataInputStream in = fs.open(new Path("/user/hadoop/aa.txt"));
  28.         //建立壓縮輸出流
  29.         CompressionOutputStream out = codec.createOutputStream(outputStream);  
  30.         IOUtils.copyBytes(in, out, conf);
  31.         IOUtils.closeStream(in);
  32.         IOUtils.closeStream(out);
  33.     }
  34.     //解壓縮
  35.     public static void uncompress(String fileName) throws Exception{
  36.         Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
  37.         Configuration conf = new Configuration();
  38.         FileSystem fs = FileSystem.get(conf);
  39.         CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
  40.         FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/text.gz"));
  41.          //把text檔案裡到資料解壓,然後輸出到控制檯  
  42.         InputStream in = codec.createInputStream(inputStream);  
  43.         IOUtils.copyBytes(in, System.out, conf);
  44.         IOUtils.closeStream(in);
  45.     }
  46.     //使用副檔名來推斷二來的codec來對檔案進行解壓縮
  47.     public static void uncompress1(String uri) throws IOException{
  48.         Configuration conf = new Configuration();
  49.         FileSystem fs = FileSystem.get(URI.create(uri), conf);
  50.         Path inputPath = new Path(uri);
  51.         CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  52.         CompressionCodec codec = factory.getCodec(inputPath);
  53.         if(codec == null){
  54.             System.out.println("no codec found for " + uri);
  55.             System.exit(1);
  56.         }
  57.         String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
  58.         InputStream in = null;
  59.         OutputStream out = null;
  60.         try {
  61.             in = codec.createInputStream(fs.open(inputPath));
  62.             out = fs.create(new Path(outputUri));
  63.             IOUtils.copyBytes(in, out, conf);
  64.         } finally{
  65.             IOUtils.closeStream(out);
  66.             IOUtils.closeStream(in);
  67.         }
  68.     }
  69.     public static void main(String[] args) throws Exception {
  70.         //compress("org.apache.hadoop.io.compress.GzipCodec");
  71.         //uncompress("text");
  72.         uncompress1("hdfs://master:9000/user/hadoop/text.gz");
  73.     }
  74. }
複製程式碼 首先執行77行進行壓縮,壓縮後執行第78行進行解壓縮,這裡解壓到標準輸出,所以執行78行會再控制檯看到檔案/user/hadoop/aa.txt的內容。如果執行79行的話會將檔案解壓到/user/hadoop/text,他是根據/user/hadoop/text.gz的副檔名判斷使用哪個解壓工具進行解壓的。解壓後的路徑就是去掉副檔名。

  進行檔案壓縮後,在執行命令./hadoop fs -ls /user/hadoop/檢視檔案資訊,如下:
  1. [hadoop@master bin]$ ./hadoop fs -ls /user/hadoop/
  2. Found 7 items
  3. -rw-r--r--   3 hadoop supergroup   76805248 2013-06-17 23:55 /user/hadoop/aa.mp4
  4. -rw-r--r--   3 hadoop supergroup        520 2013-06-17 22:29 /user/hadoop/aa.txt
  5. drwxr-xr-x   - hadoop supergroup          0 2013-06-16 17:19 /user/hadoop/input
  6. drwxr-xr-x   - hadoop supergroup          0 2013-06-16 19:32 /user/hadoop/output
  7. drwxr-xr-x   - hadoop supergroup          0 2013-06-18 17:08 /user/hadoop/test
  8. drwxr-xr-x   - hadoop supergroup          0 2013-06-18 19:45 /user/hadoop/test1
  9. -rw-r--r--   3 hadoop supergroup         46 2013-06-19 20:09 /user/hadoop/text.gz