hadoop2.7.2 win7 eclipse環境搭建測試
阿新 • • 發佈:2019-02-12
環境搭建參照上一篇hadoop2.7.2 win7基礎環境搭建。
Eclipse hadoop外掛下載2.7.2:http://download.csdn.net/detail/fly_leopard/9503172
將下載的檔案解壓,將jar包放到Eclipse/plugin下,重啟Eclipse。
project檢視多了一個DFS Locations。瀏覽或者下載hadoop上的檔案。下面配置hadoop.
配置Map/Reduce Locations 如下:
此時重新整理DFS Locations如下:代表成功連線到hadoop。
新建hadoop專案,如下: 配置hadoop安裝目錄,建立專案時將會自動引入所需的包。
新建WordCount執行測試,WordCount是從網上找的,自己做了一些小修改加了點註釋。如下:
package com.tom; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { /** * 使用者自定義map函式,對以<key, value>為輸入的結果檔案進行處理 * Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,並重寫其map方法。 * 通過在map方法中新增兩句把key值和value值輸出到控制檯的程式碼 * ,可以發現map方法中value值儲存的是文字檔案中的一行(以回車符為行結束標記),而key值為該行的首字母相對於文字檔案的首地址的偏移量。 * 然後StringTokenizer類將每一行拆分成為一個個的單詞 * ,並將<word,1>作為map方法的結果輸出,其餘的工作都交有MapReduce框架處理。 每行資料呼叫一次 Tokenizer:單詞分詞器 */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /* * 重寫Mapper類中的map方法 */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); //System.out.println(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken());// 獲取下個欄位的值並寫入檔案 context.write(word, one); } } } /** * 使用者自定義reduce函式,如果有多個熱度測,則每個reduce處理自己對應的map結果資料 * Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reducer類,並重寫其reduce方法。 * Map過程輸出<key,values>中key為單個單詞,而values是對應單詞的計數值所組成的列表,Map的輸出就是Reduce的輸入, * 所以reduce方法只要遍歷values並求和,即可得到某個單詞的總次數。 */ public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { /** * 環境變數配置 */ File jarFile = EJob.createTempJar("bin"); ClassLoader classLoader = EJob.getClassLoader(); Thread.currentThread().setContextClassLoader(classLoader); /** * 連線hadoop叢集配置 */ Configuration conf = new Configuration(true); conf.set("fs.default.name", "hdfs://localhost:9000"); conf.set("hadoop.job.user", "tom"); conf.set("mapreduce.framework.name", "yarn"); conf.set("mapreduce.jobtracker.address", "localhost:50020"); conf.set("yarn.resourcemanager.hostname", "localhost"); conf.set("yarn.resourcemanager.admin.address", "localhost:8033"); conf.set("yarn.resourcemanager.address", "localhost:8032"); conf.set("yarn.resourcemanager.resource-tracker.address", "localhost:8036"); conf.set("yarn.resourcemanager.scheduler.address", "localhost:8030"); String[] otherArgs = new String[2]; otherArgs[0] = "hdfs://localhost:9000/user/tom/input";//計算原檔案目錄,需要提前建立並在裡面存入檔案 String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); otherArgs[1] = "hdfs://localhost:9000/user/tom/output" + time;//計算後的計算結果儲存目錄,輸出目錄不能相同,每次程式執行的結果目錄不能相同,所以新增時間標籤 /* * setJobName()方法命名這個Job。對Job進行合理的命名有助於更快地找到Job, * 以便在JobTracker和Tasktracker的頁面中對其進行監視 */ Job job = Job.getInstance(conf); job.setJobName("word count"); job.setJarByClass(WordCount.class); ((JobConf) job.getConfiguration()).setJar(jarFile.toString());//環境變數呼叫,新增此句則可在eclipse中直接提交mapreduce任務,如果將該java檔案打成jar包,需要將該句註釋掉,否則在執行時反而找不到環境變數 // job.setMaxMapAttempts(100);//設定最大試圖產生底map數量,該命令不一定會設定該任務執行過車中的map數量 // job.setNumReduceTasks(5);//設定reduce數量,即最後生成檔案的數量 /* * Job處理的Map(拆分)、Combiner(中間結果合併)以及Reduce(合併)的相關處理類。 * 這裡用Reduce類來進行Map產生的中間結果合併,避免給網路資料傳輸產生壓力。 */ job.setMapperClass(TokenizerMapper.class);// 執行使用者自定義map函式 job.setCombinerClass(IntSumReducer.class);// 對使用者自定義map函式的資料處理結果進行合併,可以減少頻寬消耗 job.setReducerClass(IntSumReducer.class);// 執行使用者自定義reduce函式 /* * 接著設定Job輸出結果<key,value>的中key和value資料型別,因為結果是<單詞,個數>, * 所以key設定為"Text"型別,相當於Java中String型別 * 。Value設定為"IntWritable",相當於Java中的int型別。 */ job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); /* * 載入輸入資料夾或檔案路徑,即輸入資料的路徑 * 將輸入的檔案資料分割成一個個的split,並將這些split分拆成<key,value>對作為後面使用者自定義map函式的輸入 * 其中,每個split檔案的大小盡量小於hdfs的檔案塊大小 * (預設64M),否則該split會從其它機器獲取超過hdfs塊大小的剩餘部分資料,這樣就會產生網路頻寬造成計算速度影響 * 預設使用TextInputFormat型別,即輸入資料形式為文字型別資料檔案 */ System.out.println("Job start!"); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); /* * 設定輸出檔案路徑 預設使用TextOutputFormat型別,即輸出資料形式為文字型別檔案,欄位間預設以製表符隔開 */ FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); /* * 開始執行上面的設定和演算法 */ if (job.waitForCompletion(true)) { System.out.println("ok!"); } else { System.out.println("error!"); System.exit(0); } } }
輔助類:負責打jar包的:
package com.tom; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; import java.util.List; import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; /** * @author tom * */ public class EJob { //使用者自定義classpath列表 private static List<URL> classPath = new ArrayList<URL>(); /** * 打包成jar檔案 * @param root class所在根目錄 如:bin * @return * @throws IOException */ public static File createTempJar(String root) throws IOException { if (!new File(root).exists()) { return null; } //jar包清單 Manifest manifest = new Manifest(); manifest.getMainAttributes().putValue("Manifest-Version", "1.0"); //建立jar檔案 final File jarFile = File.createTempFile("EJob-", ".jar", new File(System.getProperty("java.io.tmpdir"))); //執行完畢刪除jar Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { jarFile.delete(); } }); //向jar檔案內寫入資料 JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest); createTempJarInner(out, new File(root), ""); out.flush(); out.close(); return jarFile; } /** * class檔案讀取並寫入到jar檔案,遞迴呼叫 * @param out 輸出流指向 * @param f 檔案/目錄,檔案是即將寫入的class * @param base f為檔案時,base是檔案相對路徑,f為目錄時,base是目錄 * @throws IOException */ private static void createTempJarInner(JarOutputStream out, File f, String base) throws IOException { if (f.isDirectory()) { File[] fl = f.listFiles(); if (base.length() > 0) { base = base + "/"; } for (int i = 0; i < fl.length; i++) { createTempJarInner(out, fl[i], base + fl[i].getName()); } } else { //放入新的class檔案,讀取並寫入 out.putNextEntry(new JarEntry(base)); FileInputStream in = new FileInputStream(f); byte[] buffer = new byte[1024]; int n = in.read(buffer); while (n != -1) { out.write(buffer, 0, n); n = in.read(buffer); } in.close(); } } /** * 根據URL路徑獲取classloader * @return */ public static ClassLoader getClassLoader() { //當前執行緒的classloader ClassLoader parent = Thread.currentThread().getContextClassLoader(); if (parent == null) { //該類classloader parent = EJob.class.getClassLoader(); } if (parent == null) { //系統classloader parent = ClassLoader.getSystemClassLoader(); } //基於一個classloader來構建一個新的classloader並加上一些classpath return new URLClassLoader(classPath.toArray(new URL[0]), parent); } /** * 將內容或者目錄新增到classpath * @param component */ public static void addClasspath(String component) { if ((component != null) && (component.length() > 0)) { try { File f = new File(component); if (f.exists()) { URL key = f.getCanonicalFile().toURI().toURL(); if (!classPath.contains(key)) { classPath.add(key); } } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { createTempJar("bin"); } }
run as JavaApplication 可以看到namenode黑視窗不斷滾動。等待控制檯執行完畢。重新整理DFS Loactions或者瀏覽器localhost:9000檢視目錄檔案如下:
具體執行流程,參考程式碼上的註釋。