1. 程式人生 > >Hive整表資料分成256分表樣式匯出

Hive整表資料分成256分表樣式匯出

若不是由於mysql資料裝載的需要,這樣做實在太費時費力;即使用上32個並行,原本10來分鐘就可以整表匯出的表,也要耗時1個半小時。
/Users/nisj/PycharmProjects/BiDataProc/love/userLevel/HiveRunData-yicheng.py

# -*- coding=utf-8 -*-
import os
import time
import datetime
import warnings
import threadpool

warnings.filterwarnings("ignore")

def dateRange(beginDate, endDate):
    """Return every calendar day from beginDate to endDate, inclusive.

    Both bounds are parsed into real dates before they are compared, so
    input without zero padding (e.g. "2020-1-5") is ordered chronologically
    instead of lexicographically, and every returned string is zero-padded.

    Args:
        beginDate: first day of the range, formatted as "%Y-%m-%d".
        endDate: last day of the range (included), formatted as "%Y-%m-%d".

    Returns:
        A list of "%Y-%m-%d" strings in ascending order; empty when
        beginDate is later than endDate.

    Raises:
        ValueError: if either bound does not match "%Y-%m-%d".
    """
    fmt = "%Y-%m-%d"
    first = datetime.datetime.strptime(beginDate, fmt)
    last = datetime.datetime.strptime(endDate, fmt)
    # A negative span makes range() empty, which yields the empty list.
    span = (last - first).days
    return [
        (first + datetime.timedelta(days=offset)).strftime(fmt)
        for offset in range(span + 1)
    ]

def hiveRunData(submeterPlus):
    """Export one uid shard of bitmp_all_empirical_value_store_sum via hive -e.

    Runs two Hive queries restricted to rows where pmod(uid,256) equals
    submeterPlus and total_curr_empval>0, redirecting the output of each to
    its own file under /home/hadoop/nisj/xx/yic/dataDir/:
    user_exp_<submeterPlus>.txt and user_exp_record_<submeterPlus>.txt.

    Args:
        submeterPlus: shard number substituted into both the query predicate
            and the output file names.

    Returns:
        None. A non-zero value returned by os.system is reported on stderr
        rather than raised, so the second export still runs after a failed
        first one and the remaining shards are unaffected.
    """
    import sys

    exports = (
        ("user_exp", """/usr/lib/hive-current/bin/hive -e " \
            select uid,total_curr_empval exp \
            from bitmp_all_empirical_value_store_sum \
            where pmod(uid,256)={submeterPlus} and total_curr_empval>0; \
            ">/home/hadoop/nisj/xx/yic/dataDir/user_exp_{submeterPlus}.txt """),
        ("user_exp_record", """/usr/lib/hive-current/bin/hive -e " \
            select uid,0 action,total_curr_empval exp,'初始新增' memo \
            from bitmp_all_empirical_value_store_sum \
            where pmod(uid,256)={submeterPlus} and total_curr_empval>0; \
            ">/home/hadoop/nisj/xx/yic/dataDir/user_exp_record_{submeterPlus}.txt """),
    )

    for label, template in exports:
        status = os.system(template.format(submeterPlus=submeterPlus))
        if status != 0:
            # The shell redirect creates the output file even when hive
            # fails, so without this message a bad shard goes unnoticed.
            sys.stderr.write(
                "hiveRunData: %s export for shard %s returned status %s\n"
                % (label, submeterPlus, status)
            )

# Start of the export run: print the start time, then queue one hiveRunData
# request per shard number (0..255) on a threadpool.ThreadPool(32).
now_time = time.strftime('%Y-%m-%d %X')
print("%s %s" % ("當前時間是:", now_time))

# One work item per shard number.
parList = list(range(256))

requests = list(threadpool.makeRequests(hiveRunData, parList))
main_pool = threadpool.ThreadPool(32)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    # Sleep 30 seconds, then poll the pool; repeat until poll() raises
    # threadpool.NoResultsPending or the user interrupts with Ctrl-C.
    waiting = True
    while waiting:
        try:
            time.sleep(30)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            waiting = False
        except threadpool.NoResultsPending:
            waiting = False

    # Join any worker threads that were dismissed from the pool.
    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

# Print the current local time again once the script reaches its end.
now_time = time.strftime('%Y-%m-%d %X')
print("%s %s" % ("當前時間是:", now_time))