環境搭建參照上一篇hadoop2.7.2 win7基礎環境搭建。

Eclipse hadoop外掛下載2.7.2:http://download.csdn.net/detail/fly_leopard/9503172

將下載的檔案解壓,將jar包放到Eclipse/plugin下,重啟Eclipse。

project檢視多了一個DFS Locations。瀏覽或者下載hadoop上的檔案。下面配置hadoop.

配置Map/Reduce Locations 如下:


此時重新整理DFS Locations如下:代表成功連線到hadoop。

新建hadoop專案,如下: 配置hadoop安裝目錄,建立專案時將會自動引入所需的包。

新建WordCount執行測試,WordCount是從網上找的,自己做了一些小修改加了點註釋。如下:

package com.tom;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class WordCount {
 
    /**
     * 使用者自定義map函式,對以<key, value>為輸入的結果檔案進行處理
     * Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,並重寫其map方法。
     * 通過在map方法中新增兩句把key值和value值輸出到控制檯的程式碼
     * ,可以發現map方法中value值儲存的是文字檔案中的一行(以回車符為行結束標記),而key值為該行的首字母相對於文字檔案的首地址的偏移量。
     * 然後StringTokenizer類將每一行拆分成為一個個的單詞
     * ,並將<word,1>作為map方法的結果輸出,其餘的工作都交有MapReduce框架處理。 每行資料呼叫一次 Tokenizer:單詞分詞器
     */
    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
 
        /*
         * 重寫Mapper類中的map方法
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            //System.out.println(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());// 獲取下個欄位的值並寫入檔案
                context.write(word, one);
            }
        }
    }
 
    /**
     * 使用者自定義reduce函式,如果有多個熱度測,則每個reduce處理自己對應的map結果資料
     * Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reducer類,並重寫其reduce方法。
     * Map過程輸出<key,values>中key為單個單詞,而values是對應單詞的計數值所組成的列表,Map的輸出就是Reduce的輸入,
     * 所以reduce方法只要遍歷values並求和,即可得到某個單詞的總次數。
     */
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
 
    public static void main(String[] args) throws Exception {
 
        /**
         * 環境變數配置
         */
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
 
        /**
         * 連線hadoop叢集配置
         */
        Configuration conf = new Configuration(true);
        conf.set("fs.default.name", "hdfs://localhost:9000");
        conf.set("hadoop.job.user", "tom");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("mapreduce.jobtracker.address", "localhost:50020");
        conf.set("yarn.resourcemanager.hostname", "localhost");
        conf.set("yarn.resourcemanager.admin.address", "localhost:8033");
        conf.set("yarn.resourcemanager.address", "localhost:8032");
        conf.set("yarn.resourcemanager.resource-tracker.address", "localhost:8036");
        conf.set("yarn.resourcemanager.scheduler.address", "localhost:8030");
 
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://localhost:9000/user/tom/input";//計算原檔案目錄,需要提前建立並在裡面存入檔案
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://localhost:9000/user/tom/output" + time;//計算後的計算結果儲存目錄,輸出目錄不能相同,每次程式執行的結果目錄不能相同,所以新增時間標籤
 
        /*
         * setJobName()方法命名這個Job。對Job進行合理的命名有助於更快地找到Job,
         * 以便在JobTracker和Tasktracker的頁面中對其進行監視
         */
		Job job = Job.getInstance(conf);
        job.setJobName("word count");
        job.setJarByClass(WordCount.class);
 
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());//環境變數呼叫,新增此句則可在eclipse中直接提交mapreduce任務,如果將該java檔案打成jar包,需要將該句註釋掉,否則在執行時反而找不到環境變數
 
        // job.setMaxMapAttempts(100);//設定最大試圖產生底map數量,該命令不一定會設定該任務執行過車中的map數量
        // job.setNumReduceTasks(5);//設定reduce數量,即最後生成檔案的數量
 
        /*
         * Job處理的Map(拆分)、Combiner(中間結果合併)以及Reduce(合併)的相關處理類。
         * 這裡用Reduce類來進行Map產生的中間結果合併,避免給網路資料傳輸產生壓力。
         */
        job.setMapperClass(TokenizerMapper.class);// 執行使用者自定義map函式
        job.setCombinerClass(IntSumReducer.class);// 對使用者自定義map函式的資料處理結果進行合併,可以減少頻寬消耗
        job.setReducerClass(IntSumReducer.class);// 執行使用者自定義reduce函式
 
        /*
         * 接著設定Job輸出結果<key,value>的中key和value資料型別,因為結果是<單詞,個數>,
         * 所以key設定為"Text"型別,相當於Java中String型別
         * 。Value設定為"IntWritable",相當於Java中的int型別。
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        /*
         * 載入輸入資料夾或檔案路徑,即輸入資料的路徑
         * 將輸入的檔案資料分割成一個個的split,並將這些split分拆成<key,value>對作為後面使用者自定義map函式的輸入
         * 其中,每個split檔案的大小盡量小於hdfs的檔案塊大小
         * (預設64M),否則該split會從其它機器獲取超過hdfs塊大小的剩餘部分資料,這樣就會產生網路頻寬造成計算速度影響
         * 預設使用TextInputFormat型別,即輸入資料形式為文字型別資料檔案
         */
        System.out.println("Job start!");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
 
        /*
         * 設定輸出檔案路徑 預設使用TextOutputFormat型別,即輸出資料形式為文字型別檔案,欄位間預設以製表符隔開
         */
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
        /*
         * 開始執行上面的設定和演算法
         */
        if (job.waitForCompletion(true)) {
            System.out.println("ok!");
        } else {
            System.out.println("error!");
            System.exit(0);
        }
    }
}

