
Implementing a Join of Two Files with MapReduce


Sample data

customer table (tab-separated columns: customer id, name, address, phone)

1   hanmeimei   ShangHai    110
2   leilei      BeiJing     112
3   lucy        GuangZhou   119


order table (tab-separated columns: order id, customer id, order amount)

1 1 50
2 1 200
3 3 15
4 3 350
5 3 58
6 1 42
7 1 352
8 2 1135
9 2 400
10 2 2000
11 2 300


MAPJOIN (map-side join)

Scenario: we simulate having one small table and one large table; customer plays the small table and order the large one.

Approach: load the smaller dataset into memory and build an index over it keyed by the join column. The larger dataset is fed to the map tasks as the regular input, and each call to map() probes the in-memory index to perform the join directly, emitting the joined records under the output key. To make this work, Hadoop's DistributedCache ships the small dataset to every compute node: each node that runs a map task loads the file into memory and indexes it by the join key. A minimal sketch of the pattern is shown below; the full JoinDemo implementation follows after the environment setup.
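To make the pattern concrete before the full listing, here is a minimal, self-contained sketch of the two halves of the handshake. The class and field names (CacheJoinSketch, smallTable) are made up for illustration, and unlike the JoinDemo further down, it retrieves the cached file through context.getCacheFiles() rather than hard-coding the path; either way works for a local run.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheJoinSketch extends Mapper<LongWritable, Text, Text, Text> {

    // In-memory index of the small table, keyed by the join column.
    private final Map<String, String> smallTable = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException {
        // The driver must have registered the file beforehand, e.g.:
        //   job.addCacheFile(URI.create("input/customer.txt"));
        for (URI uri : context.getCacheFiles()) {
            FileSystem fs = FileSystem.get(uri, context.getConfiguration());
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(uri))));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    // First column is the join key; keep the rest as the payload.
                    String[] cols = line.split("\t", 2);
                    if (cols.length == 2) {
                        smallTable.put(cols[0], cols[1]);
                    }
                }
            } finally {
                reader.close();
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Big-table record; the join column is assumed to be the second field.
        String[] cols = value.toString().split("\t");
        if (cols.length < 2) {
            return;
        }
        String matched = smallTable.get(cols[1]); // probe the in-memory index
        if (matched != null) { // inner join: drop records with no match
            context.write(new Text(cols[1]), new Text(value + "\t" + matched));
        }
    }
}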


Environment setup: since we are running everything locally, we need to set up a local Hadoop installation.

1. Download Hadoop (this article uses 2.6.0).


2. Unzip it to a directory and remember the path; you will need it in a moment.


3. Configure the system environment variables, for example as shown below.

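The original post shows this step as a screenshot. As an illustration, the variables would look something like the following (the path is the one this article's run() method uses; adjust it to wherever you unzipped):

HADOOP_HOME = D:\software\hadoop-2.6.0\hadoop-2.6.0
Path        = %Path%;%HADOOP_HOME%\bin

Note that running Hadoop locally on Windows typically also requires winutils.exe to be present in %HADOOP_HOME%\bin.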

If you are a beginner, first create a Java project (the steps are covered in plenty of tutorials online), then create a package named mapjoin and, inside it, a class named JoinDemo. Add extends Configured implements Tool after JoinDemo in the class declaration, as in the listing below, and then type in the rest of the code.

 
package mapjoin;
## The package declaration above marks this code as belonging to the mapjoin package.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

## These are the imports the class needs; without them parts of the code will not compile (they show up red in the IDE).

public class JoinDemo extends Configured implements Tool {
    // Location of the customer file on the local filesystem.
    // TODO: pass this in as a command-line argument instead
    private static final String CUSTOMER_CACHE_URL = "input/customer.txt";
//          "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";

    // Plain value object holding one customer record.
    private static class CustomerBean {
        private int custId;
        private String name;
        private String address;
        private String phone;

        public CustomerBean() {}

        public CustomerBean(int custId, String name, String address, String phone) {
            super();
            this.custId = custId;
            this.name = name;
            this.address = address;
            this.phone = phone;
        }

        public int getCustId() { return custId; }
        public String getName() { return name; }
        public String getAddress() { return address; }
        public String getPhone() { return phone; }
    }

    // Composite map output key: (customer id, order id).
    private static class CustOrderMapOutKey implements
            WritableComparable<CustOrderMapOutKey> {
        private int custId;
        private int orderId;

        public void set(int custId, int orderId) {
            this.custId = custId;
            this.orderId = orderId;
        }

        public int getCustId() { return custId; }
        public int getOrderId() { return orderId; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(custId);
            out.writeInt(orderId);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            custId = in.readInt();
            orderId = in.readInt();
        }

        @Override
        public int compareTo(CustOrderMapOutKey o) {
            // Sort by customer id first, then by order id.
            int res = Integer.compare(custId, o.custId);
            return res == 0 ? Integer.compare(orderId, o.orderId) : res;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CustOrderMapOutKey) {
                CustOrderMapOutKey o = (CustOrderMapOutKey) obj;
                return custId == o.custId && orderId == o.orderId;
            }
            return false;
        }

        @Override
        public int hashCode() {
            // Honors the equals/hashCode contract (missing in the original).
            return 31 * custId + orderId;
        }

        @Override
        public String toString() {
            return custId + "\t" + orderId;
        }
    }

    private static class JoinMapper extends
            Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
        private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
        private final Text outputValue = new Text();

        /**
         * Customer data held in memory, keyed by customer id.
         */
        private static final Map<Integer, CustomerBean> CUSTOMER_MAP =
                new HashMap<Integer, CustomerBean>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Read the cached customer file.
            FileSystem fs = FileSystem.get(
                    URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
            FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));

            BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
            String line = null;
            String[] cols = null;

            // Format: customer id, name, address, phone
            while ((line = reader.readLine()) != null) {
                cols = line.split("\t");
                if (cols.length < 4) { // malformed record, skip it
                    continue;
                }

                CustomerBean bean = new CustomerBean(
                        Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
                // Cache the customer record.
                CUSTOMER_MAP.put(bean.getCustId(), bean);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Format: order id, customer id, order amount
            String[] cols = value.toString().split("\t");
            if (cols.length < 3) {
                return;
            }

            int custId = Integer.parseInt(cols[1]); // extract the customer id
            CustomerBean customerBean = CUSTOMER_MAP.get(custId);

            if (customerBean == null) { // no matching customer record to join
                return;
            }

            StringBuilder sb = new StringBuilder();
            sb.append(cols[2])
              .append("\t").append(customerBean.getName())
              .append("\t").append(customerBean.getAddress())
              .append("\t").append(customerBean.getPhone());

            outputValue.set(sb.toString());
            outputKey.set(custId, Integer.parseInt(cols[0]));

            context.write(outputKey, outputValue);
        }
    }

    /**
     * Identity reducer: nothing to do, just pass the joined pairs through.
     * @author Ivan
     */
    private static class JoinReducer extends
            Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
        @Override
        protected void reduce(CustOrderMapOutKey key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // if (args.length < 2) {
        //     throw new IllegalArgumentException("Usage: <inpath> <outpath>");
        // }
        // Hand control to the run() method of a JoinDemo instance.
        ToolRunner.run(new Configuration(), new JoinDemo(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:\\software\\hadoop-2.6.0\\hadoop-2.6.0");
        // Default configuration.
        Configuration conf = getConf();
        FileSystem fileSystem = FileSystem.get(conf);
        Path outputPath = new Path("output/");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true); // clear a previous run's output
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinDemo.class);

        // Register the customer file with the distributed cache.
        job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
        Path inputPath = new Path("input/order.txt");
        // Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // map settings
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(CustOrderMapOutKey.class);
        job.setMapOutputValueClass(Text.class);

        // reduce settings (left commented out: the framework's default identity
        // reducer already passes the joined pairs through unchanged)
        // job.setReducerClass(JoinReducer.class);
        // job.setOutputKeyClass(CustOrderMapOutKey.class);
        // job.setOutputValueClass(Text.class);

        boolean res = job.waitForCompletion(true);
        return res ? 0 : 1;
    }
}
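Because run() hard-codes the local input and output paths and sets hadoop.home.dir itself, you can launch the job simply by running JoinDemo.main() from the IDE: Hadoop's local mode reads input/order.txt, joins it against the cached customer.txt, and writes the result to output/. The output directory is deleted at the start of each run, so repeated runs will not fail with an "output path already exists" error.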

Local file layout: customer.txt and order.txt live under input/ in the project directory, and the job writes its result to output/ (the original post shows a screenshot of this here).


Finally, the result of the run:
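The original post ends with a screenshot of the output. Working it out by hand from the sample data and the code above, output/part-r-00000 should contain the joined records below, sorted by customer id and then order id (columns: customer id, order id, amount, name, address, phone):

1   1   50    hanmeimei  ShangHai   110
1   2   200   hanmeimei  ShangHai   110
1   6   42    hanmeimei  ShangHai   110
1   7   352   hanmeimei  ShangHai   110
2   8   1135  leilei     BeiJing    112
2   9   400   leilei     BeiJing    112
2   10  2000  leilei     BeiJing    112
2   11  300   leilei     BeiJing    112
3   3   15    lucy       GuangZhou  119
3   4   350   lucy       GuangZhou  119
3   5   58    lucy       GuangZhou  119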

 
