1. 程式人生 > >Spark與HBase的整合

Spark與HBase的整合

前言

之前因為僅僅是把HBase當成一個可橫向擴充套件並且具有持久化能力的KV資料庫,所以只用在了指標儲存上,參看很早之前的一篇文章基於HBase做Storm 實時計算指標儲存。這次將HBase用在了使用者行為儲存上,因為Rowkey的過濾功能也很不錯,可以很方便的把按人或者內容的維度過濾出所有的行為。從某種意義上,HBase的是一個有且僅有一個多欄位複合索引的儲存引擎。

雖然我比較推崇實時計算,不過補資料或者計算曆史資料啥的,批處理還是少不了的。對於歷史資料的計算,其實我是有兩個選擇的,一個是基於HBase的已經儲存好的行為資料進行計算,或者基於Hive的原始資料進行計算,最終選擇了前者,這就涉及到Spark(StreamingPro) 對HBase的批處理操作了。

整合過程

和Spark 整合,意味著最好能有Schema(Mapping),因為Dataframe 以及SQL API 都要求你有Schema。 遺憾的是HBase 有沒有Schema取決於使用者和場景。通常SparkOnHBase的庫都要求你定義一個Mapping(Schema),比如hortonworks的 SHC(https://github.com/hortonworks-spark/shc) 就要求你定義一個如下的配置:

{
"rowkey":"key",
"table":{"namespace":"default", "name":"pi_user_log", "tableCoder":"PrimitiveType"},
"columns":{"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"f","col":"col1", "type":"string"}
}
}

看上面的定義已經還是很容易看出來的。對HBase的一個列族和列取一個名字,這樣就可以在Spark的DataSource API使用了,關於如何開發Spark DataSource API可以參考我的這篇文章利用 Spark DataSource API 實現Rest資料來源中使用,SHC大體實現的就是這個API。現在你可以這麼用了:

 val cat = "{\n\"rowkey\":\"key\",\"table\":{\"namespace\":\"default\", \"name\":\"pi_user_log\", \"tableCoder\":\"PrimitiveType\"},\n\"columns\":{\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n\"28360592\":{\"cf\":\"f\",\"col\":\"28360592\", \"type\":\"string\"}\n}\n}"
    val cc = sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> cat))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

不過當你有成千上萬個列,那麼這個就無解了,你不大可能一一定義,而且很多時候使用者也不知道會有哪些列,列名甚至可能是一個時間戳。我們現在好幾種情況都遇到了,所以都需要解決:

  1. 自動獲取HBase裡所有的列形成Schema,這樣就不需要使用者配置了。
  2. 規定HBase只有兩個列,一個rowkey,一個 content,content 是一個map,包含所有以列族+列名為key,對應內容為value。

先說說第二種方案(因為其實第一種方案也要依賴於第二種方案):

{
        "name": "batch.sources",
        "params": [
          {
            "inputTableName": "log1",
            "format": "org.apache.spark.sql.execution.datasources.hbase.raw",
            "path": "-",
            "outputTable": "log1"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select rowkey,json_value_collect(content) as actionList from log1",
            "outputTableName":"finalTable"
          }
        ]
      },

首先我們配置了一個HBase的表,叫log1,當然,這裡是因為程式通過hbase-site.xml獲得HBase的連結,所以配置上你看不到HBase相關的資訊。接著呢,在SQL 裡你就可以對content 做處理了。我這裡是把content 轉化成了JSON格式字串。再之後你就可以自己寫一個UDF函式之類的做處理了,從而實現你複雜的業務邏輯。我們其實每個欄位裡儲存的都是JSON,所以我其實不關心列名,只要讓我拿到所有的列就好。而上面的例子正好能夠滿足我這個需求了。

而且實現這個HBase DataSource 也很簡單,核心邏輯大體如下:

case class HBaseRelation(
                          parameters: Map[String, String],
                          userSpecifiedschema: Option[StructType]
                        )(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Logging {

  val hbaseConf = HBaseConfiguration.create()


  def buildScan(): RDD[Row] = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
    val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { line =>
        val rowKey = Bytes.toString(line._2.getRow)

        import net.liftweb.{json => SJSon}
        implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)

        val content = line._2.getMap.navigableKeySet().flatMap { f =>
          line._2.getFamilyMap(f).map { c =>
            (Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
          }
        }.toMap

        val contentStr = SJSon.Serialization.write(content)

        Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
      }
    hBaseRDD
  }
}

那麼我們回過頭來,如何讓Spark自動發現Schema呢?大體你還是需要過濾所有資料得到列的合集,然後形成Schema的,成本開銷很大。我們也可以先將我們的資料轉化為JSON格式,然後就可以利用Spark已經支援的JSON格式來自動推倒Schema的能力了。

總體而言,其實並不太鼓勵大家使用Spark 對HBase進行批處理,因為這很容易讓HBase過載,比如記憶體溢位導致RegionServer 掛掉,最遺憾的地方是一旦RegionServer 掛掉了,會有一段時間讀寫不可用,而HBase 又很容易作為實時線上程式的儲存,所以影響很大。

相關推薦

SparkHBase整合

前言 之前因為僅僅是把HBase當成一個可橫向擴充套件並且具有持久化能力的KV資料庫,所以只用在了指標儲存上,參看很早之前的一篇文章基於HBase做Storm 實時計算指標儲存。這次將HBase用在了使用者行為儲存上,因為Rowkey的過濾功能也很不錯,可以很方便的把按

