Writing data into HBase with bulk load
阿新 • Published: 2018-12-24
The full example: read "||"-delimited text with Spark, sort it by row key, write it out as HFiles, and then hand the files to HBase with LoadIncrementalHFiles.

package wondersgroup_0628.com

import java.util.Base64

import com.wonders.TXmltmp
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object TestTest_4 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("TestTest_4")
    val sc = new SparkContext(sparkConf)

    // Input: one record per line, fields separated by "||";
    // the fields are (base64 payload 1, base64 payload 2, row key).
    val text = args(0)
    val rdd = sc.textFile(text)
    // HFiles must be written in row-key order, so sort by the key field first.
    val data = rdd.map(_.split("\\|\\|")).map(x => (x(0), x(1), x(2))).sortBy(_._3)

    val conf = HBaseConfiguration.create()
    // HTable is deprecated since HBase 1.0; see the sketch below for the newer API.
    val table = new HTable(conf, "hbaseTest")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "hbaseTest")
    // Raise the per-region/per-family HFile limit so the load is not rejected.
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 416)

    val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat.configureIncrementalLoad(job, table)

    // Decode the two base64 fields, merge them into an XML string with TXmltmp,
    // and emit (row key, KeyValue) pairs for column cf2:age.
    val result = data.map { x =>
      val tmp = new TXmltmp
      val j1 = new String(Base64.getDecoder.decode(x._1))
      val j2 = new String(Base64.getDecoder.decode(x._2))
      val xml = tmp.load(j1, j2)
      val kv: KeyValue = new KeyValue(Bytes.toBytes(x._3), "cf2".getBytes, "age".getBytes, Bytes.toBytes(xml))
      (new ImmutableBytesWritable(Bytes.toBytes(x._3)), kv)
    }

    // Write the HFiles to HDFS, then hand them over to the region servers.
    result.saveAsNewAPIHadoopFile("/tmp/hbaseTest", classOf[ImmutableBytesWritable], classOf[KeyValue],
      classOf[HFileOutputFormat], conf)
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/hbaseTest"), table)

    sc.stop()
  }
}
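HTable, HFileOutputFormat and the doBulkLoad(Path, HTable) overload used above are all deprecated from HBase 1.0 on. Below is a minimal sketch of the same write-HFiles-then-bulk-load flow on the Connection/RegionLocator API; it assumes HBase 1.x, the same "hbaseTest" table, and that kvs is the sorted (ImmutableBytesWritable, KeyValue) RDD built above.

// Sketch only: the same flow on the non-deprecated HBase 1.x API.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val tableName = TableName.valueOf("hbaseTest")
val table = connection.getTable(tableName)
val regionLocator = connection.getRegionLocator(tableName)
val admin = connection.getAdmin

val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
// The RegionLocator lets the output be partitioned along the
// table's region boundaries.
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

// `kvs` is assumed to be the (ImmutableBytesWritable, KeyValue) RDD from above.
kvs.saveAsNewAPIHadoopFile("/tmp/hbaseTest", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hbaseTest"), admin, table, regionLocator)

connection.close()

Either way, the point of the bulk-load approach is that the HFiles are moved into the regions directly, skipping the normal write path (WAL and memstore), which is what makes it so much faster than issuing Puts for large imports.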