
Importing HBase Data into Hive

The Spark job below scans an HBase table with TableInputFormat, maps each Result into a DataFrame, and copies the rows into a Hive table through a HiveContext.

package com.hx.data.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
 * Created by Administrator on 2017/2/20.
 */
object HbaseUtils {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("hbase2hive")
    val sc = new SparkContext(config)
    val hict = new HiveContext(sc)
    import hict.implicits._

    // Build the HBase connection configuration
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")
    // Set the table to scan
    conf.set(TableInputFormat.INPUT_TABLE, "wx_data")

    // Read the HBase table as an RDD of (row key, Result) pairs
    val hbaserdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    hbaserdd.cache()

    // Extract the row key and each qualifier of the "wx_cf" column family.
    // Bytes.toString returns null for an absent cell, so missing columns
    // simply become null values in the DataFrame.
    val df = hbaserdd.map { case (_, result) => (
      Bytes.toString(result.getRow),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "agentID".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "agentType".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "content".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "createTime".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "event".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "eventKey".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "fromUserName".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "id".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "itemCount".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "msgId".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "msgType".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "packageId".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "toUserName".getBytes())),
      Bytes.toString(result.getValue("wx_cf".getBytes(), "type".getBytes()))
    )}.toDF("key", "agentid", "agenttype", "content", "createtime", "event",
      "eventkey", "fromusername", "id", "itemcount", "msgid", "msgtype",
      "packageid", "tousername", "types")

    // Register a temporary table and copy it into the target Hive table
    df.registerTempTable("wx_data_temp")
    hict.sql("insert into huaxun.wx_data select * from wx_data_temp")
    sc.stop()

    // An alternative for debugging: traverse the RDD directly and print each row
    // hbaserdd.foreach { case (_, result) =>
    //   val key = Bytes.toString(result.getRow)
    //   val agentID = Bytes.toString(result.getValue("wx_cf".getBytes(), "agentID".getBytes()))
    //   val agentType = Bytes.toString(result.getValue("wx_cf".getBytes(), "agentType".getBytes()))
    //   // ...the remaining wx_cf qualifiers are read the same way...
    //   println("traversing hbaserdd")
    // }
  }
}
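The insert statement assumes the target Hive table huaxun.wx_data already exists; its DDL is not shown in the post. A minimal sketch of a matching definition, assuming every column is a STRING mirroring the DataFrame above, run once through the same HiveContext:

// Hypothetical DDL: the all-STRING schema is an assumption matching the
// DataFrame columns; the real huaxun.wx_data definition is not in the post.
hict.sql(
  """CREATE TABLE IF NOT EXISTS huaxun.wx_data (
    |  key STRING, agentid STRING, agenttype STRING, content STRING,
    |  createtime STRING, event STRING, eventkey STRING, fromusername STRING,
    |  id STRING, itemcount STRING, msgid STRING, msgtype STRING,
    |  packageid STRING, tousername STRING, types STRING
    |)""".stripMargin)

Running this once before the insert keeps the job from failing on a missing table.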
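The fifteen getValue calls in the mapping all follow the same pattern. A small helper like the one below (hypothetical, not part of the original post) would make the mapping less repetitive; declaring it as a function value keeps it serializable when captured by the Spark closure:

// Hypothetical helper, not in the original post: read one qualifier from the
// "wx_cf" column family as a String. Returns null when the cell is absent,
// matching the behaviour of the inline Bytes.toString calls above.
val wxCol: (Result, String) => String = (result, qualifier) =>
  Bytes.toString(result.getValue(Bytes.toBytes("wx_cf"), Bytes.toBytes(qualifier)))

With it, each tuple element in the map becomes a call such as wxCol(result, "agentID").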