
Data Cleaning (Deduplication) with MapReduce

Each line of the input data has the format feature-code#path. The path embeds the person's ID-card number and name, and deduplication is keyed on the ID-card number.
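As a purely hypothetical stand-in (none of these values come from the real data), an input line might look like:

3a2f9c1b#/data/people/zhangsan_110101199001011234.txt

where 3a2f9c1b is the feature code, zhangsan the name, and 110101199001011234 the ID-card number.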

The Mapper. It parses each line and emits the record keyed by ID-card number.

public static class DistinctMapper extends Mapper<LongWritable, Text, Text, PeopleBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] strs = value.toString().split("#");
        if (strs.length < 2) {
            return; // skip malformed lines with no "#" separator
        }
        // feature code
        String featureCode = strs[0];
        // file path, which embeds the name and the ID-card number
        String path = strs[1];
        if (path.indexOf("_") != -1) {
            // ID-card number: between the last "_" and the last "." (the extension dot)
            String idCard = path.substring(path.lastIndexOf("_") + 1, path.lastIndexOf("."));
            // name: between the last "/" and the last "_"
            String name = path.substring(path.lastIndexOf("/") + 1, path.lastIndexOf("_"));
            PeopleBean peopleBean = new PeopleBean(featureCode, path, idCard, name);
            // key by ID-card number so duplicates meet in the same reduce call
            context.write(new Text(idCard), peopleBean);
        }
    }
}
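Because the substring arithmetic is the fragile part, the following self-contained sketch (plain Java, reusing the hypothetical line from above) exercises the same extraction outside Hadoop:

public class ParsePathDemo {
    public static void main(String[] args) {
        // hypothetical input line in the feature-code#path format
        String line = "3a2f9c1b#/data/people/zhangsan_110101199001011234.txt";
        String path = line.split("#")[1];
        // same slicing as in the mapper above
        String idCard = path.substring(path.lastIndexOf("_") + 1, path.lastIndexOf("."));
        String name = path.substring(path.lastIndexOf("/") + 1, path.lastIndexOf("_"));
        System.out.println(name + " " + idCard); // prints: zhangsan 110101199001011234
    }
}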

The Reducer. The shuffle groups every record with the same ID-card number into a single reduce call, so writing out only the first value per key is what performs the deduplication.

public static class DistinctReducer extends Reducer<Text, PeopleBean, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<PeopleBean> values, Context context)
            throws IOException, InterruptedException {
        // all records sharing one ID-card number are grouped here;
        // write the first one and drop the rest
        for (PeopleBean people : values) {
            context.write(key, new Text("\t" + people.toString()));
            break;
        }
    }
}
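To make the grouping concrete, staying with the hypothetical record from above: suppose the same person was recorded under two different paths, so two map outputs share the key 110101199001011234:

3a2f9c1b	/data/people/zhangsan_110101199001011234.txt	110101199001011234	zhangsan
9b8e7d6c	/backup/people/zhangsan_110101199001011234.txt	110101199001011234	zhangsan

The shuffle delivers both values in one reduce call, and only the first record is written out after the key, so the duplicate disappears.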

PeopleBean is a hand-written class that carries the parsed fields between map and reduce. It implements Hadoop's Writable interface and must keep a public no-argument constructor, which the framework uses to instantiate the bean before calling readFields.

package cn.hadoop.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Writable is Hadoop's serialization interface
public class PeopleBean implements Writable {

    private String tzm;     // feature code
    private String lujing;  // file path
    private String idCard;  // ID-card number
    private String name;    // name

    // deserialize: read the fields back in exactly the order write() emits them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.tzm = in.readUTF();
        this.lujing = in.readUTF();
        this.idCard = in.readUTF();
        this.name = in.readUTF();
    }

    // serialize: the field order here defines the wire format
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tzm);
        out.writeUTF(lujing);
        out.writeUTF(idCard);
        out.writeUTF(name);
    }

    @Override
    public String toString() {
        return this.tzm + "\t" + this.lujing + "\t" + this.idCard + "\t" + this.name;
    }

    // no-argument constructor required by Hadoop for deserialization
    public PeopleBean() {}

    public PeopleBean(String tzm, String lujing, String idCard, String name) {
        this.tzm = tzm;
        this.lujing = lujing;
        this.idCard = idCard;
        this.name = name;
    }

    public String getTzm() { return tzm; }

    public void setTzm(String tzm) { this.tzm = tzm; }

    public String getLujing() { return lujing; }

    public void setLujing(String lujing) { this.lujing = lujing; }

    public String getIdCard() { return idCard; }

    public void setIdCard(String idCard) { this.idCard = idCard; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }
}
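As a quick sanity check on the Writable implementation (a sketch of my own, not from the job itself), a round trip through plain java.io streams shows that readFields recovers exactly what write produced:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PeopleBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // hypothetical values, matching the sample line used earlier
        PeopleBean original = new PeopleBean("3a2f9c1b",
                "/data/people/zhangsan_110101199001011234.txt",
                "110101199001011234", "zhangsan");

        // serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize from the same bytes into a fresh bean
        PeopleBean copy = new PeopleBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // both print the same tab-separated record
        System.out.println(original);
        System.out.println(copy);
    }
}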

Finally, the driver: the Job is created and configured in the main method.

private static final String INPUT_PATH = "/file/*";          // HDFS path of the data to clean
private static final String OUT_PATH = "/outPeopleDistinct"; // HDFS path for the cleaned output

public static void main(String[] args) {
    Configuration conf = new Configuration();
    // HDFS NameNode HA settings: one nameservice (ns1) backed by two NameNodes
    conf.set("dfs.nameservices", "ns1");
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.ns1.nn1", "master:9000"); // NameNode 1
    conf.set("dfs.namenode.rpc-address.ns1.nn2", "node1:9000");  // NameNode 2
    conf.set("dfs.client.failover.proxy.provider.ns1",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    try {
        // FileSystem fileSystem = FileSystem.get(new URI("hdfs://ns1"), conf, "bl");

        // create the job
        Job job = Job.getInstance(conf);
        // note: the class whose jar contains the main method
        job.setJarByClass(DistinctPeople.class);
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // job.setInputFormatClass(TextInputFormat.class);

        // 1.2 set the custom Mapper class and the map output key/value types
        job.setMapperClass(DistinctMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PeopleBean.class);

        // 1.3 partitioner and reducer count (one partition, so one reducer)
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 1.4 sort
        // 1.5 combine
        // job.setCombinerClass(DistinctReducer.class);

        // 2.1 the shuffle copies data from the map side to the reduce side
        // 2.2 set the Reducer class and the final output key/value types
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 2.3 set the output path (and optionally the output format class)
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        // job.setOutputFormatClass(TextOutputFormat.class);

        // submit the job and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
        e.printStackTrace();
    }
}
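Once the mapper, reducer, bean, and driver are packaged into a single jar, the job can be launched in the usual way, e.g. hadoop jar distinct.jar cn.hadoop.mr.DistinctPeople (the jar name here is hypothetical). Note that /outPeopleDistinct must not already exist: FileOutputFormat refuses to overwrite an existing output directory and fails the job.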