1. 程式人生 > >Reduce端join演算法實現 - (訂單跟商品)

Reduce端join演算法實現 - (訂單跟商品)

程式碼地址:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/rjon

現在有兩張表 1.訂單表 2.商品表
訂單資料表t_order:

id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1002 20150710 P0002 3
1003 20150710 P0003 2

商品資訊表t_product:

id pname category_id price
P0001 小米5 1000 2
P0002 錘子T1 1000 3
P0003 華為 1001 5

加入數量巨大,兩張表的資料是以檔案的形式儲存在HDFS中,需要用mapreduce程式來實現SQL查詢運算:

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

實現機制:

通過將關聯條件作為map輸出的key,將兩表滿足join條件的資料並攜帶資料所來源的檔案資訊,發往同一個reduce task , 在reduce中進行資料的串聯。
首先有兩個檔案:
訂單檔案
order.txt

1001	20150710	P0001	2
1002	20150710	P0001	3
1002	20150710	P0002	3
1003    20150710    P0003   2

商品資訊檔案:
product.txt

P0001	小米5	1000	2
P0002	錘子T1	1000	3

首先我們應該構建一個實體類,這個實體類裡面要有訂單表和商品表中的所有的欄位。
由於map階段我們是需要讀取兩個檔案的,一個是訂單檔案,一個是商品檔案,但是我們使用的是同一個實體類。所以我們應該有一個屬性用來標識哪個是從訂單中讀取到的資料,哪個是從商品表中讀取到的資料。

因為我們在reduce階段需要將兩張表的資料進行合併,兩張表的關係是一對多的關係,所以我們在設定屬性進行資料封裝的時候,是需要知道哪個是一哪個是多。
所以標識的欄位必須要有。

InfoBean

package com.thp.bigdata.rjon;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * 實體類 使用的是同一個bean
 * @author tommy
 *
 */
public class InfoBean implements Writable{

	// 訂單檔案的屬性
	private int order_id;  // 訂單id
	private String dateString;     // 訂單日期
	private String pid;	   // 產品id
	private int amount;	   // 訂單數量
	
	
	// 商品檔案的屬性
	private String pname;  // 產品名字
	private int category_id; // 產品分類
	private float price;    // 產品價格
	
	// flag = 0  封裝的是訂單表的資料
	// flag = 1  封裝的是產品表的資料
	private int flag;
	
	
	
	public InfoBean() {}
	
	public void set(int order_id, String dateString, String pid, int amount, String pname, int category_id, float price, int flag) {
		this.order_id = order_id;
		this.dateString = dateString;
		this.pid = pid;
		this.amount = amount;
		this.pname = pname;
		this.category_id = category_id;
		this.price = price;
		this.flag = flag;
	}

	public int getOrder_id() {
		return order_id;
	}

	public void setOrder_id(int order_id) {
		this.order_id = order_id;
	}

	public String getDate() {
		return dateString;
	}

	public void setDate(String dateString) {
		this.dateString = dateString;
	}

	public String getPid() {
		return pid;
	}

	public void setPid(String pid) {
		this.pid = pid;
	}

	public int getAmount() {
		return amount;
	}

	public void setAmount(int amount) {
		this.amount = amount;
	}

	public String getPname() {
		return pname;
	}

	public void setPname(String pname) {
		this.pname = pname;
	}

	public int getCategory_id() {
		return category_id;
	}

	public void setCategory_id(int category_id) {
		this.category_id = category_id;
	}

	public float getPrice() {
		return price;
	}

	public void setPrice(float price) {
		this.price = price;
	}
	
	public int getFlag() {
		return flag;
	}

	public void setFlag(int flag) {
		this.flag = flag;
	}

	@Override
	public String toString() {
		return "InfoBean [order_id=" + order_id + ", date=" + dateString + ", pid=" + pid + ", amount=" + amount + ", pname="
				+ pname + ", category_id=" + category_id + ", price=" + price + "]";
	}
	
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(order_id);
		out.writeUTF(dateString);
		out.writeUTF(pid);
		out.writeInt(amount);
		out.writeUTF(pname);
		out.writeInt(category_id);
		out.writeFloat(price);
		out.writeInt(flag);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.order_id = in.readInt();
		this.dateString = in.readUTF();
		this.pid = in.readUTF();
		this.amount = in.readInt();
		this.pname = in.readUTF();
		this.category_id = in.readInt();
		this.price = in.readFloat();
		this.flag = in.readInt();
	}

}

Map階段:

Map階段要做的事就是從這兩個檔案中將資料拿出來,然後使用實體類 InfoBean進行封裝資料,封裝完資料之後,需要將資料傳送給reduce階段進行處理。由於兩張表中的資料是通過 商品id 進行關聯的。所以context在往外寫資料的時候需要將pid (商品id) 作為key輸出,這樣,reduce階段接收到的bean,都是相同pid的資料。說明是相同商品的資料。
就相當於我們寫SQL中的查詢條件 : on a.pid = b.id