輔助類:負責打jar包的:
package com.tom;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

/**
 * @author tom
 * 
 */
public class EJob {
 
    //使用者自定義classpath列表
    private static List<URL> classPath = new ArrayList<URL>();
 
    /**
     * 打包成jar檔案
     * @param root class所在根目錄 如:bin
     * @return
     * @throws IOException
     */
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        //jar包清單
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        //建立jar檔案
        final File jarFile = File.createTempFile("EJob-", ".jar", new File(System.getProperty("java.io.tmpdir")));
        //執行完畢刪除jar
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });
        //向jar檔案內寫入資料
        JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }
 
    /**
     * class檔案讀取並寫入到jar檔案,遞迴呼叫
     * @param out 輸出流指向
     * @param f 檔案/目錄,檔案是即將寫入的class
     * @param base f為檔案時,base是檔案相對路徑,f為目錄時,base是目錄
     * @throws IOException
     */
    private static void createTempJarInner(JarOutputStream out, File f,
            String base) throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl[i], base + fl[i].getName());
            }
        } else {
        	//放入新的class檔案,讀取並寫入
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }
 
    /**
     * 根據URL路徑獲取classloader
     * @return
     */
    public static ClassLoader getClassLoader() {
    	//當前執行緒的classloader
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
        	//該類classloader
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
        	//系統classloader
            parent = ClassLoader.getSystemClassLoader();
        }
        //基於一個classloader來構建一個新的classloader並加上一些classpath
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }
 
    /**
     * 將內容或者目錄新增到classpath
     * @param component
     */
    public static void addClasspath(String component) {
 
        if ((component != null) && (component.length() > 0)) {
            try {
                File f = new File(component);
 
                if (f.exists()) {
					URL key = f.getCanonicalFile().toURI().toURL();
                    if (!classPath.contains(key)) {
                        classPath.add(key);
                    }
                }
            } catch (IOException e) {
            	e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws IOException {
		createTempJar("bin");
	}
}


run as JavaApplication 可以看到namenode黑視窗不斷滾動。等待控制檯執行完畢。重新整理DFS Loactions或者瀏覽器localhost:9000檢視目錄檔案如下:

具體執行流程,參考程式碼上的註釋。