程式人生 > 大資料系列之實時計算Spark(十七)Python與Hbase整合

大資料系列之實時計算Spark(十七)Python與Hbase整合

1.準備工作(所用到的工具庫會放在最後供下載使用)

1.1.安裝thrift

  cmd>pip install thrift

  我使用的是Anaconda3,下載下來的包會存放到 Anaconda3 安裝目錄下的 /Lib/site-packages/ 目錄中;如果沒有使用Anaconda3,則把如下的兩個資料夾(原文以截圖列出,截圖已遺失)直接放到 Python 安裝目錄/Lib/site-packages/ 下即可。

                    

1.2.將hbase資料夾(在文末可以下載,hbase目錄下是python與hbase連接所使用的庫)複製到當前工程目錄下。注意下面的程式碼以 com.py.spark.hbase 匯入該庫,因此需把它放在工程的 com/py/spark/ 套件路徑下(或依實際位置修改 import 語句)。

1.3.啟動hbase叢集

1.4.啟動hbase的thriftserver,滿足和第三方應用通訊

    $>hbase-daemon.sh start thrift2

檢視webui: http://s10:9095/ (9095 為 webui 埠,9090 為 rpc 埠,程式碼連線的是 9090)

1.5.程式碼:

# -*- coding: utf-8 -*-
# python操作hbase
import os
# 匯入thrift的python模組
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
#匯入編譯生成的hbase 模組
from com.py.spark.hbase import THBaseService
from com.py.spark.hbase.ttypes import *
#hbase的thrift伺服器地址s10:9090
from com.py.spark.hbase.ttypes import TGet

# Connect to the HBase Thrift2 server on host s10, RPC port 9090.
# A buffered transport wraps the raw socket; requests use the binary
# protocol (presumably matching the thrift2 server's configuration -- confirm
# if the server was started with framed transport or another protocol).
transport = TTransport.TBufferedTransport(TSocket.TSocket('s10', 9090))
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
# Open the connection before any request is sent.
transport.open()
# --- get: read selected columns of a single row ---
table = b'ns1:t1'      # table name in namespace:table form
rowkey = b'row9998'    # row key to fetch
col_id = TColumn(b"f1", b"id")
col_name = TColumn(b"f1", b"name")
col_age = TColumn(b"f1", b"age")
cols = [col_id, col_name, col_age]
get = TGet(rowkey, cols)  # build the Get request
res = client.get(table, get)
# The returned cells come back sorted, not in the order of `cols`.
# Iterate over whatever was returned instead of indexing fixed positions:
# a missing row or a missing cell would otherwise raise IndexError.
for cell in res.columnValues or []:
    print(cell.family.decode())
    print(cell.qualifier.decode())
    print(cell.value.decode())
    print(cell.timestamp)
# --- put: write three cells of row b'row10000' in family f1 ---
table = b'ns1:t1'
row = b'row10000'
vals = [
    TColumnValue(b'f1', b'id', b'10000'),
    TColumnValue(b'f1', b'name', b'zpx'),
    TColumnValue(b'f1', b'age', b'25'),
]
put = TPut(row, vals)  # build the Put request for that row
client.put(table, put)
print("okkkk!!")
# --- delete: remove the f1:id, f1:name and f1:age cells of b'row10000' ---
table = b'ns1:t1'
rowkey = b'row10000'
col_id = TColumn(b"f1", b"id")
col_name = TColumn(b"f1", b"name")
col_age = TColumn(b"f1", b"age")
cols = [col_id, col_name, col_age]

# Build the Delete request for those columns of `rowkey`.
delete = TDelete(rowkey, cols)
# deleteSingle is declared void in the HBase thrift2 IDL. Its return value is
# deliberately not stored: assigning it would only clobber `res` with None.
client.deleteSingle(table, delete)
# --- scan: read the requested columns for a range of row keys ---
table = b'ns1:t1'
startRow = b'row9900'  # first row key of the range (inclusive)
stopRow = b'row9998'   # end of the range (exclusive, per HBase Scan semantics)
col_id = TColumn(b"f1", b"id")
col_name = TColumn(b"f1", b"name")
col_age = TColumn(b"f1", b"age")
cols = [col_id, col_name, col_age]
scan = TScan(startRow=startRow, stopRow=stopRow, columns=cols)
# numRows argument: upper bound on the rows returned by this single call.
max_rows = 100
r = client.getScannerResults(table, scan, max_rows)
for x in r:
    print("====================")
    # Print every returned cell instead of a hard-coded index: a row that
    # lacks one of the requested columns would otherwise raise IndexError.
    for cell in x.columnValues or []:
        print(cell.family.decode())
        print(cell.qualifier.decode())
        print(cell.value.decode())
        print(cell.timestamp)

# All requests are finished; release the connection opened at startup.
transport.close()




連結:https://pan.baidu.com/s/1dYY56Sbo_ZZ3lwlRiP5nAw 密碼:akky