1. 程式人生 > >Hbase通過BulkLoad的方式快速匯入海量資料

Hbase通過BulkLoad的方式快速匯入海量資料

摘要

載入資料到HBase的方式有多種,通過HBase API匯入或命令列匯入或使用第三方(如sqoop)來匯入或使用MR來批量匯入(耗費磁碟I/O,容易在匯入的過程使節點宕機),但是這些方式不是慢就是在匯入的過程的佔用Region資料導致效率低下,今天要講的就是利用HBase在HDFS儲存原理及MapReduce的特性來快速匯入海量的資料 HBase資料在HDFS下是如何儲存的? HBase中每張Table在根目錄(/HBase)下用一個資料夾儲存,Table名為資料夾名,在Table資料夾下每個Region同樣用一個資料夾儲存,每個Region資料夾下的每個列族也用資料夾儲存,而每個列族下儲存的就是一些HFile檔案,HFile就是HBase資料在HFDS下儲存格式,其整體目錄結構如下: /hbase/<tablename>/<encoded-regionname>/<column-family>/<filename> HBase資料寫路徑
                                                                              (圖來自Cloudera) 在put資料時會先將資料的更新操作資訊和資料資訊寫入WAL,在寫入到WAL後,資料就會被放到MemStore中,當MemStore滿後資料就會被flush到磁碟(即形成HFile檔案),在這過程涉及到的flush,split,compaction等操作都容易造成節點不穩定,資料匯入慢,耗費資源等問題,在海量資料的匯入過程極大的消耗了系統性能,避免這些問題最好的方法就是使用BlukLoad的方式來載入資料到HBase中。
原理 利用HBase資料按照HFile格式儲存在HDFS的原理,使用Mapreduce直接生成HFile格式檔案後,RegionServers再將HFile檔案移動到相應的Region目錄下 其流程如下圖:                                                                       (圖來自Cloudera) 匯入過程 1.使用MapReduce生成HFile檔案 GenerateHFile類
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class GenerateHFile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] items = line.split("\t"); String ROWKEY = items[1] + items[2] + items[3]; ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes()); Put put = new Put(ROWKEY.getBytes());   //ROWKEY put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes()); put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes());  //出發點 put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes());  //目的地 put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes());   //出發時間 put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4])));  //價格 put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式 put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店 context.write(rowkey, put); } }
GenerateHFileMain類 複製程式碼
public class GenerateHFileMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String INPUT_PATH= "hdfs://master:9000/INFO/Input";
        final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("TRAVEL"));
        Job job=Job.getInstance(conf);
        job.getConfiguration().set("mapred.jar", "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar");  //預先將程式打包再將jar分發到叢集上
        job.setJarByClass(GenerateHFileMain.class);
        job.setMapperClass(GenerateHFile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("TRAVEL")))
        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true)?0:1);
    }
複製程式碼 注意 1.Mapper的輸出Key型別必須是包含Rowkey的ImmutableBytesWritable格式,Value型別必須為KeyValue或Put型別,當匯入的資料有多列時使用Put,只有一個列時使用KeyValue 2.job.setMapOutPutValueClass的值決定了job.setReduceClass的值,這裡Reduce主要起到了對資料進行排序的作用,當job.setMapOutPutValueClass的值Put.class和KeyValue.class分別對應job.setReduceClass的PutSortReducer和KeyValueSortReducer 3.在建立表時對錶進行預分割槽再結合MapReduce的平行計算機制能加快HFile檔案的生成,如果對錶進行了預分割槽(Region)就設定Reduce數等於分割槽數(Region) 4.在多列族的情況下需要進行多次的context.write 2.通過BlukLoad方式載入HFile檔案 複製程式碼
public class LoadIncrementalHFileToHBase {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        Table table = connection.getTable(TableName.valueOf("TRAVEL"));
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
        load.doBulkLoad(new Path("hdfs://master:9000/HFILE/OutPut"), admin,table,connection.getRegionLocator(TableName.valueOf("TRAVEL")));
    }
}
複製程式碼 由於BulkLoad是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些複製資料的操作 優點: 1.匯入過程不佔用Region資源 2.能快速匯入海量的資料 3.節省記憶體