1. 程式人生 > >大資料-05-Spark之讀寫HBase資料

大資料-05-Spark之讀寫HBase資料

準備工作一:建立一個HBase表

這裡依然是以student表為例進行演示。這裡假設你已經成功安裝了HBase資料庫,如果你還沒有安裝,可以參考大資料-04-Hbase入門,進行安裝,安裝好以後,不要建立資料庫和表,只要跟著本節後面的內容操作即可。

因為hbase依賴於hadoop,因此啟動和停止都是需要按照順序進行
如果安裝了獨立的zookeeper
啟動順序: hadoop-> zookeeper-> hbase
停止順序:hbase-> zookeeper-> hadoop
使用自帶的zookeeper
啟動順序: hadoop-> hbase
停止順序:hbase-> hadoop
如下所示:

cd /usr/local/hadoop
./sbin/start-all.sh
cd /usr/local/hbase
./bin/start-hbase.sh //啟動HBase
./bin/hbase shell  //啟動hbase shell

這樣就可以進入hbase shell命令提示符狀態。下面我們在HBase資料庫中建立student表(注意:在關係型資料庫MySQL中,需要首先建立資料庫,然後再建立表,但是,在HBase資料庫中,不需要建立資料庫,只要直接建立表就可以):

hbase> list                    # 檢視所有表
hbase> disable 'student'       # 禁用表
hbase> drop 'student'          # 刪除表

下面讓我們一起來建立一個student表,我們可以在hbase shell中使用下面命令建立:

hbase> create 'student','info'
hbase> describe 'student'
//首先錄入student表的第一個學生記錄
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
//然後錄入student表的第二個學生記錄
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'

資料錄入結束後,可以用下面命令檢視剛才已經錄入的資料:

//如果每次只檢視一行,就用下面命令
hbase> get 'student','1'
//如果每次檢視全部資料,就用下面命令
hbase> scan 'student'

準備工作二:配置Spark

在開始程式設計操作HBase資料庫之前,需要對做一些準備工作。
(1)請新建一個終端,執行下面命令,把HBase的lib目錄下的一些jar檔案拷貝到Spark中,這些都是程式設計時需要引入的jar包,需要拷貝的jar檔案包括:所有hbase開頭的jar檔案、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar,可以開啟一個終端按照以下命令來操作:

cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./

只有這樣,後面編譯和執行過程才不會出錯。

編寫程式讀取HBase資料

如果要讓Spark讀取HBase,就需要使用SparkContext提供的newAPIHadoopRDD API將表的內容以RDD的形式載入到Spark中。
請在Linux系統中開啟一個終端,然後執行以下命令:

cd /usr/local/spark/mycode
mkdir hbase
cd hbase
mkdir -p src/main/scala
cd src/main/scala
vim SparkOperateHBase.scala

