hadoop工作流引擎解壓jar檔案,並執行出現型別不匹配的情況
阿新 • • 發佈:2019-02-19
在做hadoop工作流引擎專案開發的時候,編寫某些特定的Mapreduce程式,雖然該程式在hadoop平臺型執行正常,但在工作流引擎中執行卻出現錯誤,主要的原因是該工作流引擎是提取jar中的Mapper和Reducer以及主類,執行該jar時是要重新設定輸入輸出型別,原先設定的是輸入時Text,輸出是IntWritable,所以即使jar程式沒有錯,但放在工作流引擎中執行的時候就會報出異常……所以,需要重新調整程式提取的變數,獲取配置資訊。在這裡可以使用java的reflect的Method或者Field來解決,具體的程式碼如下:
//從鍵值對中獲取當前結點的Mapper和Reducer clss = fileScan.getCls(); String m = clss.get("mapper"); String r = clss.get("reducer"); String ml=clss.get("maincls"); logger.info(jobName + "'s Map : " + m); logger.info(jobName + "'s Reduce : " + r); Class mapper = loader.loadClass(m); Class reducer = loader.loadClass(r); Class maincls=loader.loadClass(ml); Object main=maincls.newInstance(); Field combinerField=maincls.getDeclaredField("combinerCls"); Field outputKeyField=maincls.getDeclaredField("outputKeyCls"); Field outputValueField=maincls.getDeclaredField("outputValueCls"); boolean combiner=combinerField.getBoolean(main); Class outputKeyClass=(Class)outputKeyField.get(main); Class outputValueClass=(Class)outputValueField.get(main); logger.info("combinerCls"+combiner); logger.info("outputKeyCls"+outputKeyClass); logger.info("outputValueCls"+outputValueClass); Job job = new Job(conf, jobName); job.setJarByClass(maincls); job.setMapperClass(mapper); if(combiner) job.setCombinerClass(reducer); job.setReducerClass(reducer); job.setOutputKeyClass(outputKeyClass); job.setOutputValueClass(outputValueClass); if (input.indexOf(";") == -1) { FileInputFormat.addInputPath(job, new Path(input)); } else { st = new StringTokenizer(input, ";"); while (st.hasMoreTokens()) { FileInputFormat.addInputPath(job, new Path(st.nextToken() .trim())); } } output ="/"+username+"/output/" + jobName; FileOutputFormat.setOutputPath(job, new Path(output)); /** * 此部分用來處理job,將所有的job,根據其流id儲存到Map中,供使用者選擇性終止 * */ //將當前job加入到job的ArrayList jobs中 jobs.add(job); //如果全域性FlowId_Job_List中沒有該flowId,則將flowId作為key,jobs作為值,存入FlowId_Job_List if(!GlobalVar.getGlobalVar().getFlowId_Job_List().containsKey(flowId)) GlobalVar.getGlobalVar().getFlowId_Job_List().put(flowId, jobs); else{// 全域性FlowId_Job_List中有flowId,更新該key對應的value jobs=GlobalVar.getGlobalVar().getFlowId_Job_List().get(flowId); jobs.add(job); GlobalVar.getGlobalVar().getFlowId_Job_List().remove(flowId); GlobalVar.getGlobalVar().getFlowId_Job_List().put(flowId, jobs); } //提交作業 job.submit();