大資料系列之實時計算Spark(十七)PythonHbase整合

1.準備工作(所用到的工具庫會放在最後供下載使用) 1.1.安裝thrift   cmd>pip install thrift   我使用的是Anaconda3,下載下來的包會存放到  /Lib/site-packages/目錄下,如果沒有使用Anaconda3,

sparkflume整合

kcon text org http clas appname spl storage ket spark-streaming與flume整合 push package cn.my.sparkStream import org.apache.spark.SparkCo

Sparkhbase整合遇到的一些問題

1.Spark計算都轉移到了一個節點上,即只有一個節點在計算。      搭建好的spark叢集,進行計算的時候發現,所有的slave節點上的task生成後,快速退出,並且生成好多task。檢視spark ui上發現,只有主節點上有正常task執行,其他的

Hadoop HiveHbase整合+thrift

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

sparkspring整合做web介面

需要實現的功能: 寫訪問spark的介面,也就是從web上輸入網址就能把我們需要的資訊通過提交一個job然後返回給我們json資料。 成果展示: 通過url請求,然後的到一個wordcount的json結果(藉助的是谷歌瀏覽器postman外掛顯示的,

Spark Streaming--3 Spark Kafka整合

引入jar包依賴 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</art

HiveHBase整合(例項)

  例項1 1.先在Hbase中建立表(三列族): create 'ceshi7', {NAME=>'TIME',VERSIONS=>1,BLOCKCACHE=>true,BLOOMFILTER=>'ROW',COMPRESSION=>'SNA

phoenix(鳳凰)hbase整合

一、什麼是Phonenix?Phoenix是構建在HBase上的一個SQL層,能讓我們用標準的JDBC APIs而不是HBase客戶端APIs來建立表,插入資料和對HBase資料進行查詢。 Phoenix完全使用Java編寫,作為HBase內嵌的JDBC驅動。Phoenix查

《深入理解Spark》之SparkKafka整合原理

spark和kafka整合有2中方式 1、receiver 顧名思義:就是有一個執行緒負責獲取資料,這個執行緒叫receiver執行緒 解釋: 1、Spark叢集中的某個executor中有一個receiver執行緒,這個執行緒負責從kafka中獲取資料  注意

【伊利丹】Hadoop-2.5.0-CDH5.2.0/HiveHbase整合實驗

<value>file:///home/kkzhangtao/hive-0.13.1-cdh5.2.0/lib/hive-hbase-handler-0.8.0.jar,file:///home/kkzhangtao/hive-0.13.1-cdh5.2.0/lib/hbase-common-0.

hivehbase整合

Hive整合HBase原理Hive與HBase整合的實現是利用兩者本身對外的API介面互相通訊來完成的,其具體工作交由Hive的lib目錄中的hive-hbase-handler-*.jar工具類來實現,通訊原理如下圖所示。Hive整合HBase後的使用場景:(一)通過Hiv

Spark訪問HBase關聯的Hive表

刪除 sql 也會 影響 ron ble lec lang nbsp 知識點1:創建關聯Hbase的Hive表 知識點2:Spark訪問Hive 知識點3:Spark訪問與Hbase關聯的Hive表 知識點1:創建關聯Hbase的Hive表 兩種方式創建,內部表和外部表

支援kubernetes原生Spark 其他應用的結合(mysql,postgresql,oracle,hdfs,hbase)

安裝執行支援kubernetes原生排程的Spark程式:https://blog.csdn.net/luanpeng825485697/article/details/83651742 dockerfile的目錄 . ├── driver │ └── Dockerfile

hue(5):Huezookeeper、oozie、Hbase整合

一、配置步驟 1.和zookeeper整合,修改hue.ini [zookeeper] [[clusters]] [[[default]]] # Zookeeper ensemble. Comma separated list of Host/

HBaseMapReduce整合操作

1、目的:將HBase中stu_info表中的name放到表user_info中 2、TestHbaseMapper: package com.zzw.hbase.mapreduce; import java.io.IOException; import org.apache.had

java實現spark streamingkafka整合進行流式計算

背景:網上關於spark streaming的文章還是比較多的,可是大多數用scala實現,因我們的電商實時推薦專案以java為主,就踩了些坑,寫了java版的實現,程式碼比較意識流,輕噴,歡迎討論。流程:spark streaming從kafka讀使用者實時點選資料,過濾資

hbaseflume整合程式設計

1、官網下載src包,解壓,需要匯入的——》flume-ng-sinks——》flume-ng-hbase-sink 2、編輯SimpleAsyncHbaseEventSerializer:複製一份重新命名為MySimpleAsyncHbaseEventSerializer

HBase新版本MapReduce整合

1.MapReduce從hbase讀取資料 //讀取hbase表資料 public class HbaseAndMapReduce { public static void main(String[] args) throws Exception

Hadoop HiveHbase關係 整合

用hbase做資料庫,但由於hbase沒有類sql查詢方式,所以操作和計算資料非常不方便,於是整合hive,讓hive支撐在hbase資料庫層面 的 hql查詢.hive也即 做資料倉庫 1. 基於Hadoop+Hive架構對海量資料進行查詢:http://blog.csd