1. 程式人生 > >hbase海量資料的全量匯入方法

hbase海量資料的全量匯入方法

最近有個需求要對mysql的全量資料遷移到hbase,雖然hbase的設計非常利於高效的讀取,但是它的compaction實現對海量資料寫入造成非常大的影響,資料到一定量之後,就開始抽風。
分析hbase的實現,不管其執行的機制,其最終儲存結構為分散式檔案系統中的hfile格式。
剛好hbase的原始碼中提供一個HFileOutputFormat類,分析其原始碼可以看到:
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.mortbay.log.Log;

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time.  Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Get the path of the temporary output file 
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String compression = conf.get("hfile.compression",
      Compression.Algorithm.NONE.getName());

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        if (wl == null || ((length + wl.written) >= maxsize) &&
            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
          // Get a new writer.
          Path basedir = new Path(outputdir, Bytes.toString(family));
          if (wl == null) {
            wl = new WriterLength();
            this.writers.put(family, wl);
            if (this.writers.size() > 1) throw new IOException("One family only");
            // If wl == null, first file in family.  Ensure family dir exits.
            if (!fs.exists(basedir)) fs.mkdirs(basedir);
          }
          wl.writer = getNewWriter(wl.writer, basedir);
          Log.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0)? "": ", wrote=" + wl.written));
          wl.written = 0;
        }
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transition.
        this.previousRow = kv.getRow();
      }

      /* Create a new HFile.Writer. Close current if there is one.
       * @param writer
       * @param familydir
       * @return A new HFile.Writer.
       * @throws IOException
       */
      private HFile.Writer getNewWriter(final HFile.Writer writer,
          final Path familydir)
      throws IOException {
        close(writer);
        return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),
          blocksize, compression, KeyValue.KEY_COMPARATOR);
      }

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
          close(e.getValue().writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it. 
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }
}


可以看到,它的工作流程就是首先根據你的配置檔案初始化,然後寫成hfile的格式。
這裡我做了個偷懶的demo:
HFileOutputFormat hf = new HFileOutputFormat();
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml"));
        conf.set("mapred.output.dir", "/tmp");
        conf.set("hfile.compression", Compression.Algorithm.LZO.getName());
        TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
        RecordWriter writer = hf.getRecordWriter(context);
        KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"),
                                   System.currentTimeMillis(), Bytes.toBytes("test"));
        KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"),
                                    System.currentTimeMillis(), Bytes.toBytes("123"));
        KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"),
                                    System.currentTimeMillis(), Bytes.toBytes("test"));
        KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"),
                                    System.currentTimeMillis(), Bytes.toBytes("123"));
        writer.write(null, kv);
        writer.write(null, kv1);
        writer.write(null, kv3);
        writer.write(null, kv4);
        writer.close(context);

執行然之後,會在hdfs的/tmp目錄下生成一份檔案。注意批量寫資料的時候一定要保證key的有序性
這個時候,hbase自己提供的一個基於jruby的loadtable.rb指令碼就可以發揮作用了。
它的格式是loadtable.rb 你希望的表明 hdfs路徑:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
執行完之後:
執行./hbase shell
>list
就會顯示剛才匯入的offer表了。