
Operating HBase from Python via Thrift

2012-06-01

http://abloz.com

date:2012.6.1

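Introduction

Java is the most convenient and most efficient way to work with HBase. But Java is not lightweight and is awkward to debug in arbitrary environments. Developers also differ in the languages they know, and so in how productive they are. Through Thrift, HBase can also be accessed from Python, Ruby, C++, Perl, and other languages.

Thrift is an open-source remote-call framework developed by Facebook, similar to Google's protobuf. Protobuf, however, covers only data serialization, supports only a binary encoding, and has no remote-call layer. It natively supports C++, Python, and Java, with third-party implementations for Objective-C, Ruby, and others. Thrift implements serialization, transport, protocol definition, and remote calls, and supports more languages. The two can substitute for each other in some respects, but each has its own areas of use.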

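Starting the Thrift server

First, add the HBase path to .bashrc so that the hbase command can be run from any directory.

Start the Thrift server. It listens on port 9090 by default; if that conflicts with something else, use the -p option to change the port.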
 hbase thrift -p 19090 start
 [[email protected] hbase-0.94.0]$ hbase thrift -p 19090 start
 12/06/01 17:54:27 INFO util.VersionInfo: HBase 0.94.0
 12/06/01 17:54:27 INFO util.VersionInfo: Subversion https://svn.apache.org/repos/asf/hbase/branches/0.94 -r 1332822
 12/06/01 17:54:27 INFO util.VersionInfo: Compiled by jenkins on Tue May 1 21:43:54 UTC 2012
 Exception in thread "main" java.lang.AssertionError: Exactly one option out of [-hsha, -nonblocking, -threadpool, -threadedselector] has to be specified
 at org.apache.hadoop.hbase.thrift.ThriftServerRunner$ImplType.setServerImpl(ThriftServerRunner.java:201)
 at org.apache.hadoop.hbase.thrift.ThriftServer.processOptions(ThriftServer.java:169)
 at org.apache.hadoop.hbase.thrift.ThriftServer.doMain(ThriftServer.java:85)
 at org.apache.hadoop.hbase.thrift.ThriftServer.main(ThriftServer.java:192)




The error says that exactly one server type must be specified. With -nonblocking or -hsha the server starts:

 [[email protected] hbase-0.94.0]$ hbase thrift -p 19090 -nonblocking start
 ...
 12/06/01 17:25:38 INFO thrift.ThriftServerRunner: starting HBase TNonblockingServer server on 19090
 [[email protected] hbase-0.94.0]$ hbase thrift -p 19090 -hsha start
 12/06/01 17:55:27 DEBUG thrift.ThriftServerRunner: Using binary protocol
 12/06/01 17:55:27 DEBUG thrift.ThriftServerRunner: Using framed transport
 12/06/01 17:55:27 INFO thrift.ThriftServerRunner: starting HBase THsHaServer server on 19090
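The "exactly one option" rule from the AssertionError can be checked before launching the server. The following is a minimal sketch, not part of HBase: `thrift_start_command` is a hypothetical helper that only assembles the command line, using the flags quoted in the error message and the `-p` option shown above.

```python
# Server-type flags named in the AssertionError; exactly one is required.
SERVER_TYPES = ("-hsha", "-nonblocking", "-threadpool", "-threadedselector")


def thrift_start_command(server_type, port=9090):
    """Build the argument list for `hbase thrift ... start`.

    Hypothetical helper for illustration; it does not run anything.
    """
    if server_type not in SERVER_TYPES:
        raise ValueError("server_type must be one of %s" % (SERVER_TYPES,))
    return ["hbase", "thrift", "-p", str(port), server_type, "start"]


command = thrift_start_command("-threadpool", port=19090)
print(" ".join(command))  # hbase thrift -p 19090 -threadpool start
```

The resulting list could be passed to `subprocess.call` if launching from Python is wanted.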

Processing Hbase.thrift

Compile the Hbase.thrift file that ships with HBase to generate the Python client code.

[[email protected] hbase-0.94.0]$ cd src/main/resources/org/apache/hadoop/hbase/thrift/
 [[email protected] thrift]$ ls
 Hbase.thrift
 [[email protected] thrift]$ thrift --gen py Hbase.thrift
 [[email protected] thrift]$ ls
 gen-py Hbase.thrift




[[email protected] thrift]$ sudo mv gen-py /usr/local/lib/python2.7/site-packages/
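Before writing a client it can help to confirm that the generated modules ended up where the script will look for them. This is a rough sketch: `missing_generated_files` is a hypothetical helper, and the two file names are the ones the client code below imports (`hbase/Hbase.py` and `hbase/ttypes.py`).

```python
import os


def missing_generated_files(gen_py_dir):
    """Return the expected generated modules that are absent under gen_py_dir."""
    expected = [
        os.path.join("hbase", "Hbase.py"),
        os.path.join("hbase", "ttypes.py"),
    ]
    return [rel for rel in expected
            if not os.path.isfile(os.path.join(gen_py_dir, rel))]


# The directory used in this article; adjust it to the local installation.
missing = missing_generated_files("/usr/local/lib/python2.7/site-packages/gen-py")
if missing:
    print("missing: %s" % ", ".join(missing))
```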

Listing the existing HBase tables

[[email protected] test]$ hbase shell
 hbase(main):003:0> list
 t1
 1 row(s) in 0.5320 seconds

Writing the client test code

[[email protected] test]$ vi t.py
#!/usr/bin/env python
#coding:utf8
#author:abloz.com
#date:2012.6.1
import sys
# The Python files generated from Hbase.thrift live here
sys.path.append('/usr/local/lib/python2.7/site-packages/gen-py')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
# Types such as ColumnDescriptor are defined in hbase.ttypes
from hbase.ttypes import *

# Make socket
# Change the host and port here if needed
transport = TSocket.TSocket('localhost', 19090)
# Buffering is critical. Raw sockets are very slow
# TFramedTransport can be used instead; it is also an efficient transport
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
# The protocol is separate from the transport, so multiple protocols can be supported
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# The client represents one user
client = Hbase.Client(protocol)
# Open the connection
transport.open()

print client.getTableNames()

Run it

[[email protected] test]$ python t.py
 Traceback (most recent call last):
 File "t.py", line 27, in <module>
 print client.getTableNames()
 File "/usr/local/lib/python2.7/site-packages/gen-py/hbase/Hbase.py", line 769, in getTableNames
 return self.recv_getTableNames()
 File "/usr/local/lib/python2.7/site-packages/gen-py/hbase/Hbase.py", line 779, in recv_getTableNames
 (fname, mtype, rseqid) = self._iprot.readMessageBegin()
 File "build/bdist.linux-x86_64/egg/thrift/protocol/TBinaryProtocol.py", line 126, in readMessageBegin
 File "build/bdist.linux-x86_64/egg/thrift/protocol/TBinaryProtocol.py", line 203, in readI32
 File "build/bdist.linux-x86_64/egg/thrift/transport/TTransport.py", line 58, in readAll
 File "build/bdist.linux-x86_64/egg/thrift/transport/TTransport.py", line 160, in read
 File "build/bdist.linux-x86_64/egg/thrift/transport/TSocket.py", line 94, in read
 socket.error: [Errno 104] Connection reset by peer




The server prints:
 12/06/01 17:55:40 ERROR server.THsHaServer: Read an invalid frame size of -2147418111. Are you using TFramedTransport on the client side?
 For efficient transfer, the client must use TFramedTransport or TBufferedTransport. The -hsha and -nonblocking server modes, however, require TFramedTransport. Switch the server to thread-pool mode and try again.
 [[email protected] hbase-0.94.0]$ hbase thrift -p 19090 -threadpool start
 ...
 12/06/01 18:02:17 DEBUG thrift.ThriftServerRunner: Using binary protocol
 12/06/01 18:02:17 INFO thrift.ThriftServerRunner: starting TBoundedThreadPoolServer on /0.0.0.0:19090; min worker threads=16, max worker threads=1000, max queued requests=1000

 [[email protected] test]$ python t.py
 ['t1']

The output is now correct.
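The odd frame size in the server error has a plausible explanation: a framed server reads the first four bytes of each message as a length, while an unframed TBinaryProtocol client starts its message with a version marker. The sketch below reproduces the number using only the standard library. It assumes the client wrote a strict-mode header (version marker 0x80010000 combined with message type 1 for a call); `first_word_as_frame_size` and `frame` are hypothetical names used for illustration.

```python
import struct

# TBinaryProtocol strict-mode version marker and the CALL message type
# (assumed to be what the unframed client sent first).
VERSION_1 = 0x80010000
CALL = 1


def first_word_as_frame_size(header_word):
    """Reinterpret an unsigned 32-bit word as the signed length a framed server reads."""
    raw = struct.pack(">I", header_word)
    return struct.unpack(">i", raw)[0]


def frame(payload):
    """Prefix payload with its 4-byte big-endian length, as a framed transport does."""
    return struct.pack(">i", len(payload)) + payload


unframed_size = first_word_as_frame_size(VERSION_1 | CALL)
print(unframed_size)  # -2147418111, the value in the server log

framed = frame(b"hello")
print(struct.unpack(">i", framed[:4])[0])  # 5
```

This is why the -hsha and -nonblocking servers reject a TBufferedTransport client: the client sends no length prefix, so the version marker is misread as a negative frame size.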

Creating a table:

 Hbase.thrift describes table creation as follows:
 /**
 * Create a table with the specified column families. The name
 * field for each ColumnDescriptor must be set and must end in a
 * colon (:). All other fields are optional and will get default
 * values if not explicitly specified.
 *
 * @throws IllegalArgument if an input parameter is invalid
 *
 * @throws AlreadyExists if the table name already exists
 */
 void createTable(
 /** name of table to create */
 1:Text tableName,

 /** list of column family descriptors */
 2:list<ColumnDescriptor> columnFamilies
 ) throws (1:IOError io, 2:IllegalArgument ia, 3:AlreadyExists exist)




The column descriptor is defined as:
 /**
 * An HColumnDescriptor contains information about a column family
 * such as the number of versions, compression settings, etc. It is
 * used as input when creating a table or adding a column.
 */
 struct ColumnDescriptor {
 1:Text name,
 2:i32 maxVersions = 3,
 3:string compression = "NONE",
 4:bool inMemory = 0,
 5:string bloomFilterType = "NONE",
 6:i32 bloomFilterVectorSize = 0,
 7:i32 bloomFilterNbHashes = 0,
 8:bool blockCacheEnabled = 0,
 9:i32 timeToLive = -1
 }
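The createTable comment requires every ColumnDescriptor name to end in a colon, which is easy to forget. A small helper can normalize names before descriptors are built. This is a sketch only; `family_name` is a hypothetical function, not part of the generated code.

```python
def family_name(name):
    """Return name ending in a colon, as createTable requires."""
    if not name or name == ":":
        raise ValueError("column family name must not be empty")
    return name if name.endswith(":") else name + ":"


families = [family_name(n) for n in ("username", "pass:", "age", "info")]
print(families)  # ['username:', 'pass:', 'age:', 'info:']
```

Each normalized name could then be passed as the `name` argument of `ColumnDescriptor`.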

Append the following code

# Create a test table holding user information. Each column family name must end with ":"
try:
    colusername = ColumnDescriptor( name = 'username:',maxVersions = 1 )
    colpass = ColumnDescriptor( name = 'pass:',maxVersions = 1 )
    colage = ColumnDescriptor( name = 'age:',maxVersions = 1 )
    colinfo = ColumnDescriptor( name = 'info:',maxVersions = 1 )

    client.createTable('tusers', [colusername,colpass,colage,colinfo])

    print client.getTableNames()

except AlreadyExists as tx:
    # Raised by the server when the table name is already taken
    print "Thrift exception"
    print '%s' % (tx.message)



Run it:
 [[email protected] test]$ python t.py
 ['t1']
 ['t1', 'tusers']
 [[email protected] test]$ python t.py
 ['t1', 'tusers']
 Thrift exception
 table name already in use
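The second run fails with AlreadyExists because the script always tries to create the table. One way to make the script safe to rerun is to consult `getTableNames()` first. The sketch below assumes a client exposing the `getTableNames` and `createTable` methods used above; `ensure_table` is a hypothetical helper, and `FakeClient` is an in-memory stand-in so the example runs without a server.

```python
def ensure_table(client, table_name, families):
    """Create table_name unless it is already listed; return True if created."""
    if table_name in client.getTableNames():
        return False
    client.createTable(table_name, families)
    return True


class FakeClient(object):
    """In-memory stand-in for Hbase.Client, for illustration only."""

    def __init__(self, tables):
        self.tables = list(tables)

    def getTableNames(self):
        return list(self.tables)

    def createTable(self, table_name, families):
        self.tables.append(table_name)


client = FakeClient(["t1"])
print(ensure_table(client, "tusers", ["username:"]))  # True
print(ensure_table(client, "tusers", ["username:"]))  # False
```

The check and the create are two separate calls, so another client could still create the table in between. Catching AlreadyExists, as the script above does, remains worthwhile.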


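Unless marked as a repost, all content is original. This site follows a Creative Commons (CC) license; please credit the source when reposting.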