程式人生 > Python-threadpool多執行緒多個引數傳入示例

Python-threadpool多執行緒多個引數傳入示例

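主要找到了兩種方法:一種是將引數構造成 list 傳入(按位置對應函式引數);另一種是將引數構造成 dict 傳入(按引數名稱對應)。threadpool.makeRequests 會把 par_list 中的每個 (args, kwds) 元組展開成 Main_Def(*args, **kwds) 呼叫。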
樣例程式碼:
/Users/nisj/PycharmProjects/EsDataProc/bi-static/ThreadPool_multiPar.py
# -*- coding=utf-8 -*-
import threadpool
import time

def Main_Def(par1, par2, par3):
    """Print the three received arguments on one line (demo worker for the pool).

    The demo below feeds it either positionally (a list of three values)
    or by keyword (a dict keyed 'par1'..'par3').
    """
    # Single-argument print(...) produces the same output under Python 2 and 3.
    print("par1 = %s, par2 = %s, par3 = %s" % (par1, par2, par3))


if __name__ == '__main__':
    # Method 1: each request is an (args_list, None) tuple; threadpool calls
    # Main_Def(*args_list), so the values bind to par1..par3 by position.
    list_var1 = ['1', '2', '3']
    list_var2 = ['4', '5', '6']
    par_list = [(list_var1, None), (list_var2, None)]
    # Method 2: each request is a (None, kwargs_dict) tuple; threadpool calls
    # Main_Def(**kwargs_dict), so the values bind by parameter name.
    # dict_var1 = {'par1': '1', 'par2': '2', 'par3': '3'}
    # dict_var2 = {'par1': '4', 'par2': '5', 'par3': '6'}
    # par_list = [(None, dict_var1), (None, dict_var2)]

    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(Main_Def, par_list)
    # Plain loop: putRequest is called only for its side effect.
    for req in requests:
        pool.putRequest(req)
    # wait() blocks until every queued request has reported back,
    # so no sleep is needed beforehand.
    pool.wait()


用於實際生產處理的例項:
/Users/nisj/PycharmProjects/EsDataProc/bi-static/Hive_remain_pay_byWeek_runing.py

# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import threadpool

# Silences every warning for the whole process.
warnings.filterwarnings("ignore")

# Reference dates, computed once at import time.
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# Log the start time of the run.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# Single-argument print(...) produces the same text under Python 2 and 3
# (equivalent to the old `print label, value` statement).
print("當前時間是: %s" % now_time)

def dateRange(beginDate, endDate):
    """Return every calendar day from *beginDate* to *endDate*, inclusive.

    Both bounds are 'YYYY-MM-DD' strings (zero padding optional). The result
    is a list of zero-padded 'YYYY-MM-DD' strings in ascending order. It is
    empty when *beginDate* is after *endDate*.

    Raises ValueError if either bound does not match '%Y-%m-%d'.
    """
    fmt = "%Y-%m-%d"
    # Compare real dates rather than strings: string ordering is only correct
    # when both sides are zero-padded, and the output is normalised here too.
    current = datetime.datetime.strptime(beginDate, fmt).date()
    last = datetime.datetime.strptime(endDate, fmt).date()
    one_day = datetime.timedelta(days=1)
    dates = []
    while current <= last:
        dates.append(current.strftime(fmt))
        current += one_day
    return dates

def weekRang(beginDate, endDate):
    """List the distinct ISO weeks touched by the days in [beginDate, endDate].

    Each week is rendered as 'YEAR#WEEK' from date.isocalendar(), with the
    week number not zero-padded (e.g. '2015#27'). Keys are ordered by
    (year, week) numerically.
    """
    # Collect unique (iso_year, iso_week) pairs for every day in the range.
    iso_weeks = set(
        datetime.date(int(day[0:4]), int(day[5:7]), int(day[8:10])).isocalendar()[0:2]
        for day in dateRange(beginDate, endDate)
    )
    return ['%s#%s' % (year, week) for year, week in sorted(iso_weeks)]

def hisWeekList(curr_week, start_date='2015-07-01'):
    """Return the history weeks that are not later than *curr_week*.

    Candidate weeks come from ``weekRang(start_date, <today - 7 days>)``.
    Each candidate is kept when its (year, week) is <= that of *curr_week*.

    :param curr_week: week key in the 'YYYY#W' form produced by weekRang.
    :param start_date: first day ('YYYY-MM-DD') of the history window;
        defaults to '2015-07-01'.
    :returns: list of 'YYYY#W' keys in the order weekRang yields them.
    """
    # Upper bound of the candidate range: the date seven days ago.
    end_day = (datetime.datetime.now() - datetime.timedelta(days=7)).date().isoformat()
    # (year, week) integer tuples compare exactly like
    # "same year and week <= current week, or an earlier year".
    curr_key = (int(curr_week[0:4]), int(curr_week[5:]))
    return [week for week in weekRang(start_date, end_day)
            if (int(week[0:4]), int(week[5:])) <= curr_key]

def getLastWeek(d):
    """Return the Monday and Sunday of the week before *d*, as 'YYYY-MM-DD'.

    *d* is a date or datetime. Stepping back isoweekday() days lands on the
    previous Sunday, and six more days back is that week's Monday.
    """
    week_end = d - datetime.timedelta(days=d.isoweekday())
    week_start = week_end - datetime.timedelta(days=6)
    # isoformat()[:10] keeps only the date part for both date and datetime.
    return week_start.isoformat()[:10], week_end.isoformat()[:10]

def hisRunWeekList():
    """Build the threadpool argument list for the weekly batch run.

    The "current" week is the ISO week containing last Monday..Sunday (from
    getLastWeek). Every history week not later than it is paired with it as
    ``([curr_week, his_week], None)`` -- the positional-argument request
    form passed to ``threadpool.makeRequests``.
    """
    week_start, week_end = getLastWeek(datetime.datetime.now())
    curr_week = weekRang(week_start, week_end)[0]

    def week_key(week):
        # 'YYYY#W' -> (year, week) integers, so tuple order == chronological order.
        return int(week[0:4]), int(week[5:])

    return [([curr_week, his_week], None)
            for his_week in hisWeekList(curr_week)
            if week_key(his_week) <= week_key(curr_week)]

def _hive_week_key_sql(day_col='pt_day'):
    """Return a HiveQL expression mapping *day_col* to a 'YYYY#W' ISO week key.

    Hive's weekofyear() numbers weeks ISO-style (Monday start, week 1 is the
    first week with more than 3 days), so days around New Year can belong to
    a neighbouring year's week. Early-January days in week 52/53 are assigned
    to the previous year, and late-December days in week 1 to the next year.
    The keys then agree with Python's date.isocalendar() used by weekRang.
    """
    return ("case when weekofyear({d})>=52 and month({d})=1 "
            "then concat(year({d})-1,'#',weekofyear({d})) "
            "when weekofyear({d})=1 and month({d})=12 "
            "then concat(year({d})+1,'#',weekofyear({d})) "
            "else concat(year({d}),'#',weekofyear({d})) end").format(d=day_col)


def user_remain_proc(curr_week, his_week):
    """Recompute retention and payment rows for one (cohort week, current week) pair.

    1. Delete existing MySQL rows in bi_user_remain_pay_byweek with
       data_week=*his_week* and remain_week=*curr_week*.
    2. Run a Hive query that takes the *his_week* rows of
       bi_all_access_log_of_new (presumably the new-user cohort) and keeps
       the identities also present in bi_all_access_log during *curr_week*.
       It left-joins their *curr_week* payment totals from
       data_chushou_pay_info and groups by appkey/appsource.
    3. Insert one MySQL row per returned group.

    :param curr_week: week key 'YYYY#W', stored as remain_week.
    :param his_week: week key 'YYYY#W', stored as data_week.
    """
    # NOTE(review): os.system exit statuses are ignored, so a failed delete
    # does not stop the inserts below.
    os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
                delete from bi_user_remain_pay_byweek where data_week='%s' and remain_week='%s'; \
                 " """ % (his_week, curr_week))

    hive_proc = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
        create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
        with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
        from bi_all_access_log_of_new \
        where %(week_key)s = '%(his_week)s' \
        group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
        curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
        from bi_all_access_log \
        where %(week_key)s = '%(curr_week)s' \
        group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
        curr_week_pay as (select uid,sum(amount) amount \
        from data_chushou_pay_info \
        where %(week_key)s = '%(curr_week)s' \
        group by uid) \
        select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,sum(a3.amount) pay_amount \
        from his_new_user a1 \
        inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
        left join curr_week_pay a3 on a1.uid=a3.uid \
        group by a1.appkey,a1.appsource \
        ;" \
        """ % {'week_key': _hive_week_key_sql(), 'his_week': his_week, 'curr_week': curr_week})
    # Close the pipe explicitly instead of relying on garbage collection.
    try:
        newuser_remain_pay_data = hive_proc.readlines()
    finally:
        hive_proc.close()

    remain_week = curr_week
    for line in newuser_remain_pay_data:
        # Each result line is tab-separated: appkey, appsource, remain_cnt, pay_amount.
        nrpd = line.replace('\n', '').split('\t')
        if len(nrpd) < 4:
            # Skip blank/short lines instead of raising IndexError mid-batch.
            continue
        appkey, appsource, remain_cnt, pay_amount = nrpd[0], nrpd[1], nrpd[2], nrpd[3]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())

        # NOTE(review): appkey/appsource come from log data and are pasted into
        # SQL inside a shell double-quoted string; a value containing quotes,
        # '$' or '`' would break or alter the command. pay_amount can
        # presumably be Hive's literal 'NULL' when no payment matched
        # (left join) -- confirm how the target column treats it.
        os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
        insert into bi_user_remain_pay_byweek(data_week,appsource,appkey,remain_week,remain_cnt,pay_amount,etl_time) \
        select '%s','%s','%s','%s','%s','%s','%s'; \
         " """ % (his_week, appsource, appkey, remain_week, remain_cnt, pay_amount, etl_time))


# Build one work request per (current week, history week) pair and queue them
# on a 13-thread pool. This happens at import time, outside the main guard.
batch_week_list = hisRunWeekList()
requests = threadpool.makeRequests(user_remain_proc, batch_week_list)
main_pool = threadpool.ThreadPool(13)
# Plain loop: putRequest is called only for its side effect.
for req in requests:
    main_pool.putRequest(req)
if __name__ == '__main__':
    # Every 9 seconds, collect finished results until none are pending
    # (NoResultsPending) or the user interrupts with Ctrl-C.
    while True:
        try:
            time.sleep(9)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

# Log the end time of the run.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# Single-argument print(...) produces the same text under Python 2 and 3.
print("當前時間是: %s" % now_time)

此例項也是文章【Python控制資料(留存及支付資訊)按周進行跑批處理】(http://blog.csdn.net/babyfish13/article/details/54096306)的後續。