通過Thrift source向Flume傳送資料的Python實現
阿新 • • 發佈:2018-12-27
目前Flume支援Thrift source,即通過一個Thrift服務來收集資料(這一點和scribe是一樣的),然後通過相應的channel傳送到sink中去。以下是具體的實現過程:
環境:Python 2.7.5 / CDH4.3 Flume 1.3 / Thrift 0.9
首先,我們需要一個Thrift協議的Python Flume客戶端的模組,這個模組可以根據Thrift的定義自動生成。你應該先從Cloudera的網站上下載到CDH4.3中的Flume tarball :
wget http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
下載到本地之後解壓,在目錄flume-ng-sdk/src/main/thrift下有Thrift的定義檔案flume.thrift,用它來生成對應的客戶端模組:
tar xzvf flume-ng-1.3.0-cdh4.3.0.tar.gz
cd apache-flume-1.3.0-cdh4.3.0-bin/flume-ng-sdk/src/main/thrift
thrift --gen py flume.thrift
你會在當前目錄下得到一個叫做gen-py的目錄,我們將其更名為genpy之後,放到Python的系統模組路徑中去:
mv gen-py/ /usr/local/lib/python2.7/site-packages/genpy
此時,你就可以通過以下過程來引用這個模組了:
[GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from genpy import flume
>>> dir(flume)
['__all__', '__builtins__', '__doc__', '__file__', '__name__', '__package__', '__path__']
>>>
下面,就是利用這個模組來封裝一個客戶端模組。注意:Flume的Thrift source服務端使用的協議是繼承自TCompactProtocol的TTupleProtocol:
public final class TTupleProtocol extends TCompactProtocol {...
在Thrift Python模組中,只有兩種可選協議:TCompactProtocol, TBinaryProtocol, 很顯然我們需要使用前一種協議,如果使用TBinaryProtocol,會在伺服器端報以下錯誤:
18 Jul 2013 18:25:29,447 ERROR [pool-5-thread-4] (org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run:213) - Thrift error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Expected protocol id ffffff82 but got ffffff80
	at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:472)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)
在客戶端會有以下的報錯:
Traceback (most recent call last):
  File "pyflume.py", line 65, in <module>
    flume_client.send({'a':'hello', 'b':'world'}, 'events under hello world')
  File "pyflume.py", line 53, in send
    self.client.append(event)
  File "/usr/local/lib/python2.7/site-packages/genpy/flume/ThriftSourceProtocol.py", line 49, in append
    return self.recv_append()
  File "/usr/local/lib/python2.7/site-packages/genpy/flume/ThriftSourceProtocol.py", line 60, in recv_append
    (fname, mtype, rseqid) = self._iprot.readMessageBegin()
  File "/usr/local/lib/python2.7/site-packages/thrift/protocol/TBinaryProtocol.py", line 126, in readMessageBegin
    sz = self.readI32()
  File "/usr/local/lib/python2.7/site-packages/thrift/protocol/TBinaryProtocol.py", line 206, in readI32
    buff = self.trans.readAll(4)
  File "/usr/local/lib/python2.7/site-packages/thrift/transport/TTransport.py", line 58, in readAll
    chunk = self.read(sz - have)
  File "/usr/local/lib/python2.7/site-packages/thrift/transport/TTransport.py", line 271, in read
    self.readFrame()
  File "/usr/local/lib/python2.7/site-packages/thrift/transport/TTransport.py", line 275, in readFrame
    buff = self.__trans.readAll(4)
  File "/usr/local/lib/python2.7/site-packages/thrift/transport/TTransport.py", line 58, in readAll
    chunk = self.read(sz - have)
  File "/usr/local/lib/python2.7/site-packages/thrift/transport/TSocket.py", line 118, in read
    message='TSocket read 0 bytes')
thrift.transport.TTransport.TTransportException: TSocket read 0 bytes
以下就是具體的實現程式碼,大家可以參考修改:
#coding=utf-8
'''
Created on 2013-07-18
@author: Felix
'''
import logging

