How to bulk-import massive data into HBase
阿新 • Posted 2019-01-24
Recently I needed to migrate a full MySQL dataset into HBase. Although HBase's design favors efficient reads, its compaction implementation takes a heavy toll on massive write workloads: once the data volume passes a certain point, the cluster starts to thrash.
Looking at HBase's implementation, whatever its runtime mechanics, everything is ultimately stored on the distributed file system in the HFile format.
Conveniently, the HBase source tree provides an HFileOutputFormat class. Reading its source shows:
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.mortbay.log.Log;

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time. Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String compression = conf.get("hfile.compression",
      Compression.Algorithm.NONE.getName());

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      public void write(ImmutableBytesWritable row, KeyValue kv)
          throws IOException {
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        if (wl == null || ((length + wl.written) >= maxsize) &&
            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
          // Get a new writer.
          Path basedir = new Path(outputdir, Bytes.toString(family));
          if (wl == null) {
            wl = new WriterLength();
            this.writers.put(family, wl);
            if (this.writers.size() > 1) throw new IOException("One family only");
            // If wl == null, first file in family. Ensure family dir exits.
            if (!fs.exists(basedir)) fs.mkdirs(basedir);
          }
          wl.writer = getNewWriter(wl.writer, basedir);
          Log.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0)? "": ", wrote=" + wl.written));
          wl.written = 0;
        }
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transition.
        this.previousRow = kv.getRow();
      }

      /* Create a new HFile.Writer. Close current if there is one.
       * @param writer
       * @param familydir
       * @return A new HFile.Writer.
       * @throws IOException
       */
      private HFile.Writer getNewWriter(final HFile.Writer writer,
          final Path familydir) throws IOException {
        close(writer);
        return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
          blocksize, compression, KeyValue.KEY_COMPARATOR);
      }

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
          close(e.getValue().writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }
}
As you can see, its workflow is simple: initialize from your configuration, then write the records out in HFile format. Two details in write() are worth noting: it only rolls to a new HFile once hbase.hregion.max.filesize is exceeded and the row key changes, so a row never straddles two files; and it throws "One family only" if you feed it more than one column family.
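In a real import this class would sit at the output end of a MapReduce job. The following is only a minimal sketch of that wiring, my own addition rather than code from this post, assuming the 0.20-era API: MyImportMapper is a hypothetical mapper that parses "rowkey,value" text lines into KeyValues, and KeyValueSortReducer is the reducer named in the class javadoc above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical mapper: turns "rowkey,value" input lines into KeyValues.
public class MyImportMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
  protected void map(LongWritable offset, Text line, Context ctx)
      throws java.io.IOException, InterruptedException {
    String[] parts = line.toString().split(",", 2);
    byte[] row = Bytes.toBytes(parts[0]);
    ctx.write(new ImmutableBytesWritable(row),
        new KeyValue(row, Bytes.toBytes("offer:action"),
            System.currentTimeMillis(), Bytes.toBytes(parts[1])));
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "offer-bulk-import");
    job.setJarByClass(MyImportMapper.class);
    job.setMapperClass(MyImportMapper.class);
    job.setReducerClass(KeyValueSortReducer.class);  // sorts each row's KeyValues
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    // A single reducer keeps all output keys globally sorted; multiple
    // reducers would need a total-order partitioner.
    job.setNumReduceTasks(1);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path("/tmp/importoffer"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Here the shuffle plus KeyValueSortReducer is what guarantees the writer sees KeyValues in order, even at scale.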
For this post, though, I took a shortcut and wrote a lazy demo that skips MapReduce and drives the RecordWriter directly:
// Build a configuration that points at the target HDFS cluster.
HFileOutputFormat hf = new HFileOutputFormat();
HBaseConfiguration conf = new HBaseConfiguration();
conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml"));
conf.set("mapred.output.dir", "/tmp");                               // where the HFiles land
conf.set("hfile.compression", Compression.Algorithm.LZO.getName()); // optional LZO compression
// Fake a task context so we can grab a RecordWriter outside MapReduce.
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
RecordWriter writer = hf.getRecordWriter(context);
// Two rows, two columns each, all in the single "offer" family.
// Note the KeyValues are created and written in ascending key order.
KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"),
    Bytes.toBytes("offer:action"), System.currentTimeMillis(), Bytes.toBytes("test"));
KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"),
    Bytes.toBytes("offer:id"), System.currentTimeMillis(), Bytes.toBytes("123"));
KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"),
    Bytes.toBytes("offer:action"), System.currentTimeMillis(), Bytes.toBytes("test"));
KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"),
    Bytes.toBytes("offer:id"), System.currentTimeMillis(), Bytes.toBytes("123"));
writer.write(null, kv);
writer.write(null, kv1);
writer.write(null, kv3);
writer.write(null, kv4);
writer.close(context);
After running it, a set of HFiles shows up under /tmp on HDFS. Note that when writing in bulk you must make sure the keys are written in sorted order.
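If your input does not already arrive sorted, one simple way to enforce the ordering for a small batch, a sketch of my own assuming KeyValue.COMPARATOR behaves as in the 0.20-era API, is to buffer the KeyValues in a TreeSet before replaying them into the writer:

import java.util.TreeSet;
import org.apache.hadoop.hbase.KeyValue;

// Buffer out-of-order KeyValues and replay them to the writer in key order.
TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
sorted.add(kv3);  // insertion order no longer matters here
sorted.add(kv);
sorted.add(kv4);
sorted.add(kv1);
for (KeyValue k : sorted) {
  writer.write(null, k);  // the writer now sees keys strictly in order
}

This only works for batches that fit in memory; at real volume, let the MapReduce shuffle and KeyValueSortReducer do the sorting.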
At this point loadtable.rb, a JRuby script that ships with HBase, comes into play.
Its usage is: loadtable.rb <desired table name> <HDFS path>:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
Once it finishes, run ./hbase shell and type:
>list
The offer table you just imported will show up in the list.
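To check the rows themselves rather than just the table listing, a standard shell scan works too (my addition, not part of the original post):
>scan 'offer'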