然後,在SparkOperateHBase.scala檔案中輸入以下程式碼:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkOperateHBase {
def main(args: Array[String]) {

    val conf = HBaseConfiguration.create()
    val sc = new SparkContext(new SparkConf())
    //設定查詢的表名
    conf.set(TableInputFormat.INPUT_TABLE, "student")
    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
    val count = stuRDD.count()
    println("Students RDD Count:" + count)
    stuRDD.cache()

    //遍歷輸出
    stuRDD.foreach({ case (_,result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
        val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
        val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
        println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
    })
}
}

然後就可以用sbt打包編譯。不過,在編譯之前,需要新建一個simple.sbt檔案,在simple.sbt配置檔案中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、hbase-server的版本號。在前面章節大資料-03-Spark入門的“編寫Scala獨立應用程式”部分,我們已經介紹瞭如何尋找scalaVersion和spark-core的版本號,這裡不再贅述。現在介紹如何找到你自己電腦上安裝的HBase的hbase-client、hbase-common、hbase-server的版本號。
請在Linux系統中開啟一個終端,輸入下面命令:

cd /usr/local/hbase    # 這是筆者電腦的hbase安裝目錄
cd lib
ls

ls命令會把“/usr/local/hbase/lib”目錄下的所有jar檔案全部列出來,其中,就可以看到下面三個檔案:

hbase-client-1.1.2.jar          
hbase-common-1.1.2.jar          
hbase-server-1.1.2.jar

根據上面三個檔案,我們就可以得知hbase-client、hbase-common、hbase-server的版本號是1.1.5(當然,你的電腦上可能不是這個版本號,請以你自己的版本號為準)。
有了這些版本號資訊,我們就可以新建一個simple.sbt檔案:

cd /usr/local/spark/mycode/hbase
vim simple.sbt

然後在simple.sbt中錄入下面內容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.2"

儲存該檔案,退出vim編輯器。
然後,輸入下面命令:

find . 

應該可以看到類似下面的檔案結構:

.
./src
./src/main
./src/main/scala
./src/main/scala/SparkOperateHBase.scala
./simple.sbt

下面就可以執行sbt打包命令:

/usr/local/sbt/sbt package

打包成功以後,生成的 jar 包的位置為 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。
最後,通過 spark-submit 執行程式。我們就可以將生成的 jar 包通過 spark-submit 提交到 Spark 中運行了,命令如下:

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit  --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkOperateHBase"  /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar

特別強調,上面命令中,必須使用“–driver-class-path”引數指定依賴JAR包的路徑,而且必須把”/usr/local/hbase/conf”也加到路徑中。
執行後得到如下結果:

Students RDD Count:2
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24

編寫程式向HBase寫入資料

下面編寫程式向HBase中寫入兩行資料。
請開啟一個Linux終端,輸入如下命令:

cd /usr/local/spark/mycode/hbase
vim src/main/scala/SparkWriteHBase.scala

上面命令用vim編輯器新建了一個檔案SparkWriteHBase.scala,然後,在SparkWriteHBase.scala檔案中輸入下面程式碼:

import org.apache.hadoop.hbase.HBaseConfiguration  
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
import org.apache.spark._  
import org.apache.hadoop.mapreduce.Job  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import org.apache.hadoop.hbase.client.Result  
import org.apache.hadoop.hbase.client.Put  
import org.apache.hadoop.hbase.util.Bytes  

object SparkWriteHBase {  

  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")  
    val sc = new SparkContext(sparkConf)        
    val tablename = "student"        
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  

    val job = new Job(sc.hadoopConfiguration)  
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
    job.setOutputValueClass(classOf[Result])    
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    

    val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //構建兩行記錄
    val rdd = indataRDD.map(_.split(',')).map{arr=>{  
      val put = new Put(Bytes.toBytes(arr(0))) //行健的值 
      put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  //info:name列的值
      put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))  //info:gender列的值
            put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))  //info:age列的值
      (new ImmutableBytesWritable, put)   
    }}        
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  }    
} 

儲存該檔案退出vim編輯器,然後,使用sbt打包編譯,命令如下:

/usr/local/sbt/sbt package

打包成功以後,生成的 jar 包的位置為 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。實際上,由於之前我們已經編寫了另外一個程式碼檔案SparkOperateHBase.scala,所以,simple-project_2.11-1.0.jar中實際包含了SparkOperateHBase.scala和SparkWriteHBase.scala兩個程式碼檔案的編譯結果(class檔案),在執行命令時,可以通過–class後面的名稱引數來決定執行哪個程式, 這個名字就是scala檔名。
最後,通過 spark-submit 執行程式。我們就可以將生成的 jar 包通過 spark-submit 提交到 Spark 中運行了,命令如下:

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit  --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkWriteHBase"  /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar

執行後,我們可以切換到剛才的HBase終端視窗,在HBase shell中輸入如下命令檢視結果:

hbase> scan 'student'

相關推薦

資料-05-SparkHBase資料

準備工作一:建立一個HBase表 這裡依然是以student表為例進行演示。這裡假設你已經成功安裝了HBase資料庫,如果你還沒有安裝,可以參考大資料-04-Hbase入門,進行安裝,安裝好以後,不要建立資料庫和表,只要跟著本節後面的內容操作即可。 因為hbase依賴於hadoop,因此啟動和停止都是需要按

SparkHbase資料

環境 spark: 2.4.3 hbase: 1.1.5 步驟 啟動hadoop-3.1.2,hbase2.2.0 把HBas

Spark csv檔案

轉自:https://www.iteblog.com/archives/1380.html CSV格式的檔案也稱為逗號分隔值(Comma-Separated Values,CSV,有時也稱為字元分隔值,因為分隔字元也可以不是逗號。在本文中的CSV格式的資料就

學習筆記:從0開始學習資料-9. MapReduceHbase資料

上節的MapReduce計算WordCount例子是從hdfs讀輸入檔案,計算結果也寫入hdfs MapReduce分散式計算的輸入輸出可以根據需要從hdfs或hbase讀取或寫入,如 A.讀hdfs-->寫hdfs B.讀hdfs-->寫hbase C.讀hbase--

8.spark core數據

鍵值對 逗號 .data air lines man inf return ear spark支持多種數據源,從總體來分分為兩大部分:文件系統和數據庫。 文件系統 ??文件系統主要有本地文件系統、Amazon S3、HDFS等。 ??文件系統中存儲的文件有多種存儲格式。sp

