Accessing HBase table data from Spark with HBase filters (a wrapper)
A class I implemented as an experiment. What it does, through its internal method:
based on the input table name, column family, and column names, it selects the columns to display;
based on the input column family, column name, column value, and comparison operator, it filters the data with HBase filters.
Return value: an SQLContext in which the table is already registered, so you can query and process it directly with SQL. A usage example follows the listing.
The code is as follows:
package cn.deppon.Tool

import java.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by DreamBoy on 2017/5/12.
  */
object SparkHbaseTool {

  /**
    * The basic environment is set up in the primary constructor.
    */
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  // Spark configuration
  val conf = new SparkConf().setMaster("local[2]")
    .setAppName("HbaseTest")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val hbaseConf = HBaseConfiguration.create()
  val sqlContext = new SQLContext(sc)

  // HBase configuration
  hbaseConf.set("hbase.rootdir", "hdfs://192.168.10.228/hbase")
  hbaseConf.set("hbase.zookeeper.quorum",
    "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.master", "192.168.10.230")

  // Serialize a Scan (including its filters) so it can be handed to
  // TableInputFormat through the Hadoop configuration.
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  /**
    * @param tbl_nm     table name
    * @param show_col   _1 column family, _2 column name, _3 column type (String, Int, Double, Timestamp, ...)
    * @param filter_col _1 column family, _2 column name, _3 filter value, _4 comparison (=, <, >, !=, ...)
    * @return the SQLContext, with the table registered
    */
  def getTableNm(tbl_nm: String,
                 show_col: Array[(String, String, String)],
                 filter_col: Array[(String, String, String, String)]): SQLContext = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)
    val scan = new Scan()

    // Optionally restrict the scan to the requested family/column pairs:
    // for (i <- show_col) scan.addColumn(Bytes.toBytes(i._1), Bytes.toBytes(i._2))

    // Optionally bound the scan by start and stop row key:
    // scan.setStartRow(Bytes.toBytes(""))
    // scan.setStopRow(Bytes.toBytes(""))

    val fil_len = filter_col.length
    println("------>>>>" + fil_len)
    // Only attach filters when filter conditions were actually supplied;
    // with no filter the scan reads the whole table.
    if (fil_len > 0) {
      val filter_arr = new util.ArrayList[Filter](fil_len)
      for (i <- filter_col) {
        val op = i._4 match {
          case "="  => CompareFilter.CompareOp.EQUAL
          case "<"  => CompareFilter.CompareOp.LESS
          case "<=" => CompareFilter.CompareOp.LESS_OR_EQUAL
          case ">"  => CompareFilter.CompareOp.GREATER
          case ">=" => CompareFilter.CompareOp.GREATER_OR_EQUAL
          case "!=" => CompareFilter.CompareOp.NOT_EQUAL
          case _    => null
        }
        if (op != null) {
          val filter = new SingleColumnValueFilter(Bytes.toBytes(i._1), Bytes.toBytes(i._2),
            op, new BinaryComparator(Bytes.toBytes(i._3)))
          // Drop rows that do not contain the filtered column at all
          filter.setFilterIfMissing(true)
          filter_arr.add(filter)
        }
      }
      // A FilterList carries several filters at once; MUST_PASS_ALL
      // means the conditions are ANDed together.
      val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)
      scan.setFilter(filterList)
    }

    /**
      * The StructType names every value in the DataFrame produced below,
      * so it can be registered as a table. The first column is always the
      * row key.
      */
    var list_col: List[StructField] = List()
    list_col :+= StructField("id", StringType, true)
    for (i <- show_col) {
      list_col :+= StructField(i._2, StringType, true)
    }
    val schema = StructType(list_col)

    // Load the scan (with its filters) into the HBase configuration
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    // Build the RDD over the HBase table
    val hbaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Turn each Result into a Row: the row key first ("id"), then the
    // requested columns in show_col order, matching the schema above.
    val rowRDD = hbaseRDD.map { case (_, result) =>
      var valueSeq: Seq[String] = Seq()
      valueSeq :+= Bytes.toString(result.getRow)
      for (col <- show_col) {
        valueSeq :+= Bytes.toString(result.getValue(col._1.getBytes, col._2.getBytes))
      }
      Row.fromSeq(valueSeq)
    }

    val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)
    hbasedataframe.registerTempTable(tbl_nm)
    sqlContext
  }
}
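For reference, a minimal usage sketch. The table name, column family, and column names below are hypothetical, and it assumes the HBase cluster configured above is reachable. Note that BinaryComparator compares raw bytes, so numeric strings are matched lexicographically.

// Hypothetical table "t_order" with column family "info":
// show the "name" and "amount" columns, keeping only rows
// where info:amount >= "100" (byte-wise comparison).
val ctx = SparkHbaseTool.getTableNm(
  "t_order",
  Array(("info", "name", "String"), ("info", "amount", "String")),
  Array(("info", "amount", "100", ">=")))

// The table is registered under its HBase name, and "id" is the
// row-key column that getTableNm prepends to the schema.
ctx.sql("SELECT id, name, amount FROM t_order").show()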