1. 程式人生 > >map端join

map端join

path auth not config 單表 mapreduce == 書包 task

package my.hadoop.hdfs.mapreduceJoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 當商品表比較小只有幾十個(比如小米手機),但是訂單表比較大(一年賣幾千萬)此時 * 如果將每個產品用一個reduce處理時那就可能出現小米書包只有幾萬,數據,但是小米手機就有100萬的數據, * 出現負載不均衡,數據傾斜的情況。 * @author lq * */ public class MapsideJoin {
public static class FindFriendMapper extends Mapper<LongWritable, Text, AllInfoBean, NullWritable> { FileSplit fileSplit = null; String filename = null; Map<String,String> pdinfo = new HashMap<String,String>(); @Override protected void setup( Mapper<LongWritable, Text, AllInfoBean, NullWritable>.Context context) throws IOException, InterruptedException { //文件和程序已經在同一個路徑(splist。xml。wc,) BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product"))); String line = null; while ((line = br.readLine())!=null){ String[] split = line.split(","); pdinfo.put(split[0], split[1]); } // 關閉流 br.close(); } AllInfoBean bean = new AllInfoBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 獲取文件名字的方法 // 判斷用的是哪個文件 String[] cols = value.toString().split(","); bean.setOderid(Integer.parseInt(cols[0])); bean.setDate(cols[1]); bean.setPid(cols[2]); bean.setAmount(Integer.parseInt(cols[3])); bean.setPname(pdinfo.get(cols[2])==null? "" : pdinfo.get(cols[2])); bean.setPrice(""); bean.setCategory_id(""); context.write(bean, NullWritable.get()); } } //不要reduce /*public static class FindFriendReducer extends Reducer<Text, AllInfoBean, AllInfoBean, NullWritable> { @Override protected void reduce(Text Keyin, Iterable<AllInfoBean> values, Context context) throws IOException, InterruptedException { for(AllInfoBean bean : values){ context.write(bean, NullWritable.get()); } } }*/ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(MapsideJoin.class); job.setMapperClass(FindFriendMapper.class); //不指定reduce //job.setReducerClass(FindFriendReducer.class); //指定最終輸出的數據kv類型 //job.setMapOutputKeyClass(Text.class); //job.setMapOutputValueClass(AllInfoBean.class); job.setNumReduceTasks(0);//設置不運行reduce job.setOutputKeyClass(AllInfoBean.class); job.setOutputValueClass(NullWritable.class); //第三方jar包使用這個路徑指定,本地和hdfs都可以 //job.addArchiveToClassPath(archive); //job job.addCacheFile(new URI("hdfs://mini2:9000/Rjoin/dat2/product"));//緩存其他節點 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean res = job.waitForCompletion(true); System.exit(res ? 0 :1); } }

map端join