1. 程式人生 > >hadoop 把mapreduce任務從本地提交到hadoop集群上運行

hadoop 把mapreduce任務從本地提交到hadoop集群上運行

bsp ole 不能 href print 運行方式 examples jar iss

MapReduce任務有三種運行方式:

1、windows(linux)本地調試運行,需要本地hadoop環境支持

2、本地編譯成jar包,手動發送到hadoop集群上用hadoop jar或者yarn jar方式運行。

3、本地編譯環境在IDE裏直接提交到集群上運行,實際上這種方式就是第二種方式的變種。

本例說的就是第三種方式

1)核心的部分就是Confirguration的配置
2)本地需要編譯成jar包
3)運行參數在本地配置,包括輸入輸出參數
4)出現windows下的環境配置問題,參照https://www.cnblogs.com/asker009/p/10348188.html

技術分享圖片

關鍵運行代碼如下:mapper和reducer就不貼出來了,可以看之前的https://www.cnblogs.com/asker009/p/10337598.html

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; public class WordCount { private static String HDFSUri = "hdfs://bigdata-senior01.home.com:9000"; public static void main(String[] args) throws Exception { if(args.length!=2) { System.err.println(
"使用格式:WordCount <input path> <output path>"); System.exit(-1); } long startTime = System.currentTimeMillis(); //Configuration類代表作業的配置,該類會加載mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。 Configuration conf =new Configuration(); //本地模式運行mr程序時,輸入輸出的數據可以在本地,也可以在hdfs上 //到底在哪裏,就看以下兩行配置你用哪行,默認就是file:/// conf.set("fs.defaultFS","hdfs://bigdata-senior01.home.com:9000"); // conf.set("fs.defaultFS", "file:///"); //本地提交到集群上運行 //運行集群模式,就是把程序提交到yarn中去運行 //要想運行為集群模式,以下5個參數要指定為集群上的值(實際上就是hadoop集群上的配置) //還需要把hadoop集群上core-site.xml,yarn-site.xml,mapred-site.xml拷貝到resources目錄下或者把這幾個文件的核心配置寫入conf變量 //如果是把程序打包成jar,hadoop jar運行,不需要寫下面,因為hadoop jar腳本自動把集群中配置好的配置文件加載給該程序 conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.nodemanager.aux-services","mapreduce_shuffle"); conf.set("yarn.resourcemanager.hostname", "bigdata-senior01.home.com"); conf.set("hadoop.tmp.dir","/opt/data/tmp"); conf.set("mapreduce.application.classpath","/opt/modules/hadoop-3.1.0/share/hadoop/mapreduce/*, /opt/modules/hadoop-3.1.0/share/hadoop/mapreduce/lib-examples/*"); //跨平臺提交 conf.set("mapreduce.app-submission.cross-platform", "true"); //設置mapred.jar的路徑,不然會報找不到,設置的內容就是本例中輸出的jar包 conf.set("mapred.jar","E:\\myProgram\\Java\\wordcount\\out\\artifacts\\wordcount_jar\\wordcount.jar"); //如果實在非hadoop用戶環境下提交任務 System.setProperty("HADOOP_USER_NAME","hadoop"); System.out.println("HADOOP_USER_NAME: "+System.getProperty("HADOOP_USER_NAME")); Path outPath = new Path(args[1]); //FileSystem裏面包括很多系統,不局限於hdfs FileSystem fileSystem = FileSystem.get(URI.create(HDFSUri),conf); //刪除輸出路徑 if(fileSystem.exists(outPath)) { fileSystem.delete(outPath,true); } Job job = Job.getInstance(conf,"word count"); // new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); //Combiner最終不能影響reduce輸出的結果 // job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); //一般情況下mapper和reducer的輸出的數據類型是一樣的,所以我們用上面兩條命令就行,如果不一樣,我們就可以用下面兩條命令單獨指定mapper的輸出key、value的數據類型 //job.setMapOutputKeyClass(Text.class); //job.setMapOutputValueClass(IntWritable.class); //hadoop默認的是TextInputFormat和TextOutputFormat,所以說我們這裏可以不用配置。 // job.setInputFormatClass(TextInputFormat.class); // job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定的這個路徑可以是單個文件、一個目錄或符合特定文件模式的一系列文件。 //從方法名稱可以看出,可以通過多次調用這個方法來實現多路徑的輸入。 FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean result = job.waitForCompletion(true); long endTime = System.currentTimeMillis(); long timeSpan = endTime - startTime; System.out.println("運行耗時:"+timeSpan+"毫秒。"); System.exit( result ? 0 : 1); } }

hadoop 把mapreduce任務從本地提交到hadoop集群上運行