程式人生 > odps2.0版本pyodps中apply的使用例項備份

odps2.0版本pyodps中apply的使用例項備份

############################################

import json

from odps.df import DataFrame

from odps import options

# Uncomment to turn on PyODPS's verbose option while debugging.
# options.verbose = True

# Global SQL settings for the ODPS jobs submitted by this script
# (split.size=1 presumably forces small input splits / more mappers — TODO confirm).
options.sql.settings = {'odps.sql.mapper.split.size': 1}

# Job parameters provided by the hosting PyODPS node through `args`
# (NOTE(review): assumed to be strings, since they are joined into table names).
module = args['module']
plan_id = args['plan_id']
calctime = args['calctime']

############ Input table: bid, realday, tasklist ##############################

# Every temp table of this run shares the same "<module>_<plan_id>_<calctime>" suffix.
_table_suffix = '_'.join([module, plan_id, calctime])
table_input = 'temp_qbb_btime_parentingtask3_bid_grouplist_tasklist_' + _table_suffix
# Result table, written as (bid, itemdata) by the apply/persist step.
# Original note: the first 3 columns are first copied directly from
# temp_qbb_btime_parentingtask3_plan_bid_taskgroup_... (TODO confirm meaning).
table_result = 'temp_qbb_btime_parentingtask3_plan_bid_itemdata_' + _table_suffix

# Empty the result table before writing to it: overwrite it with a SELECT
# whose WHERE clause (NULL) matches no rows.
sql = "INSERT OVERWRITE TABLE %s SELECT * FROM %s WHERE null;" % (table_result, table_result)
print(sql)
o.execute_sql(sql)

# DataFrames over the input table and the (now empty) result table.
df1 = o.get_table(table_input).to_df()
df2 = o.get_table(table_result).to_df()

# Flip to True to work on a 100-row local, pandas-backed sample of each table.
DEBUG = False
if DEBUG:
    df1 = df1[:100].to_pandas(wrap=True)
    df2 = df2[:100].to_pandas(wrap=True)

############ Load the default recommended-task table: 2 fields, planday and grouptask #############

table_defitemdata = ('temp_qbb_btime_parentingtask3_plan_itemdata_default_'
                     + module + '_' + plan_id + '_' + calctime)

defitemdata = o.get_table(table_defitemdata)

# itemdata maps planday (column 0) -> the list decoded from the JSON in
# column 1. The ftasklimit UDF looks days up in it.
itemdata = {}

# Use the reader as a context manager so the read session is released even
# if a record fails to decode (the original never closed the reader).
with defitemdata.open_reader() as reader:
    length = reader.count
    print(length)
    for record in reader:
        itemdata[record[0]] = json.loads(record[1])

print(len(itemdata))

# Sample entry for day 0 as a sanity check. .get() prints None instead of
# aborting the whole job with KeyError when the table has no day-0 row.
print(itemdata.get(0))

###################### Main section ##############################

# Processing logic:
# - For each bid, use its real day (row.totaldayreal) to pick the matching
#   groups and tasks out of the default item data (itemdata, keyed by planday).
# - Each bid gets a window of plan days counted from its real day. The
#   original note mentions 60 or 90 days; PLAN_WINDOW_DAYS below is 61, i.e.
#   the real day itself plus the following 60 days.

from odps.df import output

# Number of consecutive plan days emitted per bid, starting at its real day.
PLAN_WINDOW_DAYS = 61
# Plan days past this value are clamped to it.
MAX_PLANDAY = 2190


@output(['bid', 'itemdata'], ['int64', 'string'])
def ftasklimit(row):
    """Build the per-day task plan for one bid.

    For each of PLAN_WINDOW_DAYS days starting at ``row.totaldayreal``
    (clamped to MAX_PLANDAY), look up that day's groups in the module-level
    ``itemdata`` dict. Keep the groups whose ``'td'`` value appears in the
    bid's task list (``row.tipidlist``, split on ``"\\002"``).

    NOTE(review): once the clamp applies, several iterations share
    planday == MAX_PLANDAY, so that day is emitted more than once. Confirm
    whether this is intended.

    Args:
        row: input row exposing ``bid``, ``tipidlist`` and ``totaldayreal``.

    Returns:
        ``(bid, data)``. ``data`` is a JSON array of
        ``{"days": planday, "gt": [matching groups]}`` objects, one per
        processed day that has default item data. It is ``None`` when no day
        in the window produced an entry.
    """
    print('ftasklimit ------------')
    # A NULL task list matches nothing instead of crashing the UDF. A set gives
    # O(1) membership tests for the same string matching.
    task_ids = set(row.tipidlist.split("\002")) if row.tipidlist is not None else set()

    outputdatas = []
    print(str(row.bid) + 'start')
    for offset in range(PLAN_WINDOW_DAYS):
        planday = min(row.totaldayreal + offset, MAX_PLANDAY)

        listgroup = itemdata.get(planday)
        if listgroup is None:
            # No default data for this day: skip it (best effort).
            continue
        try:
            matched = [group for group in listgroup if group['td'] in task_ids]
        except (KeyError, TypeError):
            # Malformed default data for this day: skip the whole day, as the
            # original best-effort handling did, without hiding other errors.
            continue

        # A fresh dict per day. The original reused one dict for every day, so
        # all entries aliased the same object and serialized as the last day.
        outputdatas.append({'days': planday, 'gt': matched})

    data = json.dumps(outputdatas) if outputdatas else None
    print(str(row.bid) + 'end')
    return row.bid, data

# Run ftasklimit on every input row (axis=1). Write the resulting
# (bid, itemdata) rows into the result table.
result_df = df1.apply(ftasklimit, axis=1)
result_df.persist(table_result)