1. 程式人生 > >通過BulkLoad快速將海量數據導入到Hbase(TDH,kerberos認證)

通過BulkLoad快速將海量數據導入到Hbase(TDH,kerberos認證)

++ zookeeper wke zookeepe ner all throw 利用 tab

技術分享圖片 一、概念 使用BlukLoad方式利用Hbase的數據信息是 按照特點格式存儲在HDFS裏的特性,直接在HDFS中生成持久化的Hfile數據格式文件,然後完成巨量數據快速入庫的操作,配合MapReduce完成這樣的操作。 二、優點 1、不占用Region資源 2、不會產生巨量的寫入I/O、 3、只需要較少的CPU和網絡資源 三、實現原理 通過一個MapReduce Job來實現的,通過job直接生成一個Hbase的內部HFile格式文件 ,用來形成一個特殊的Hbase數據表,然後直接將數據文件加載到運行的集群中,與使用Hbase API相比,使用BulkLoad導入數據占用更少的CPU和網絡資源 四、BulkLoad過程主要包括三部分: 1、從數據源(通常是文本文件或其他的數據庫)提取數據並上傳到HDFS,抽取數據到HDFS和Hbase。 2、利用MapReduce作業處理事先準備的數據,並且大多數情況下需要我們自己編寫Map函數,而Reduce函數不需要我們考慮,由Hbase提供。 該作業需要使用rowkey(行鍵)作為輸出key;keyvalue、put或者delete作為輸出value。MapReduce作業需要使用HFileOutputFormat2 來生成Hbase數據文件。為了有效的導入數據,需要配置HFileOutputFormat2使得每一個輸出文件都在一個合適的區域中。為達到這個目的, MapReduce作業會使用Hadoop的TotalOrderPartitioner類根據表的key值將輸出分割開來。HFileOutputFormat2的方法configureIncrementalLoad() 會自動的完成上面的工作。 3、告訴RegionServers數據的位置並導入數據,通常使用LoadIncrementalHFiles(更為人所知是completebulkload工具),將文件在HDFS上的位置傳遞給它,它會利用RegionServer將數據導入到相應的區域 技術分享圖片

五、實踐操作(kerberos認證)

1、創建表

create ‘hfiletable‘,‘fm1‘,‘fm2‘

2、數據準備

** * @Author: xiaolaotou * @Date: 2018/11/29 */ public class CreateData { public static void main(String[] args) throws IOException, InterruptedException { StringBuffer str = new StringBuffer(); String rowkey="key"; String family1="fm1:name"; String family2="fm2:age"; String value="za"; Integer age=12; for(int i=1;i<5000;i++) { str=str.append(rowkey + i + "\t" + family1 + "\t" + value+i + "\n" + rowkey+i + "\t" + family2 + "\t" +i+"\n"); System.out.println(str); } //寫入本地文件 String fileTxt="/mnt/sata1/yang/BulkLoadHbase/data.txt"; File file=new File(fileTxt); if(!file.getParentFile().exists()){ file.getParentFile().mkdirs(); } if(!file.exists()){ file.createNewFile(); FileWriter fw=new FileWriter(file,false); BufferedWriter bw=new BufferedWriter(fw); System.out.println("寫入完成"); bw.write(String.valueOf(str)); bw.flush(); bw.close(); fw.close(); } PutDataToHdfs(); } //將數據文件上傳到hdfs public static void PutDataToHdfs() throws IOException, InterruptedException { Configuration conf = new Configuration(); FileSystem fs = org.apache.hadoop.fs.FileSystem.get(URI.create("hdfs://172.20.237.104:9000"),conf,"root"); //上傳文件到hdfs fs.copyFromLocalFile(new Path("/mnt/sata1/yang/BulkLoadHbase/data.txt"),new Path("/yang")); } } 註意:在hdfs開啟kerberos認證這個將數據上傳到hdfs不能用,采用生成數據手動上傳
技術分享圖片

3、使用Mapreduce將數據通過Bulkload入到hbase表中

/** * @Author: xiaolaotou * @Date: 2018/11/27 * 使用MapReduce生成HFile文件 */ public class BulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> { public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String[] valueStrSplit = value.toString().split("\t");//劃分一行數據 String hkey = valueStrSplit[0];//rowkey String family = valueStrSplit[1].split(":")[0];//列族 String column = valueStrSplit[1].split(":")[1];//字段 String hvalue = valueStrSplit[2];//值 final byte[] rowKey = Bytes.toBytes(hkey); final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey); Put HPut = new Put(rowKey); byte[] cell = Bytes.toBytes(hvalue); HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell); context.write(HKey, HPut); } /** * @Author: xiaolaotou * @Date: 2018/11/27 */ public class BulkLoadJob { static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class); private static Configuration conf = null; static { Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.set("hbase.zookeeper.quorum", "172.20.237.104,172.20.237.105,172.20.237.106"); HBASE_CONFIG.set("hbase.master.kerberos.principal", "hbase/_HOST@TDH"); HBASE_CONFIG.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@TDH"); HBASE_CONFIG.set("hbase.security.authentication", "kerberos"); HBASE_CONFIG.set("zookeeper.znode.parent", "/hyperbase1"); HBASE_CONFIG.set("hadoop.security.authentication", "kerberos"); conf = HBaseConfiguration.create(HBASE_CONFIG); } public static void main(String[] args) throws Exception { UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab("hbase/gz237-104", "/etc/hyperbase1/conf/hyperbase.keytab"); String inputPath = "/yang/data.txt"; String outputPath = "/yang/BulkLoad"; Job job = Job.getInstance(conf, "BulkLoadToHbase"); job.setJarByClass(BulkLoadJob.class); job.setMapperClass(BulkLoadMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); //拒絕推測式task的運行 job.setSpeculativeExecution(false); job.setReduceSpeculativeExecution(false); //in/out format job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat2.class); //指定來源 FileInputFormat.addInputPath(job, new Path(inputPath)); //指定輸出地 FileOutputFormat.setOutputPath(job, new Path(outputPath)); HTable table = new HTable(conf, "hfiletable"); HFileOutputFormat2.configureIncrementalLoad(job, table); boolean b = job.waitForCompletion(true); if (b) { FsShell shell = new FsShell(conf); try { shell.run(new String[]{"-chmod", "-R", "777", outputPath}); } catch (Exception e) { logger.error("不能改變文件權限 ", e); throw new IOException(e); } //加載到hbase表 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); loader.doBulkLoad(new Path(outputPath), table); System.out.println("執行成功"); } else { System.out.println("執行失敗"); logger.error("加載失敗!"); System.exit(1); } } } 過程中遇到的報錯: 技術分享圖片

解決:protobuf-java-2.5.0.jar因為包沖突,由於我創建project時,結構為父模塊和子模塊,可能在導包的時候,被其他子模塊的包給沖突了。因此,我新建了一個project重新打包到linux運行成功。

技術分享圖片

技術分享圖片

技術分享圖片

通過BulkLoad快速將海量數據導入到Hbase(TDH,kerberos認證)