1. 程式人生 > >MapReduce之join演算法案例實現

MapReduce之join演算法案例實現

1、需求:

訂單資料表t_order

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

商品資訊表t_product

id

pname

category_id

price

P0001

小米5

1000

2

P0002

錘子T1

1000

3

假如資料量巨大,兩表的資料是以檔案的形式儲存在HDFS中,需要用mapreduce程式來實現以下SQL查詢運算:

select  a.id,a.date,b.pname,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

2、實現機制:

通過將關聯條件(pid)作為map輸出的key,將兩表中滿足join條件的資料連同其所來源檔案的標識資訊,一併發往同一個reducetask,在reduce中完成資料的串聯。

package cn.itcast.bigdata.mr.rjoin;


import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;


import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import cn.itcast.bigdata.mr.flowsum.FlowBean;
import cn.itcast.bigdata.mr.flowsum.FlowCount;


public class Rjoin {


static class RjoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{

InfoBean bean = new InfoBean();
Text text = new Text();

@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

//獲取每一行的文字資料
String line = value.toString();
//獲取每一行的切片資料
FileSplit inputsplit = (FileSplit) context.getInputSplit();
//獲取該行切片的名稱
String name = inputsplit.getPath().getName();
String pid = "";
//通過檔案判斷是哪種資料型別
if(name.startsWith("order")) {
//將資料進行切分
String[] fileds = line.split(",");
pid = fileds[2];
bean.set(Integer.parseInt(fileds[0]), fileds[1], fileds[2], Integer.parseInt(fileds[3]), "", 0, 0, "0");
}else {
    String[] fileds = line.split(",");
    pid = fileds[0];
    bean.set(0, "", fileds[0], 0, fileds[1], Integer.parseInt(fileds[2]), Float.parseFloat(fileds[3]), "1");
}

text.set(pid);
//map彙總寫出去的資料是以pid為key,InfoBean為value的方式寫出去給reduce進行處理
context.write(text, bean);
}
}


static class RjoinReduce extends  Reducer<Text, InfoBean, InfoBean, NullWritable>{

@Override
protected void reduce(Text key, Iterable<InfoBean> values,Context context) throws IOException, InterruptedException {
InfoBean pdBean = new InfoBean();
List<InfoBean> orderBeans = new ArrayList();

for(InfoBean bean : values) {
if("1".equals(bean.getFlag())) { //產品資訊的bean
try {

BeanUtils.copyProperties(pdBean, bean);

                                               orderBeans.add(odBean);

} catch (Exception e) {
e.printStackTrace();

}else {
InfoBean odBean = new InfoBean();  //訂單bean
try {
BeanUtils.copyProperties(odBean, bean);
} catch (Exception e) {
e.printStackTrace();

}
}

//拼接兩類資料最終形成的bean
for(InfoBean bean : orderBeans) {
bean.setPname(pdBean.getPname());
bean.setCategory_id(pdBean.getCategory_id());
bean.setPrice(pdBean.getPrice());

context.write(bean, NullWritable.get());
}
}
}



public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        
        //指定本程式jar包所在的本地路徑
        job.setJarByClass(Rjoin.class);
        
      //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(RjoinMapper.class);
        job.setReducerClass(RjoinReduce.class);
        
      //指定mapper輸出資料的kv型別
        job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(InfoBean.class);
        
      //指定最終輸出的資料的kv型別
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);
        
      //指定job的輸入原始檔案所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
      //指定job的輸出結果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
      //將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行
        boolean  b = job.waitForCompletion(true);
        System.exit(b?0:1);


}


}

package cn.itcast.bigdata.mr.rjoin;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.Writable;


public class InfoBean implements Writable{
    
private int id;
private String date;
private String pid;
private int amount;
private String pname;
private int category_id;
private float price;
private String flag;


//flag為0代表訂單資料表,為1代表商品資訊表
public String getFlag() {
return flag;
}


public void setFlag(String flag) {
this.flag = flag;
}


public InfoBean() {}


  


public void set(int id, String date, String pid, int amount, String pname, int category_id, float price,String flag) {
this.id = id;
this.date = date;
this.pid = pid;
this.amount = amount;
this.pname = pname;
this.category_id = category_id;
this.price = price;
this.flag = flag;
}


public int getId() {
return id;
}


public void setId(int id) {
this.id = id;
}


public String getDate() {
return date;
}


public void setDate(String date) {
this.date = date;
}


public String getPid() {
return pid;
}


public void setPid(String pid) {
this.pid = pid;
}


public int getAmount() {
return amount;
}


public void setAmount(int amount) {
this.amount = amount;
}


public String getPname() {
return pname;
}


public void setPname(String pname) {
this.pname = pname;
}


public int getCategory_id() {
return category_id;
}


public void setCategory_id(int category_id) {
this.category_id = category_id;
}


  
public float getPrice() {
return price;
}


public void setPrice(float price) {
this.price = price;
}


//序列化方法,將物件以轉化為流的方式寫出去
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(date);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeInt(category_id);
out.writeFloat(price);
};


//反序列化方法,將傳過來的流轉化為我們需要的物件
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.date = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.category_id = in.readInt();
this.price = in.readFloat();
}


@Override
public String toString() {
return "id=" + id + ", date=" + date + ", pid=" + pid + ", amount=" + amount + ", pname=" + pname
+ ", category_id=" + category_id + ", price=" + price + "";
}




}