Big Data Series: Real-Time Computing with Spark (17): Integrating Python with HBase
Axin • Published: 2019-02-11
1. Preparation (the libraries used here are available for download at the end)
1.1. Install thrift
cmd>pip install thrift
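I use Anaconda3, so downloaded packages are stored under /Lib/site-packages/. If you are not using Anaconda3, copy the following two folders directly into /Lib/site-packages/ under your Python installation directory.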
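To confirm which site-packages directory the package actually landed in, a small check like the one below may help. It is only a sketch: the helper name find_package_location is hypothetical, and it uses nothing beyond the standard library's importlib.

```python
import importlib.util


def find_package_location(name):
    """Return the path of an importable top-level package, or None if it is missing."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return None
    # Packages expose their directory list; plain modules only have an origin.
    if spec.submodule_search_locations:
        return list(spec.submodule_search_locations)[0]
    return spec.origin


if __name__ == "__main__":
    location = find_package_location("thrift")
    print(location or "thrift is not installed for this interpreter")
```

If this prints a path under a different interpreter than the one your project uses, install thrift with that interpreter's pip instead.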
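1.2. Copy the hbase folder into the current project directory (it can be downloaded at the end; it contains the library Python uses to connect to HBase)
1.3. Start the HBase cluster
1.4. Start the HBase Thrift server so that third-party applications can communicate with HBase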
$>hbase-daemon.sh start thrift2
Check the web UI at http://s10:9095/ (9095 is the web UI port; 9090 is the RPC port).
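Before running any client code, it can be useful to check that both ports accept connections. The following is a minimal sketch using only the standard socket module; the function names port_is_open and check_thrift_ports are hypothetical, and the host s10 and the ports 9090/9095 are the values used in this article.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False


def check_thrift_ports(host, ports=(9090, 9095)):
    """Return a dict mapping each port to whether it currently accepts connections."""
    return {port: port_is_open(host, port) for port in ports}
```

Calling check_thrift_ports("s10") returns a dictionary with one True or False per port. If 9090 is reported closed, the thrift2 daemon is probably not running or is bound to another port.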
1.5. Code:
# -*- coding: utf-8 -*-
# Operating HBase from Python

# Thrift's Python modules
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# HBase modules generated by the Thrift compiler (the hbase folder from step 1.2)
from com.py.spark.hbase import THBaseService
from com.py.spark.hbase.ttypes import (
    TColumn, TColumnValue, TGet, TPut, TDelete, TScan)

# Address of the HBase Thrift server: s10:9090
transport = TSocket.TSocket('s10', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)

# Open the connection before sending any requests
transport.open()

table = b'ns1:t1'  # table name, used by every operation below

# The three columns of family f1 that the get, delete and scan operate on
cols = [TColumn(b"f1", b"id"), TColumn(b"f1", b"name"), TColumn(b"f1", b"age")]

# get: read one row
rowkey = b'row9998'
get = TGet(rowkey, cols)
res = client.get(table, get)
# The returned cells come back sorted, not in the order they were requested
for cv in res.columnValues:
    print(cv.family.decode())
    print(cv.qualifier.decode())
    print(cv.value.decode())
    print(cv.timestamp)

# put: write one row
row = b'row10000'
vals = [
    TColumnValue(b'f1', b'id', b'10000'),
    TColumnValue(b'f1', b'name', b'zpx'),
    TColumnValue(b'f1', b'age', b'25'),
]
put = TPut(row, vals)
client.put(table, put)
print("put ok")

# delete: remove the three columns of the row just written
delete = TDelete(row, cols)
client.deleteSingle(table, delete)

# scan: read up to 100 rows in the range [row9900, row9998)
scan = TScan(startRow=b'row9900', stopRow=b'row9998', columns=cols)
rows = client.getScannerResults(table, scan, 100)
for x in rows:
    print("====================")
    # Only the third returned cell is printed; this raises IndexError
    # if a row has fewer than three of the requested columns
    cv = x.columnValues[2]
    print(cv.family.decode())
    print(cv.qualifier.decode())
    print(cv.value.decode())
    print(cv.timestamp)

# Close the connection when finished
transport.close()
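Indexing columnValues by position is fragile because the cells come back sorted rather than in request order. One possible alternative is to turn the list into a dictionary keyed by "family:qualifier". The sketch below assumes only that each cell has the family, qualifier, value and timestamp attributes used in the code above; the helper name column_values_to_dict is hypothetical, and SimpleNamespace objects stand in for real results so the example runs without a cluster.

```python
from types import SimpleNamespace


def column_values_to_dict(column_values, encoding="utf-8"):
    """Map 'family:qualifier' to a (value, timestamp) tuple for each cell."""
    cells = {}
    for cv in column_values:
        key = cv.family.decode(encoding) + ":" + cv.qualifier.decode(encoding)
        cells[key] = (cv.value.decode(encoding), cv.timestamp)
    return cells


# Stand-in cells that mimic the attributes of a returned column value
sample = [
    SimpleNamespace(family=b"f1", qualifier=b"age", value=b"25", timestamp=1),
    SimpleNamespace(family=b"f1", qualifier=b"id", value=b"10000", timestamp=1),
]
row = column_values_to_dict(sample)
print(row["f1:id"][0])
```

With a real result, the same call would be column_values_to_dict(res.columnValues), and a missing column shows up as a missing key instead of an unexpected value at a fixed index.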
Download link: https://pan.baidu.com/s/1dYY56Sbo_ZZ3lwlRiP5nAw Password: akky