MapReduce實現資料清洗(去重)
上面是資料的格式
特徵碼#路徑(路徑裡面包括身份證號和姓名),我通過身份證號進行去重。
Map程式
public static class DistinctMaper extends Mapper<LongWritable, Text, Text, PeopleBean>{
	// Reused output key — avoids allocating a new Text for every input record.
	private final Text outKey = new Text();

	/**
	 * Parses one input line of the form {@code featureCode#path}, where the
	 * path ends in {@code .../<name>_<idCard>.<ext>}, and emits the record
	 * keyed by ID card number so that duplicates collapse onto one reduce call.
	 * Malformed lines are skipped instead of crashing the task.
	 */
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, PeopleBean>.Context context)
			throws IOException, InterruptedException {
		String[] parts = value.toString().split("#");
		if (parts.length < 2) {
			return; // no '#' separator — malformed line, skip
		}
		// feature code
		String tezengma = parts[0];
		// path
		String lujin = parts[1];
		int underscore = lujin.lastIndexOf("_");
		// The extension dot must come AFTER the last '_'; searching from the
		// underscore also guards against dots earlier in the directory path.
		int dot = lujin.indexOf(".", underscore + 1);
		if (underscore == -1 || dot == -1) {
			return; // path does not match <name>_<idCard>.<ext> — skip
		}
		// ID card number: between the last '_' and the extension dot
		String sfz = lujin.substring(underscore + 1, dot);
		// name: between the last '/' and the last '_'
		String xm = lujin.substring(lujin.lastIndexOf("/") + 1, underscore);
		PeopleBean peopleBean = new PeopleBean(tezengma, lujin, sfz, xm);
		outKey.set(sfz);
		context.write(outKey, peopleBean);
	}
}
reduce程式
public static class DistinctReducer extends Reducer<Text, PeopleBean, Text, Text>{
	/**
	 * All records sharing one ID card number arrive in a single call; writing
	 * only the first value (and ignoring the rest) performs the deduplication.
	 * If the iterable is empty (defensive — Hadoop never calls reduce with no
	 * values) nothing is written, instead of the original NullPointerException
	 * from dereferencing a null String.
	 */
	@Override
	protected void reduce(Text key, Iterable<PeopleBean> value, Reducer<Text, PeopleBean, Text, Text>.Context context)
			throws IOException, InterruptedException {
		for (PeopleBean people : value) {
			context.write(key, new Text("\t" + people.toString()));
			return; // keep only the first record per key
		}
	}
}
PeopleBean是自己寫的類,用於儲存相關資訊。
package cn.hadoop.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//Writable是hadoop的序列化介面
/**
 * Value object holding one person's record (feature code, file path, ID card
 * number, name). Implements Hadoop's {@link Writable} so instances can be
 * serialized across the map/reduce shuffle. Field order in
 * {@link #write(DataOutput)} and {@link #readFields(DataInput)} must match.
 */
public class PeopleBean implements Writable {
	private String tzm;    // feature code
	private String lujing; // file path
	private String idCard; // ID card number
	private String name;   // person's name

	// deserialize — original comment had serialize/deserialize swapped
	@Override
	public void readFields(DataInput in) throws IOException {
		this.tzm = in.readUTF();
		this.lujing = in.readUTF();
		this.idCard = in.readUTF();
		this.name = in.readUTF();
	}

	// serialize
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(tzm);
		out.writeUTF(lujing);
		out.writeUTF(idCard);
		out.writeUTF(name);
	}

	/** Tab-separated rendering used directly as the reducer's output value. */
	@Override
	public String toString() {
		return this.tzm + "\t" + this.lujing + "\t" + this.idCard + "\t" + this.name;
	}

	/** No-arg constructor required by Hadoop's reflective instantiation. */
	public PeopleBean() {}

	public PeopleBean(String tzm, String lujing, String idCard, String name) {
		this.tzm = tzm;
		this.lujing = lujing;
		this.idCard = idCard;
		this.name = name;
	}

	public String getTzm() {
		return tzm;
	}

	public void setTzm(String tzm) {
		this.tzm = tzm;
	}

	public String getLujing() {
		return lujing;
	}

	public void setLujing(String lujing) {
		this.lujing = lujing;
	}

	public String getIdCard() {
		return idCard;
	}

	public void setIdCard(String idCard) {
		this.idCard = idCard;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}
//main方法裡建立job任務
private static final String INPUT_PATH = "/file/*"; // HDFS path of the raw data to be cleaned
private static final String OUT_PATH = "/outPeopleDistinct"; // HDFS path for the deduplicated output
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("dfs.nameservices", "ns1");
conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.ns1.nn1", "master:9000");//namenode1
conf.set("dfs.namenode.rpc-address.ns1.nn2", "node1:9000");//namenode2
conf.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
//建立檔案系統
try {
//FileSystem fileSystem = FileSystem.get(new URI("hdfs://ns1"), conf, "bl");
//建立任務
Job job = Job.getInstance(conf);
//注意:main方法所在類
job.setJarByClass(DistinctPeople.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
//job.setInputFormatClass(TextInputFormat.class);
//1.2 設定自定義Mapper類和設定map函式輸出資料的key和value的型別
job.setMapperClass(DistinctMaper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PeopleBean.class);
//1.3 設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個)
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 排序
//1.5 歸約
// job.setCombinerClass(DistinctReducer.class);
//2.1 Shuffle把資料從Map端拷貝到Reduce端。
//2.2 指定Reducer類和輸出key和value的型別
job.setReducerClass(DistinctReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.3 指定輸出的路徑和設定輸出的格式化類
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
// job.setOutputFormatClass(TextOutputFormat.class);
// 提交作業 退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}