
Using HBase filters from Spark to access HBase table data (a wrapper)

A class I put together myself as an exercise.

What this class does: it exposes one method that

selects the columns to return, based on the input table name, column family, and column names, and

filters the data with HBase filters, based on the input column family, column name, value, and comparison operator.

Return value: SQLContext

The table is already registered in it, so you can call its sql method directly and query the data with SQL.
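
For example, a call could look like the sketch below (the table name t_user, the column family info, and the column names are made up purely for illustration):

val showCols = Array(("info", "name", "String"), ("info", "age", "String"))
val filterCols = Array(("info", "age", "30", ">="))

val sqlCtx = SparkHbaseTool.getTableNm("t_user", showCols, filterCols)
// The DataFrame is registered under the table name; the rowkey is exposed as the "id" column.
sqlCtx.sql("SELECT id, name, age FROM t_user").show()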

The full code is as follows.

package cn.deppon.Tool

import java.util

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, ResultScanner, Scan}
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by DreamBoy on 2017/5/12.
  */
object SparkHbaseTool {

  /**
    * The object body (the "primary constructor") sets up the basic runtime environment.
    */
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  // Spark configuration
  val conf = new SparkConf().setMaster("local[2]")
    .setAppName("HbaseTest")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val hbaseConf = HBaseConfiguration.create()
  val sqlContext = new SQLContext(sc)

  // HBase configuration
  hbaseConf.set("hbase.rootdir", "hdfs://http://192.168.10.228/hbase")
  hbaseConf.set("hbase.zookeeper.quorum", "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.master", "192.168.10.230")

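  /**
    * TableInputFormat reads its Scan from the job configuration as a Base64-encoded,
    * protobuf-serialized string, so the Scan built below (including its filters)
    * has to be converted into that form before it is handed over.
    */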
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  /**
    *
    * @param tbl_nm     table name
    * @param show_col   _1 column family, _2 column name, _3 column type (String, Int, Double, Timestamp, ...)
    * @param filter_col _1 column family, _2 column name, _3 value to filter on, _4 comparison operator (=, <, >, !=, ...)
    * @return the SQLContext with the table already registered
    */
  def getTableNm(tbl_nm: String, show_col: Array[(String, String, String)], filter_col: Array[(String, String, String, String)]): (SQLContext) = {

    hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)
    val table = new HTable(hbaseConf, tbl_nm)
    val scan = new Scan()

    /**
      * Specify the column family and the columns to display;
      * add every column that will be used.
      */
    /*
    val length = show_col.length
    for(i <- show_col){
      scan.addColumn(Bytes.toBytes(i._1),Bytes.toBytes(i._2))
    }
    */
    // Set the rowkey range (start and stop rows)
    //scan.setStartRow(Bytes.toBytes(""))
    //scan.setStopRow(Bytes.toBytes(""))
    val fil_len = filter_col.length
    println("------>>>>" + fil_len)
    // If no filter conditions were passed in, leave the scan without a filter
    if (fil_len > 0) {
      val filter_arr = new util.ArrayList[Filter](fil_len)

      for (i <- filter_col) {
        // Map the operator string onto the corresponding HBase CompareOp
        val compareOp = i._4 match {
          case "=" => Some(CompareFilter.CompareOp.EQUAL)
          case "<" => Some(CompareFilter.CompareOp.LESS)
          case "<=" => Some(CompareFilter.CompareOp.LESS_OR_EQUAL)
          case ">" => Some(CompareFilter.CompareOp.GREATER)
          case ">=" => Some(CompareFilter.CompareOp.GREATER_OR_EQUAL)
          case "!=" => Some(CompareFilter.CompareOp.NOT_EQUAL)
          case _ => None // unknown operator: skip this filter condition
        }
        compareOp.foreach { op =>
          val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
            Bytes.toBytes(i._2), op, new BinaryComparator(Bytes.toBytes(i._3)))
          // Only return rows that actually contain the filtered column
          filter1.setFilterIfMissing(true)
          filter_arr.add(filter1)
        }
      }
      /**
        * A FilterList combines multiple filters;
        * MUST_PASS_ALL means every filter has to pass (logical AND).
        */
      val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)
      scan.setFilter(filterList)
    } else {
      scan.setFilter(null)
    }

    //hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))
    // Open a scanner on the table (the rows themselves are read through newAPIHadoopRDD below)
    val ColumnValueScanner = table.getScanner(scan)
    // Build the list of fields for the StructType, based on the type information passed in
    /*var list_col = show_col.map{x=>{
     /* x._3 match {
        case "String" => StructField(x._2,StringType,true)
        case "Int" => StructField(x._2,StringType,true)
        case "Double" => StructField(x._2,StringType,true)
        case "Timestamp" => StructField(x._2,StringType,true)
        case _ => StructField(x._2,StringType,true)
      }*/
      StructField(x._2,StringType,true)
    }
    }*/
    /**
      * Purpose of the StructType: it names each value's column when the DataFrame
      * is created later, so those names are available once it is registered as a table.
      */
    // The first field, "id", holds the rowkey; the remaining fields come from show_col
    var list_col: List[StructField] = List()
    list_col :+= StructField("id", StringType, true)
    
    for (i <- show_col) {
      list_col :+= StructField(i._2, StringType, true)
    }

    
    // Build the table schema (StructType)
    val schema = StructType(list_col)

    val tbl_rdd = ColumnValueScanner.iterator().asScala // not used any further
    // Serialize the Scan (including its filters) into the HBase configuration
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    // Build the RDD over the HBase table
    val hbaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // Convert each HBase Result into a Spark SQL Row
    val rowRDD = hbaseRDD.map { case (_, result) => {
      var valueSeq: Seq[String] = Seq()
      // Get the rowkey
      val key = Bytes.toString(result.getRow)

      // Fetch columns by family and qualifier (variant without the rowkey):
      //      for(column <- columns) {
      //        valueSeq :+= Bytes.toString(result.getValue(family.getBytes, column.getBytes))
      //      }
      // Variant with the rowkey: it must come first, matching the "id" field of the schema
      valueSeq :+= key
      for (row <- show_col) {
        valueSeq :+= Bytes.toString(result.getValue(row._1.getBytes, row._2.getBytes))
      }
      Row.fromSeq(valueSeq)
    }
    }
    val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)

    hbasedataframe.registerTempTable(tbl_nm)
    sqlContext
  }
}
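
A note on how the pieces fit together: the filtering itself does not happen in Spark. Because the Scan, together with its FilterList, is serialized into the configuration under TableInputFormat.SCAN before newAPIHadoopRDD is created, HBase applies the filters on the region servers and Spark only receives the rows that passed. The rowkey comes back as the extra "id" column, so it can be referenced in the SQL queries as well.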