hdfs資料遷移至hbase(python2.7版本)
阿新 • 發佈:2019-02-15
慣例直接上詳細註釋的程式碼。
任務是將HDFS上多個需要重新編碼的檔案合併後寫入HBASE。
python2.7完成,用3的話可能需要改hbase.py的一些原始碼。
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 09:09:56 2018
@author: admin
"""
import ast
import datetime
import re
import time

from hdfs import Client
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import Mutation,BatchMutation
from hbase.ttypes import ColumnDescriptor
# HDFS client configuration (WebHDFS endpoint).
client = Client("http://localhost:50070",root="/",timeout=100,session=False)
# HBase client configuration (Thrift gateway).
transport = TSocket.TSocket('localhost', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client0 = Hbase.Client(protocol)
# Destination table (must already exist in HBase).
tableName = 'new_table'
# Start timing. time.clock() is deprecated (removed in Python 3.3) and
# platform-dependent; time.time() gives the intended wall-clock elapsed time.
start = time.time()
# Open the HBase Thrift connection.
transport.open()
# Non-greedy match of one "{...}" dict-literal record; compiled once, reused per file.
pattern = re.compile(r'{.*?}')
# Walk every file under the HDFS source directory.
for filename in client.list("/JJW/flume/events/18-08-15/",status=False):
    # Read the file content.
    with client.read("/JJW/flume/events/18-08-15/{}".format(filename)) as reader:
        # Decode (source data is stored as `b''`-style escaped text).
        out = reader.read().decode("unicode_escape")
        # Collect every matched record string.
        patternContent = [m.group() for m in pattern.finditer(out)]
        # NOTE: inner loop variable renamed from `item` -- the original
        # shadowed the outer loop variable of the same name.
        for record in patternContent:
            # Each record is a `str(dict)`-style literal. ast.literal_eval
            # parses only Python literals -- unlike eval(), it cannot execute
            # arbitrary code embedded in the HDFS data.
            dic = ast.literal_eval(record)
            # HBase row key: the record id.
            rowkey = '{}'.format(dic['id'])
            # Two columns under the 'meta-data' column family.
            createtime = Mutation(column="meta-data:createtime", value='{}'.format(dic['createtime']))
            updatetime = Mutation(column="meta-data:updatetime", value='{}'.format(dic['updatetime']))
            mutations = [createtime, updatetime]
            # Batched multi-column insert for this row.
            batchMutation = BatchMutation(rowkey, mutations)
            client0.mutateRows(tableName, [batchMutation])
# Close the HBase connection.
transport.close()
# Stop timing.
elapsed = (time.time() - start)
# Report elapsed time.
#print("Time used:",elapsed)
附帶下建表程式碼和結果圖片好了:
# Schema: a single 'meta-data' column family keeping one version per cell.
table = ColumnDescriptor(name='meta-data:', maxVersions=1)
# Create the table through the HBase Thrift client (client0).
# The original called `client.createTable`, but `client` is the HDFS client
# in this script -- only `client0` speaks the HBase Thrift API.
client0.createTable('new_table', [table])