1. 程式人生 > >python 之pyhdfs 對hdfs 進行操作

python 之pyhdfs 對hdfs 進行操作

#################################################################
#################################################################
#################################################################
#### 615 明輝科技### 針對所有子專案開發的公共 utils,不具有業務屬性
#### 作者: 曹明傑
#---------------------針對hdfs------------------------#


# !coding:utf-8
import  os
from pyhdfs import HdfsClient, HdfsFileNotFoundException
from brushtickes.com.mh.brush.brushutils \
    import brush_util as bu


# 關於python操作hdfs的API(本檔案使用的是 pyhdfs 套件)可以檢視官方文件:
# https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html


# Read the content of an HDFS file, optionally append it to a local file,
# and return the content that was read.
def read_hdfs_file(filename, local_path='', **kwargs):
    """Read an HDFS file, optionally append it to a local file, and return it.

    The whole content is loaded into memory through the ``data`` attribute of
    the response returned by ``get_client().open``, so this is only suitable
    for files that fit in RAM.

    :param filename: HDFS path of the file to read.
    :param local_path: Local file to append the content to. The default empty
        string skips the local copy.
    :param kwargs: Forwarded unchanged to the client's ``open`` call (the
        original docstring lists ``offset``, ``length`` and ``buffersize``).
    :return: The content exposed by the response's ``data`` attribute.
    """
    response = get_client().open(filename, **kwargs)
    try:
        # Read the body exactly once and reuse it below.
        content = response.data
    finally:
        # Release the underlying HTTP response instead of leaking it.
        response.close()
    print(content)
    if local_path != '':
        # Choose the mode from the payload type up front rather than opening
        # in text mode and retrying in binary mode after a TypeError.
        mode = 'ab' if isinstance(content, bytes) else 'a'
        with open(local_path, mode) as local_file:
            local_file.write(content)
    return content

# Create a directory on HDFS.
def mkdirs(hdfs_path):
    """Create the directory ``hdfs_path`` through the shared HDFS client.

    :param hdfs_path: HDFS directory path to create.
    """
    client = get_client()
    client.mkdirs(hdfs_path)


# Delete a file on HDFS.
def delete_hdfs_file(hdfs_path):
    """Delete ``hdfs_path`` through the shared HDFS client.

    :param hdfs_path: HDFS path to delete.
    """
    client = get_client()
    client.delete(hdfs_path)


# Upload a local file to HDFS without marking the local file afterwards.
def put_to_hdfs_no_flag(local_path, hdfs_path):
    """Upload a local file to HDFS and leave the local file untouched.

    :param local_path: Path of the local file to upload.
    :param hdfs_path: Destination path on HDFS.
    """
    # The local source goes first and the HDFS destination second, matching
    # the other upload helpers in this file; the previous code passed the two
    # paths in the opposite order.
    upload = get_client().copy_from_local(local_path, hdfs_path)
    # The earlier message claimed the local file had been flagged as
    # "success", which this function never does.
    print_base_log(upload, '上傳檔案到hdfs', 'put_to_hdfs')


# Upload a local file to HDFS, then rename the local file with a
# "success_" prefix and a timestamp suffix to mark it as uploaded.
def put_to_hdfs_flag(local_path, hdfs_path):
    """Upload a local file to HDFS and flag the local copy as uploaded.

    After the upload the local file is renamed, inside its own directory, to
    ``success_<original name><timestamp>``, where the timestamp comes from
    ``bu.get_new_time()``.

    :param local_path: Path of the local file to upload.
    :param hdfs_path: Destination path on HDFS.
    """
    upload = get_client().copy_from_local(local_path, hdfs_path)
    # Prefix only the file name. Prefixing the whole path would turn
    # "/data/a.txt" into the invalid "success_/data/a.txt...".
    directory, base_name = os.path.split(local_path)
    flagged_name = "success_" + base_name + str(bu.get_new_time())
    os.rename(local_path, os.path.join(directory, flagged_name))
    print_base_log(upload, '上傳檔案到hdfs,並把本地上傳的檔案標誌位 success 已經完成', 'put_to_hdfs_flag')

# Download a file from HDFS to the local filesystem.
def get_from_hdfs(hdfs_path, local_path):
    """Copy ``hdfs_path`` from HDFS to ``local_path`` on the local disk.

    :param hdfs_path: Source path on HDFS.
    :param local_path: Destination path on the local filesystem.
    """
    client = get_client()
    client.copy_to_local(hdfs_path, local_path)


