Reduce端join演算法實現 - (訂單跟商品)
程式碼地址:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/rjon
現在有兩張表 1.訂單表 2.商品表
訂單資料表t_order:
id | date | pid | amount |
---|---|---|---|
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0001 | 3 |
1002 | 20150710 | P0002 | 3 |
1003 | 20150710 | P0003 | 2 |
商品資訊表t_product:
id | pname | category_id | price |
---|---|---|---|
P0001 | 小米5 | 1000 | 2 |
P0002 | 錘子T1 | 1000 | 3 |
P0003 | 華為 | 1001 | 5 |
加入數量巨大,兩張表的資料是以檔案的形式儲存在HDFS中,需要用mapreduce程式來實現SQL查詢運算:
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
實現機制:
通過將關聯條件作為map輸出的key,將兩表滿足join條件的資料並攜帶資料所來源的檔案資訊,發往同一個reduce task , 在reduce中進行資料的串聯。
首先有兩個檔案:
訂單檔案
order.txt
1001 20150710 P0001 2
1002 20150710 P0001 3
1002 20150710 P0002 3
1003 20150710 P0003 2
商品資訊檔案:
product.txt
P0001 小米5 1000 2
P0002 錘子T1 1000 3
首先我們應該構建一個實體類,這個實體類裡面要有訂單表和商品表中的所有的欄位。
由於map階段我們是需要讀取兩個檔案的,一個是訂單檔案,一個是商品檔案,但是我們使用的是同一個實體類。所以我們應該有一個屬性用來標識哪個是從訂單中讀取到的資料,哪個是從商品表中讀取到的資料。
因為我們在reduce階段需要將兩張表的資料進行合併,兩張表的關係是一對多的關係,所以我們在設定屬性進行資料封裝的時候,是需要知道哪個是一哪個是多。
所以標識的欄位必須要有。
InfoBean
package com.thp.bigdata.rjon;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* 實體類 使用的是同一個bean
* @author tommy
*
*/
public class InfoBean implements Writable{
// 訂單檔案的屬性
private int order_id; // 訂單id
private String dateString; // 訂單日期
private String pid; // 產品id
private int amount; // 訂單數量
// 商品檔案的屬性
private String pname; // 產品名字
private int category_id; // 產品分類
private float price; // 產品價格
// flag = 0 封裝的是訂單表的資料
// flag = 1 封裝的是產品表的資料
private int flag;
public InfoBean() {}
public void set(int order_id, String dateString, String pid, int amount, String pname, int category_id, float price, int flag) {
this.order_id = order_id;
this.dateString = dateString;
this.pid = pid;
this.amount = amount;
this.pname = pname;
this.category_id = category_id;
this.price = price;
this.flag = flag;
}
public int getOrder_id() {
return order_id;
}
public void setOrder_id(int order_id) {
this.order_id = order_id;
}
public String getDate() {
return dateString;
}
public void setDate(String dateString) {
this.dateString = dateString;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public int getCategory_id() {
return category_id;
}
public void setCategory_id(int category_id) {
this.category_id = category_id;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
public int getFlag() {
return flag;
}
public void setFlag(int flag) {
this.flag = flag;
}
@Override
public String toString() {
return "InfoBean [order_id=" + order_id + ", date=" + dateString + ", pid=" + pid + ", amount=" + amount + ", pname="
+ pname + ", category_id=" + category_id + ", price=" + price + "]";
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_id);
out.writeUTF(dateString);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeInt(category_id);
out.writeFloat(price);
out.writeInt(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.order_id = in.readInt();
this.dateString = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.category_id = in.readInt();
this.price = in.readFloat();
this.flag = in.readInt();
}
}
Map階段:
Map階段要做的事就是從這兩個檔案中將資料拿出來,然後使用實體類 InfoBean進行封裝資料,封裝完資料之後,需要將資料傳送給reduce階段進行處理。由於兩張表中的資料是通過 商品id 進行關聯的。所以context在往外寫資料的時候需要將pid (商品id) 作為key輸出,這樣,reduce階段接收到的bean,都是相同pid的資料。說明是相同商品的資料。
就相當於我們寫SQL中的查詢條件 : on a.pid = b.id
由於我們是從兩個檔案中讀取資料,兩個檔案的資料格式是不一樣的,所以我們應該根據檔名字進行資料的封裝。
封裝時,只對這個檔案中有的屬性進行封裝,沒有的則是喲見你剛預設的代替。
static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
// 不要把建立物件行程式碼放在map方法裡面,因為map方法會不斷地被呼叫,而我們只需要建立物件來進行賦值而已
InfoBean bean = new InfoBean();
Text outKey = new Text(); // 向外輸出的key 必須是聯絡兩張表的那個欄位
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
// 需要獲取檔名字進行區分是哪個表中的資料
// 先獲取檔案的切片
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// 獲取檔名字
String fileName = fileSplit.getPath().getName();
String pid = ""; // 這個pid是非常關鍵的,因為map進行分發資料的時候,我們需要根據這個pid來作為key
if(fileName.startsWith("order")) { // 訂單表中的資料
pid = fields[2];
bean.set(Integer.parseInt(fields[0]), fields[1], pid, Integer.parseInt(fields[3]), "", 0, 0, 0);
} else { // 產品表中的資料
pid = fields[0];
bean.set(0, "", pid, 0, fields[1], Integer.parseInt(fields[2]), Float.parseFloat(fields[3]), 1);
}
outKey.set(pid);
context.write(outKey, bean);
}
}
等封裝了的bean之後,所有的相同的商品id (pid) 都會進入同一個reduce,這樣在reduce階段進行資料的合併。
Reduce 階段:
訂單表時多的一方,商品表時一的一方
需要注意一點的是,我們進行bean的複製的時候,需要使用工具類,或者自己一個一個手動set對應的值,不能直接使用 = 進行賦值,因為都是引用型別。
// reduce 階段現在我們只需要將Bean輸出列印就可以了
static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
@Override
protected void reduce(Text pid, Iterable<InfoBean> beans,Context context) throws IOException, InterruptedException {
// 產品的bean
InfoBean pdBean = new InfoBean();
// 每個產品可能有多個訂單 所以使用ArrayList
ArrayList<InfoBean> orderBeans = new ArrayList<InfoBean>();
for(InfoBean bean : beans) {
if("1".equals(bean.getFlag())) { // 產品檔案中的資料
// 產品檔案中每個產品只有一個
// 注意不能直接使用 pdBean = bean ; 因為都是引用型別
try {
BeanUtils.copyProperties(pdBean, bean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
} else { // 訂單檔案中的資料
InfoBean odBean = new InfoBean(); // 訂單Bean
try {
BeanUtils.copyProperties(odBean, bean);
orderBeans.add(odBean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
}
// 將兩張表中的資料進行拼接
for(InfoBean bean: orderBeans) { // 需要將每一個訂單中的資料全部都寫出去
bean.setPname(bean.getPname());
bean.setCategory_id(bean.getCategory_id());
bean.setPrice(bean.getPrice());
context.write(bean, NullWritable.get());
}
}
}
執行緒啟動:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
System.err.println(args[0]);
System.err.println(args[1]);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage : wordcount <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJar("f:/rjoin.jar");
// 指定本業務job要使用的mapper/Reduce業務類
job.setMapperClass(RJoinMapper.class);
job.setReducerClass(RJoinReducer.class);
// 指定mapper輸出資料的kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
// 指定最終輸出的資料的kv型別
job.setOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);
Path path = new Path(otherArgs[1]);
FileSystem fileSystem = path.getFileSystem(conf); // 根據path找到這個檔案
if(fileSystem.exists(path)) {
fileSystem.delete(path, true); // true的意思是,就算output有東西,也一帶刪除
}
// 指定job的輸入原始檔案所在的目錄
// 待處理檔案可以在多個目錄裡面
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定job的輸出結果
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 將job中配置的相關引數,以及job所用的的java類所在的jar包,提交給yarn去執行
/*job.submit();*/
boolean res = job.waitForCompletion(true); // 會等待程式處理完成之後,程式才退出
System.exit(res ? 0 : 1);
}
這個是可以直接執行在hadoop叢集上的。因為我之前的專案複製了hadoop的一些配置檔案。