12.spark sql數據

rcfile serializa fig jdbc連接 nco .sh nat 字段 jdb 簡介 ??Spark SQL支持多種結構化數據源,輕松從各種數據源中讀取Row對象。這些數據源包括Parquet、JSON、Hive表及關系型數據庫等。 ??當只使用一部分字段時,

spark通過phoenixhbase(Java版)-轉:https://blog.csdn.net/xiongbingcool/article/details/81458602

pom.xml <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId

SparkHbase的二種方式對比

作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請註明出處 一.傳統方式 這種方式就是常用的TableInputFormat和TableOutputFormat來讀寫hbase,如下程式碼所示 簡單解釋下,用sc.newA

離線輕量級資料平臺SparkJavaRDD關聯join操作

對兩個RDD進行關聯操作,如: 1)檔案post_data.txt包含:post_id\title\content 2)檔案train.txt包含:dev_id\post_id\praise\time 通過post_id關聯,提取post_id\content\praise

資料預處理,檔案為每一行資料增加一個標識ID(JAVA)

對包含多行資料的資料集進行預處理,讀入文字檔案資料集,為每一條記錄增加一個唯一的ID,並儲存成一個新的文字檔案。其中每行的ID生成規則為:每一條記錄對應生成0-33隨機數,每個數對應一個特定省份,最後原始記錄和新生成的省份標籤一起寫入新的文字檔案中。Shell終端執行語句#!

使用Scala MySQL 資料Spark任務執行

初學Spark,需要從資料庫讀取資料給Spark執行,然後將執行結果返回給資料庫。 由於Spark是基於 Scala 開發的,剛開始完全摸不到頭腦,本來是用java將資料庫資料寫到一個檔案,然後spark去讀這個檔案然後執行,又突然想到,既然scala寫的sp

sparkhbase效能對比

一、spark寫入hbase    hbase client以put方式封裝資料,並支援逐條或批量插入。spark中內建saveAsHadoopDataset和saveAsNewAPIHadoopDataset兩種方式寫入hbase。為此,將同樣的資料插入其中對比效能。依賴如下:

spark通過phoenixhbase(Java版)

pom.xml <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>or

資料的解決方案--------分離

讀寫分離的本質是對資料庫進行叢集,這樣就可以在高併發的情況下將資料庫的操作分配到多個數據庫伺服器去處理從而降低單臺伺服器的壓力,不過由於資料庫的特殊性–每臺伺服器所儲存的資料都需要一致,所以資料同步就成了資料庫叢集中最核心的問題。如果多臺伺服器都可以寫資料同步將

多執行緒資料方法鎖方法與shared_ptr+互斥鎖方法的比較

對共享資源進行多執行緒讀寫操作有很多方法,本文舉出兩種方法並進行對比。 一:讀寫鎖方法。執行緒進行讀操作時以讀的方式加鎖,執行緒進行寫操作時用寫的方式加鎖。 二:另外一種比較新奇的方法是使用shared_ptr+互斥鎖。shared_ptr是一種用引用計數實現的智慧指標,當

spark 資料】資料來源的操作

通用的 Load/Save 函式 在最簡單的方式下,預設的資料來源(parquet 除非另外配置通過spark.sql.sources.default)將會用於所有的操作。 Parquet 是一個列式儲存格式的檔案,被許多其他資料處理系統所支援。Spark

sparkhbase

1 配置 1.1 開發環境: HBase:hbase-1.0.0-cdh5.4.5.tar.gzHadoop:hadoop-2.6.0-cdh5.4.5.tar.gzZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gzSpark:spark-2.1.0-bin-hadoop2.

離線輕量級資料平臺Spark中文字元顯示問題的解決

問題:spark讀取文字檔案轉化成JavaRDD後發現中文字元顯示亂碼。 在spark-shell環境裡執行:System.getProperty("file.encoding"),返回GB2312,

如何使用scala+sparkhbase

最近工作有點忙,所以文章更新頻率低了點,希望大家可以諒解,好了,言歸正傳,下面進入今天的主題: 如何使用scala+spark讀寫Hbase 軟體版本如下: scala2.11.8 spark2.1.0 hbase1.2.0 公司有一些實時資料處理的專案,儲存

spark常見操作系列(3)--sparkhbase(2)

接著上一篇, 問題(2): scan有 scan.setCaching(10000) scan.setCacheBlocks(true) 等設定.setCaching ,個人感覺不夠用.hbase 預設是在記憶體裡面放一塊資料用來讀取,所以讀取效率比較高,可是,