由於我們是從兩個檔案中讀取資料,兩個檔案的資料格式是不一樣的,所以我們應該根據檔名字進行資料的封裝。
封裝時,只對這個檔案中有的屬性進行封裝,沒有的則是喲見你剛預設的代替。

   static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
		
		// 不要把建立物件行程式碼放在map方法裡面,因為map方法會不斷地被呼叫,而我們只需要建立物件來進行賦值而已
		InfoBean bean = new InfoBean();
		Text outKey = new Text();  // 向外輸出的key 必須是聯絡兩張表的那個欄位
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split(",");
			
			
			// 需要獲取檔名字進行區分是哪個表中的資料
			// 先獲取檔案的切片
			FileSplit fileSplit = (FileSplit) context.getInputSplit();
			// 獲取檔名字
			String fileName = fileSplit.getPath().getName();
			
			String pid = ""; // 這個pid是非常關鍵的,因為map進行分發資料的時候,我們需要根據這個pid來作為key
			if(fileName.startsWith("order")) { // 訂單表中的資料
				pid = fields[2];
			    bean.set(Integer.parseInt(fields[0]), fields[1], pid, Integer.parseInt(fields[3]), "", 0, 0, 0);
			} else { // 產品表中的資料
				pid = fields[0];
				bean.set(0, "", pid, 0, fields[1], Integer.parseInt(fields[2]), Float.parseFloat(fields[3]), 1);
			}
			outKey.set(pid);
			context.write(outKey, bean);
		}
	}
	

等封裝了的bean之後,所有的相同的商品id (pid) 都會進入同一個reduce,這樣在reduce階段進行資料的合併。

Reduce 階段:

訂單表時多的一方,商品表時一的一方

需要注意一點的是,我們進行bean的複製的時候,需要使用工具類,或者自己一個一個手動set對應的值,不能直接使用 = 進行賦值,因為都是引用型別。

   // reduce 階段現在我們只需要將Bean輸出列印就可以了
	static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
		@Override
		protected void reduce(Text pid, Iterable<InfoBean> beans,Context context) throws IOException, InterruptedException {
			// 產品的bean
			InfoBean pdBean = new InfoBean();
			// 每個產品可能有多個訂單  所以使用ArrayList
			ArrayList<InfoBean> orderBeans = new ArrayList<InfoBean>();
			for(InfoBean bean : beans) {
				if("1".equals(bean.getFlag())) {  // 產品檔案中的資料
					// 產品檔案中每個產品只有一個
					// 注意不能直接使用  pdBean = bean ; 因為都是引用型別
					try {
						BeanUtils.copyProperties(pdBean, bean);
					} catch (IllegalAccessException | InvocationTargetException e) {
						e.printStackTrace();
					}
				} else {  // 訂單檔案中的資料
					InfoBean odBean = new InfoBean();  // 訂單Bean
					try {
						BeanUtils.copyProperties(odBean, bean);
						orderBeans.add(odBean);
					} catch (IllegalAccessException | InvocationTargetException e) {
						e.printStackTrace();
					}
				}
			}
			
			
			// 將兩張表中的資料進行拼接
			for(InfoBean bean: orderBeans) {  // 需要將每一個訂單中的資料全部都寫出去
				bean.setPname(bean.getPname());
				bean.setCategory_id(bean.getCategory_id());
				bean.setPrice(bean.getPrice());
				context.write(bean, NullWritable.get());
			}
			
		}
	}

執行緒啟動:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		
		System.err.println(args[0]);
		System.err.println(args[1]);
		
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if(otherArgs.length != 2) {
			System.err.println("Usage : wordcount <in> <out>");
			System.exit(2);
		}
		
		Job job = Job.getInstance(conf);
		
		
		job.setJar("f:/rjoin.jar");
		
		// 指定本業務job要使用的mapper/Reduce業務類
		job.setMapperClass(RJoinMapper.class);
		job.setReducerClass(RJoinReducer.class);
		
		// 指定mapper輸出資料的kv型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(InfoBean.class);
		
		// 指定最終輸出的資料的kv型別
		job.setOutputKeyClass(InfoBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		
		
		Path path = new Path(otherArgs[1]);
		FileSystem fileSystem = path.getFileSystem(conf);   // 根據path找到這個檔案
		if(fileSystem.exists(path)) {
			fileSystem.delete(path, true);  // true的意思是,就算output有東西,也一帶刪除
		}
		
		
		// 指定job的輸入原始檔案所在的目錄
		// 待處理檔案可以在多個目錄裡面
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		// 指定job的輸出結果
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		
		// 將job中配置的相關引數,以及job所用的的java類所在的jar包,提交給yarn去執行
		/*job.submit();*/
		boolean res = job.waitForCompletion(true); // 會等待程式處理完成之後,程式才退出
		System.exit(res ? 0 : 1);
	}

這個是可以直接執行在hadoop叢集上的。因為我之前的專案複製了hadoop的一些配置檔案。