1. 程式人生 > >如何使用scala+spark讀寫hbase?

如何使用scala+spark讀寫hbase?

最近工作有點忙,所以文章更新頻率低了點,希望大家可以諒解,好了,言歸正傳,下面進入今天的主題:

如何使用scala+spark讀寫Hbase

軟體版本如下:

scala2.11.8

spark2.1.0

hbase1.2.0

公司有一些實時資料處理的專案,儲存用的是hbase,提供實時的檢索,當然hbase裡面儲存的資料模型都是簡單的,複雜的多維檢索的結果是在es裡面儲存的,公司也正在引入Kylin作為OLAP的資料分析引擎,這塊後續有空在研究下。

接著上面說的,hbase儲存著一些實時的資料,前兩週新需求需要對hbase裡面指定表的資料做一次全量的update以滿足業務的發展,平時操作hbase都是單條的curd,或者插入一個批量的list,用的都是hbase的java api比較簡單,但這次涉及全量update,所以如果再用原來那種單執行緒的操作api,勢必速度回慢上許多。

關於批量操作Hbase,一般我們都會用MapReduce來操作,這樣可以大大加快處理效率,原來也寫過MR操作Hbase,過程比較繁瑣,最近一直在用scala做spark的相關開發,所以就直接使用scala+spark來搞定這件事了,當然底層用的還是Hbase的TableOutputFormat和TableOutputFormat這個和MR是一樣的,在spark裡面把從hbase裡面讀取的資料集轉成rdd了,然後做一些簡單的過濾,轉化,最終在把結果寫入到hbase裡面。

整個流程如下:

(1)全量讀取hbase表的資料

(2)做一系列的ETL

(3)把全量資料再寫回hbase

核心程式碼如下:

//獲取conf
 val conf=HBaseConfiguration.create()
  //設定讀取的表
  conf.set(TableInputFormat.INPUT_TABLE,tableName)
  //設定寫入的表
  conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//建立sparkConf    
   val sparkConf=new SparkConf()
   //設定spark的任務名
   sparkConf.setAppName("read and write for hbase ")
   //建立spark上下文
   val
sc=new SparkContext(sparkConf) //為job指定輸出格式和輸出表名 val newAPIJobConfiguration1 = Job.getInstance(conf) newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName) newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) //全量讀取hbase表 val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat] ,classOf[ImmutableBytesWritable] ,classOf[Result] ) //過濾空資料,然後對每一個記錄做更新,並轉換成寫入的格式 val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas) //轉換後的結果,再次做過濾 val save_rdd=final_rdd.filter(checkNull) //最終在寫回hbase表 save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration) sc.stop()

從上面的程式碼可以看出來,使用spark+scala操作hbase是非常簡單的。下面我們看一下,中間用到的幾個自定義函式:

第一個:checkNotEmptyKs

作用:過濾掉空列簇的資料

def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={
    val r=f._2
    val rowkey=Bytes.toString(r.getRow)
    val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala
    if(map.isEmpty)  false else true
  }

第二個:forDatas

作用:讀取每一條資料,做update後,在轉化成寫入操作

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={
      val r=f._2 //獲取Result
      val put:Put=new Put(r.getRow) //宣告put
      val ks=Bytes.toBytes("ks") //讀取指定列簇
      val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
      map.foreach(kv=>{//遍歷每一個rowkey下面的指定列簇的每一列的資料做轉化
                val kid= Bytes.toString(kv._1)//知識點id
                var value=Bytes.toString(kv._2)//知識點的value值
		value="修改後的value"
		put.addColumn(ks,kv._1,Bytes.toBytes(value))	//放入put物件
      }
      )
    if(put.isEmpty)  null  else (new ImmutableBytesWritable(),put)

  }

第三個:checkNull 作用:過濾最終結果裡面的null資料

def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={
    if(f==null)  false  else true
  }

上面就是整個處理的邏輯了,需要注意的是對hbase裡面的無效資料作過濾,跳過無效資料即可,邏輯是比較簡單的,程式碼量也比較少。

除了上面的方式,還有一些開源的框架,也封裝了相關的處理邏輯,使得spark操作hbase變得更簡潔,有興趣的朋友可以瞭解下,github連結如下:

有什麼問題可以掃碼關注微信公眾號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明