
Implementing an SQL Query with MapReduce

Task: query the employee table for the total salary of each department.

The SQL statement is: select deptno, sum(sal) from emp group by deptno order by deptno;

The MapReduce program below implements the same query: the Mapper emits (deptno, sal) pairs, the shuffle groups and sorts them by department number (which also satisfies the order by clause), and the Reducer sums the salaries for each department.
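The input file /scott/emp.csv is assumed to use the classic SCOTT emp layout, one employee per line with eight comma-separated fields. The sample record below is the one referenced in the Mapper's comments; the column names are the standard ones and are shown here only for orientation:

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7900,JAMES,CLERK,7698,1981/12/3,950,,30

After splitting on commas, SAL is field index 5 and DEPTNO is field index 7, which is why the Mapper reads words[5] and words[7].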

The Mapper is as follows:

package SalaryTotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

	@Override
	protected void map(LongWritable key1, Text value1,Context context)
			throws IOException, InterruptedException {
		
		/*
		 * context is the Mapper's context:
		 * upstream:   HDFS (the input split)
		 * downstream: the shuffle / Reducer
		 */
		// sample record: 7900,JAMES,CLERK,7698,1981/12/3,950,,30
		String data = value1.toString();
		
		// split the record into its comma-separated fields
		String[] words = data.split(",");
		
		// emit k2 = department number (field 7), v2 = salary (field 5);
		// note: write exactly once per record, not once per field,
		// otherwise each salary would be counted multiple times in the sum
		context.write(new IntWritable(Integer.parseInt(words[7])), new IntWritable(Integer.parseInt(words[5])));
	}
	
}
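The map() above assumes every input line is a well-formed data record; a header row, a blank line, or a record with a non-numeric salary would make Integer.parseInt throw and fail the task. A minimal defensive variant is sketched below under that assumption; the class name SafeSalaryTotalMapper is hypothetical and not part of the original project.

package SalaryTotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//hypothetical defensive variant: silently skips records it cannot parse
public class SafeSalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		String data = value1.toString().trim();
		if (data.isEmpty()) {
			return;                      // skip blank lines
		}
		String[] words = data.split(",");
		if (words.length < 8) {
			return;                      // skip malformed records
		}
		try {
			int deptno = Integer.parseInt(words[7].trim());
			int sal    = Integer.parseInt(words[5].trim());
			context.write(new IntWritable(deptno), new IntWritable(sal));
		} catch (NumberFormatException e) {
			// skip header rows or records with non-numeric deptno/sal
		}
	}
}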

The Reducer is as follows:

package SalaryTotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

	@Override
	protected void reduce(IntWritable k3, Iterable<IntWritable> v3,Context context)
			throws IOException, InterruptedException {
		
		// sum the salaries in v3 to get the department's total payroll
		int total = 0;
		for(IntWritable v:v3) {
			total += v.get();
		}
		
		// emit k4 = department number, v4 = total salary
		context.write(k3, new IntWritable(total));
	}
	
}

The driver (main) program is as follows:

package SalaryTotal;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {
	public static void main(String[] args) throws Exception {
		// create the job
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(SalaryTotalMain.class);
		
		// set the job's Mapper and its output types (k2, v2)
		job.setMapperClass(SalaryTotalMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// set the job's Reducer and its output types (k4, v4)
		job.setReducerClass(SalaryTotalReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		
		// set the job's input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// submit the job and wait for it to finish
		job.waitForCompletion(true);
	}
}
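Because the Mapper's output types and the Reducer's output types are identical (IntWritable, IntWritable) and summing salaries is associative and commutative, the same Reducer class could also be registered as a Combiner to pre-aggregate salaries on the map side and cut shuffle traffic. This is an optional optimization, not part of the original program; it would add one line to the job setup:

		// optional: reuse the Reducer as a Combiner for map-side pre-aggregation
		job.setCombinerClass(SalaryTotalReducer.class);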

Run the MapReduce program on Hadoop:

hadoop jar salary.jar /scott/emp.csv /output/0814/salary


View the result of the run.
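The Reducer writes its output to a part file under the output directory. Assuming the default single reducer, the result can be inspected with the standard HDFS shell (the file name part-r-00000 is the framework's default naming convention):

hdfs dfs -cat /output/0814/salary/part-r-00000

Each output line is a department number followed by a tab and that department's total salary.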

The per-department totals in this output match the result of the SQL statement above, which confirms that the MapReduce program is correct.