# Append data to a file on HDFS.
def append_to_hdfs(hdfs_path, data):
    """Append ``data`` to the HDFS file at ``hdfs_path``.

    :param hdfs_path: HDFS file to append to.
    :param data: Payload handed unchanged to the client's ``append`` call.
    """
    client = get_client()
    client.append(hdfs_path, data)


# Write data to a file on HDFS, replacing any existing content.
def write_to_hdfs(hdfs_path, data):
    """Write ``data`` to ``hdfs_path``, overwriting the file if it exists.

    :param hdfs_path: HDFS file to (re)create.
    :param data: Payload handed unchanged to the client's ``create`` call.
    """
    # The previous code deleted the file and then called ``append``, which
    # targets a path that no longer exists (or never existed). Creating the
    # file with ``overwrite=True`` covers both the new-file and the
    # replace-file cases in one request.
    get_client().create(hdfs_path, data, overwrite=True)



# Move or rename a file on HDFS.
def move_or_rename(hdfs_src_path, hdfs_dst_path):
    """Rename ``hdfs_src_path`` to ``hdfs_dst_path`` on HDFS.

    :param hdfs_src_path: Existing HDFS path.
    :param hdfs_dst_path: New HDFS path.
    """
    client = get_client()
    client.rename(hdfs_src_path, hdfs_dst_path)


# Return the entries found under an HDFS directory.
def list(hdfs_path):
    """List the entries of the HDFS directory ``hdfs_path``.

    NOTE: the function name shadows the ``list`` builtin inside this module;
    it is kept because callers reference it by this name.

    :param hdfs_path: HDFS directory to list.
    :return: The result of the client's ``listdir`` call, or an empty list
        when the path is not a directory or does not exist (both cases are
        logged).
    """
    # Bind the result up front so the final ``return`` cannot hit an unbound
    # local when one of the handled exceptions is raised.
    resp = []
    try:
        resp = get_client().listdir(hdfs_path)
        print_base_log(str(resp), hdfs_path + '下的目錄檔案資訊', 'list')
    except NotADirectoryError:
        print_base_log("該" + hdfs_path + "不是資料夾", hdfs_path + '下的目錄檔案資訊', 'list')
    except HdfsFileNotFoundException:
        print_base_log("該" + hdfs_path + "不存在", hdfs_path + '下的目錄檔案資訊', 'list')

    return resp

# The client talks to port 50070 by default (presumably the NameNode's
# WebHDFS/HTTP port -- confirm against the cluster configuration).
def get_client(hdfs_url='hdfs://node1:50070'):
    """Build an ``HdfsClient`` for the given NameNode address as user ``root``.

    :param hdfs_url: NameNode address, with or without a ``scheme://``
        prefix. The default resolves to ``node1:50070``, the value that was
        previously hard-coded.
    :return: A new ``HdfsClient`` instance.
    """
    # The argument used to be ignored entirely. Drop an optional scheme and
    # any trailing slash so "hdfs://host:port" becomes "host:port".
    hosts = hdfs_url.split('://', 1)[-1].rstrip('/')
    return HdfsClient(hosts=hosts, user_name='root')

def print_base_log(obj, item='hdfs', option=''):
    """Forward a log entry to ``bu.print_custom_masg`` tagged with this file.

    :param obj: Value to log.
    :param item: Second argument passed to the logger; defaults to ``'hdfs'``.
    :param option: Third argument passed to the logger; defaults to ``''``.
    """
    source_file = 'base_utils.py'
    bu.print_custom_masg(obj, item, option, source_file)

def put_to_hdfs_flag2(local_path, hdfs_path):
    """Upload a local file to HDFS using the shared client settings.

    Despite the name, no flag is set and the local file is not renamed.

    NOTE (from the original author): ``local_path`` should be an absolute
    path and the HDFS destination must not already exist.

    :param local_path: Path of the local file to upload.
    :param hdfs_path: Destination path on HDFS.
    """
    # Reuse get_client() rather than repeating the hard-coded host and user;
    # its defaults yield the same connection settings used here before.
    client = get_client()
    client.copy_from_local(local_path, hdfs_path)

親測可行。就發出來給大家參考