程式人生 > 使用者定義的 Java 計數器

使用者定義的 Java 計數器

MapReduce 計數器用來統計作業執行過程中的某些資訊,例如一共處理了多少行、多少個單詞。

 

計數器是全域性的:MapReduce 框架會彙總所有 map 和 reduce 任務中各自累加的計數器值,並在作業結束時產生一個最終的結果。

自定義計數器使用 Java 的 enum 型別來宣告,每一個列舉常量就是一個計數器。

需求:統計某個目錄下所有檔案的總行數,以及單詞的總數。

 

思路:定義一個包含兩個計數器(行數、單詞數)的列舉,在 map 中每讀到一行、每切出一個單詞時分別累加。

package com.mapreduce.count;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CountDerived { // 定義一個全域性的計數器,每個map,reduce都可以訪問到 enum COUNT{ LINES_COUNT, WORDS_COUNT } public static void main(String[] args) throws Exception { // 1 獲取configuration Configuration configuration = new Configuration();
// 2 job Job job = Job.getInstance(configuration); // 3 作業jar包 job.setJarByClass(CountDerived.class); // 4 map, reduce jar 包 job.setMapperClass(CounterMap.class); // 5 map 輸出型別 job.setMapOutputKeyClass(Text.
class); job.setMapOutputValueClass(IntWritable.class); // 6 最終 輸出型別 (reducer) job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 7 inputformatclass , outputformatclass 輸入輸出入檔案型別 可能決定分片資訊 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // 8 輸入輸出檔案路徑 FileInputFormat.setInputPaths(job, new Path("d:/input")); FileOutputFormat.setOutputPath(job, new Path("d:/output1")); // 9 job提交 job.waitForCompletion(true); } }

 

package com.mapreduce.count;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.Mapper;
/*
 *  The global counters are declared in the job driver (CountDerived): COUNT { LINES_COUNT, WORDS_COUNT }
 */

import com.mapreduce.count.CountDerived.COUNT;


public class CounterMap extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    Text k = new Text();
    IntWritable v = new IntWritable();
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        // 動態獲取計數器
        Counter line_counter =  (Counter) context.getCounter(COUNT.LINES_COUNT);
        //將計數器 + 1
        line_counter.increment(1);
        
        String line = value.toString();
        String[] words = line.split(" ");
        v.set(1);
        for(String w:words){
            // 同理
            context.getCounter(COUNT.WORDS_COUNT).increment(1);
            k.set(w);
            context.write(k, v);
        }
    }

}