from thrift.protocol import TCompactProtocol
from thrift.transport import TTransport, TSocket

from genpy.flume import ThriftSourceProtocol
from genpy.flume.ttypes import ThriftFlumeEvent
class _Transport(object):
def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None):
self.thrift_host = thrift_host
self.thrift_port = thrift_port
self.timeout = timeout
self.unix_socket = unix_socket
self._socket = TSocket.TSocket(self.thrift_host, self.thrift_port, self.unix_socket)
self._transport_factory = TTransport.TFramedTransportFactory()
self._transport = self._transport_factory.getTransport(self._socket)
def connect(self):
try:
if self.timeout:
self._socket.setTimeout(self.timeout)
if not self.is_open():
self._transport = self._transport_factory.getTransport(self._socket)
self._transport.open()
except Exception, e:
print(e)
self.close()
def is_open(self):
return self._transport.isOpen()
def get_transport(self):
return self._transport
def close(self):
self._transport.close()
class FlumeClient(object):
    """Best-effort client for a Flume Thrift source.

    Requests are encoded with TCompactProtocol over the framed transport built
    by _Transport. send() and send_batch() never raise on an RPC failure.
    Instead the error is logged, the broken connection is closed, and a new
    connection is attempted so the next call has a chance to succeed.

    The client can be used as a context manager; the connection is closed on
    exit.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None):
        """Build the Thrift client and try to connect right away.

        :param thrift_host: host name or IP address of the Flume Thrift source.
        :param thrift_port: TCP port of the Flume Thrift source.
        :param timeout: optional socket timeout, forwarded to _Transport.
        :param unix_socket: optional Unix socket path, forwarded to _Transport.
        """
        self._transObj = _Transport(thrift_host, thrift_port,
                                    timeout=timeout, unix_socket=unix_socket)
        self._protocol = TCompactProtocol.TCompactProtocol(
            trans=self._transObj.get_transport())
        # The same protocol instance serves as both input and output protocol.
        self.client = ThriftSourceProtocol.Client(iprot=self._protocol,
                                                  oprot=self._protocol)
        self._transObj.connect()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    def _call(self, name, rpc, payload):
        """Run one RPC; on failure log it, drop the connection, return None.

        In every case connect() is called afterwards, which reopens the
        transport if it is closed.
        """
        try:
            return rpc(payload)
        except Exception:
            self._log.exception('Flume RPC %s failed', name)
            # Thrift's Python TSocket keeps its handle after a failed read or
            # write (e.g. "TSocket read 0 bytes"), so is_open() keeps reporting
            # True and connect() would not reopen the dead socket. Close it
            # explicitly so the reconnect below actually happens.
            self._transObj.close()
            return None
        finally:
            self._transObj.connect()

    def send(self, event):
        """Send a single ThriftFlumeEvent via the Thrift append() call.

        :returns: whatever append() returns (a Status code according to
            Flume's flume.thrift IDL), or None if the call failed.
        """
        return self._call('append', self.client.append, event)

    def send_batch(self, events):
        """Send a list of ThriftFlumeEvent objects via appendBatch().

        :returns: whatever appendBatch() returns, or None if the call failed.
        """
        return self._call('appendBatch', self.client.appendBatch, events)

    def close(self):
        """Close the underlying transport."""
        self._transObj.close()
if __name__ == '__main__':
    import random

    # Example: send one event, then a batch of 100 events, to a Flume Thrift
    # source listening on 192.168.1.141:4141.
    flume_client = FlumeClient('192.168.1.141', 4141)
    try:
        event = ThriftFlumeEvent({'a': 'hello', 'b': 'world'},
                                 'events under hello world2')
        events = [
            ThriftFlumeEvent({'a': 'hello', 'b': 'world'},
                             'events under hello world%s' % random.randint(0, 1000))
            for _ in range(100)
        ]
        flume_client.send(event)
        flume_client.send_batch(events)
    finally:
        # Release the connection even if building or sending the events raises.
        flume_client.close()
以上程式碼也在github上:https://github.com/sinolambda/pyflume