
Big Data (Hadoop MapReduce Programming Examples)

Two small MapReduce programs follow. The first removes duplicate records from a data set; the second finds pairs of users who follow each other ("hufen", 互粉, mutual followers).

package demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;



public class dataRedunplication {
	// Mapper: emit every input line as the key with an empty value,
	// so that identical lines all meet at the same reducer.
	public static class Map extends Mapper<Object, Text, Text, Text> {
		private static final Text EMPTY = new Text("");

		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			context.write(value, EMPTY);
		}
	}
		
	// Reducer: duplicate lines collapse into a single key, so writing
	// each key once yields the de-duplicated data set.
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			context.write(key, new Text(""));
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Address of the (pre-YARN) JobTracker; adjust for your cluster.
		conf.set("mapred.job.tracker", "192.168.1.2:9001");
		// Input/output paths are hardcoded here; pass args to
		// GenericOptionsParser instead to read them from the command line.
		String[] ioArgs = new String[]{"input", "output"};
		String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: dataRedunplication <in> <out>");
			System.exit(2);
		}
		// Job.getInstance replaces the deprecated new Job(...) constructor.
		Job job = Job.getInstance(conf, "Data Redunplication");
			job.setJarByClass(dataRedunplication.class);
			job.setMapperClass(Map.class);
			job.setReducerClass(Reduce.class);
		// The reducer is idempotent here, so it could also run as a combiner:
		//job.setCombinerClass(Reduce.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			 
			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
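
To make the first job concrete, here is a sketch of a run on made-up records (the file names and contents below are hypothetical; the job reads whatever lives under the hardcoded input/ path). Duplicate lines collapse to one copy, sorted by key:

	Sample input (input/file1):
	2012-3-1 a
	2012-3-2 b
	2012-3-1 a
	2012-3-3 c

	Deduplicated output (output/part-r-00000):
	2012-3-1 a
	2012-3-2 b
	2012-3-3 c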

 

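The second program finds mutual followers ("hufen", 互粉). Each input line has the form user:fan1,fan2,..., listing the accounts that follow user. The mapper normalizes every (user, fan) relationship into a single "a-b" pair key with the two names in sorted order, so a pair the reducer sees exactly twice (once per direction) marks two users who follow each other.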
package demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class hufen {
	// Mapper: for each "user:fan1,fan2,..." line, emit one normalized
	// "a-b" pair key per follower relationship.
	public static class Map extends Mapper<LongWritable, Text, Text, NullWritable> {
		private final Text keyout = new Text();
		
		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Split off the user name, then walk the follower list.
			String[] rdLine = value.toString().trim().split(":");
			String username = rdLine[0];
			String[] fans = rdLine[1].split(",");
			for (int i = 0; i < fans.length; i++) {
				String fansname = fans[i];
				String hufenzu;
				// Normalize so a->b and b->a map to the same key "a-b".
				if (username.compareTo(fansname) < 0) {
					hufenzu = username + "-" + fansname;
				} else {
					hufenzu = fansname + "-" + username;
				}
				keyout.set(hufenzu);
				context.write(keyout, NullWritable.get());
			}
		}
	}
	// Reducer: a normalized pair arrives once per direction of the
	// relationship, so a count of exactly 2 means a mutual follow.
	public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
		@Override
		public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			int count = 0;
			for (NullWritable ignored : values) {
				count++;
			}
			if (count == 2) {
				context.write(key, NullWritable.get());
			}
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Address of the (pre-YARN) JobTracker; adjust for your cluster.
		conf.set("mapred.job.tracker", "192.168.1.2:9001");
		// Paths are hardcoded here as well; pass args instead to read
		// them from the command line.
		String[] ioArgs = new String[]{"input2", "output2"};
		String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
		Job job = Job.getInstance(conf, "hufen");
		job.setJarByClass(hufen.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		// Do NOT enable Reduce as a combiner here: a combiner would see only
		// a partial count per map task, breaking the count == 2 test.
		//job.setCombinerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
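
A sketch of the data flow with made-up records. Given these hypothetical lines in input2:

	a:b,c
	b:a
	c:a,b

the mapper emits a-b and a-c (from a's line), a-b (from b's line), and a-c plus b-c (from c's line). Only a-b and a-c reach a count of 2 in the reducer, so the output is:

	a-b
	a-c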