1. 程式人生 > >hadoop工作流引擎解壓jar檔案,並執行出現型別不匹配的情況

hadoop工作流引擎解壓jar檔案,並執行出現型別不匹配的情況

在做hadoop工作流引擎專案開發的時候,編寫某些特定的Mapreduce程式,雖然該程式在hadoop平臺型執行正常,但在工作流引擎中執行卻出現錯誤,主要的原因是該工作流引擎是提取jar中的Mapper和Reducer以及主類,執行該jar時是要重新設定輸入輸出型別,原先設定的是輸入時Text,輸出是IntWritable,所以即使jar程式沒有錯,但放在工作流引擎中執行的時候就會報出異常……所以,需要重新調整程式提取的變數,獲取配置資訊。在這裡可以使用java的reflect的Method或者Field來解決,具體的程式碼如下:

			//從鍵值對中獲取當前結點的Mapper和Reducer
			clss = fileScan.getCls();
			String m = clss.get("mapper");
			String r = clss.get("reducer");
			String ml=clss.get("maincls");

			logger.info(jobName + "'s Map : " + m);
			logger.info(jobName + "'s Reduce : " + r);

			Class mapper = loader.loadClass(m);
			Class reducer = loader.loadClass(r);
			Class maincls=loader.loadClass(ml);
		    Object main=maincls.newInstance();
		    Field combinerField=maincls.getDeclaredField("combinerCls");
		    Field outputKeyField=maincls.getDeclaredField("outputKeyCls");
		    Field outputValueField=maincls.getDeclaredField("outputValueCls");
		    boolean combiner=combinerField.getBoolean(main);
		    Class outputKeyClass=(Class)outputKeyField.get(main);
		    Class outputValueClass=(Class)outputValueField.get(main);
		    logger.info("combinerCls"+combiner);
		    logger.info("outputKeyCls"+outputKeyClass);
		    logger.info("outputValueCls"+outputValueClass);
			
			
			Job job = new Job(conf, jobName);
			job.setJarByClass(maincls);
			job.setMapperClass(mapper);
			if(combiner)
				job.setCombinerClass(reducer);
			job.setReducerClass(reducer);
			job.setOutputKeyClass(outputKeyClass);
			job.setOutputValueClass(outputValueClass);

			if (input.indexOf(";") == -1) {
				FileInputFormat.addInputPath(job, new Path(input));
			} else {
				st = new StringTokenizer(input, ";");
				while (st.hasMoreTokens()) {
					FileInputFormat.addInputPath(job, new Path(st.nextToken()
							.trim()));
				}
			}
			output ="/"+username+"/output/" + jobName;
			FileOutputFormat.setOutputPath(job, new Path(output));
			/**
			 * 此部分用來處理job,將所有的job,根據其流id儲存到Map中,供使用者選擇性終止
			 * */
			//將當前job加入到job的ArrayList jobs中
			jobs.add(job);
			//如果全域性FlowId_Job_List中沒有該flowId,則將flowId作為key,jobs作為值,存入FlowId_Job_List
			if(!GlobalVar.getGlobalVar().getFlowId_Job_List().containsKey(flowId))
				GlobalVar.getGlobalVar().getFlowId_Job_List().put(flowId, jobs);
			else{//	全域性FlowId_Job_List中有flowId,更新該key對應的value
				jobs=GlobalVar.getGlobalVar().getFlowId_Job_List().get(flowId);
			    jobs.add(job);
			    GlobalVar.getGlobalVar().getFlowId_Job_List().remove(flowId);
			    GlobalVar.getGlobalVar().getFlowId_Job_List().put(flowId, jobs);
			}
	       
            //提交作業
			job.submit();