1. 程式人生 > >mapreduce——join演算法的程式碼實現

mapreduce——join演算法的程式碼實現

需求:有user資料檔案:user.csv

u001,senge,18,angelababy
u002,laozhao,48,ruhua
u003,xiaoxu,16,chunge
u004,laoyang,28,zengge
u005,nana,14,huangbo

有訂單資料檔案:order.dat.1  order.dat.2   order.dat.3

order001,u001
order002,u001
order003,u005
order004,u002
order005,u003
order006,u004

需要對上述兩類資料進行連線:

order001,u001,senge,18,angelababy
order002,u001,senge,18,angelababy
order003,u005,nana,14,huangbo
order004,u002,laozhao,48,ruhua
order005,u003,xiaoxu,16,chunge
order006,u004,laoyang,28,zengge

思路:

map端:

不管map task讀到的是哪個檔案,我們都可以在mapper中通過context取得輸入切片(FileSplit)的檔名,據此區分這條資料來自order檔案還是user檔案

對於order資料,map中切欄位,封裝為一個JoinBean,並打上表名標記(下面的程式碼中標記值為"order")

對於user資料,map中切欄位,封裝為一個JoinBean,並打上表名標記(下面的程式碼中標記值為"user")

然後,以uid作為key,以joinbean作為value返回

reduce端:

用迭代器迭代出一組相同uid的所有資料joinbean,然後判斷

如果標記欄位為"order",則複製一份後加入一個ArrayList<JoinBean>中

如果標記欄位為"user",則複製到一個單獨的JoinBean物件(userBean)中

然後,遍歷arraylist,對裡面的每一個JoinBean填充userBean中的user資料,然後輸出這個joinBean即可

程式碼實現

JoinBean 

/**
 * Record exchanged between the map and reduce phases of the join job.
 *
 * <p>A single instance holds either the fields of an order line or the fields
 * of a user line; {@code tableName} tells the two apart. The string fields are
 * passed straight to {@code DataOutput.writeUTF}, so callers are expected to
 * fill every one of them before the bean is serialized (the mapper in this
 * article uses the placeholder {@code "NULL"} for fields that do not apply).
 */
public class JoinBean implements Writable {
    private String orderId;
    private String userId;
    private String userName;
    private int userAge;
    private String userFriend;
    private String tableName;

    /**
     * Assigns all six fields in one call.
     *
     * @param orderId    order identifier
     * @param userId     user identifier (the join key)
     * @param userName   user name
     * @param userAge    user age
     * @param userFriend name of the user's friend
     * @param tableName  tag naming the table this record came from
     */
    public void set(String orderId, String userId, String userName, int userAge, String userFriend,String tableName) {
        this.tableName = tableName;
        this.userFriend = userFriend;
        this.userAge = userAge;
        this.userName = userName;
        this.userId = userId;
        this.orderId = orderId;
    }

    // ---- accessors -------------------------------------------------------

    /** @return the order identifier */
    public String getOrderId() {
        return orderId;
    }

    /** @return the user identifier */
    public String getUserId() {
        return userId;
    }

    /** @return the user name */
    public String getUserName() {
        return userName;
    }

    /** @return the user age */
    public int getUserAge() {
        return userAge;
    }

    /** @return the name of the user's friend */
    public String getUserFriend() {
        return userFriend;
    }

    /** @return the tag naming the source table of this record */
    public String getTableName() {
        return tableName;
    }

    // ---- mutators --------------------------------------------------------

    /** @param orderId the order identifier */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @param userId the user identifier */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @param userAge the user age */
    public void setUserAge(int userAge) {
        this.userAge = userAge;
    }

    /** @param userFriend the name of the user's friend */
    public void setUserFriend(String userFriend) {
        this.userFriend = userFriend;
    }

    /** @param tableName the tag naming the source table */
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    /**
     * Renders the joined record as one comma-separated line in the order
     * orderId, userId, userAge, userName, userFriend. The table tag is not
     * part of the output.
     */
    @Override
    public String toString() {
        StringBuilder line = new StringBuilder();
        line.append(orderId).append(",");
        line.append(userId).append(",");
        line.append(userAge).append(",");
        line.append(userName).append(",");
        line.append(userFriend);
        return line.toString();
    }

    /**
     * Serializes the bean. The field order here must stay in step with
     * {@link #readFields(DataInput)}.
     *
     * @param dataOutput sink the fields are written to
     * @throws IOException if the underlying stream fails
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(userName);
        dataOutput.writeInt(userAge);
        dataOutput.writeUTF(userFriend);
        dataOutput.writeUTF(tableName);
    }

    /**
     * Restores the bean from the stream, reading the fields in exactly the
     * order {@link #write(DataOutput)} emitted them.
     *
     * @param dataInput source the fields are read from
     * @throws IOException if the underlying stream fails
     */
    public void readFields(DataInput dataInput) throws IOException {
        orderId = dataInput.readUTF();
        userId = dataInput.readUTF();
        userName = dataInput.readUTF();
        userAge = dataInput.readInt();
        userFriend = dataInput.readUTF();
        tableName = dataInput.readUTF();
    }
}

ReduceSideJoin 

/**
 * Reduce-side (inner) join of order records with user records on the user id.
 *
 * <p>The mapper tags every input line with the table it came from and emits it
 * under its user id; the reducer then attaches the user's fields to each order
 * that shares that id and writes one line per order.
 */
public class ReduceSideJoin {

    /** Value of {@code JoinBean.tableName} for records read from order files. */
    private static final String ORDER_TABLE = "order";
    /** Value of {@code JoinBean.tableName} for records read from the user file. */
    private static final String USER_TABLE = "user";
    /** Placeholder for string fields that do not apply to a record. */
    private static final String ABSENT = "NULL";
    /** Counter group under which skipped records are reported. */
    private static final String COUNTER_GROUP = "ReduceSideJoin";
    /** Input directory used when none is given on the command line. */
    private static final String DEFAULT_INPUT_DIR = "F:\\mrdata\\join\\input";
    /** Output directory used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "F:\\mrdata\\join\\out";

    /**
     * Parses one comma-separated line into a {@link JoinBean} and emits it
     * keyed by user id. Which layout a line has is decided by the name of the
     * file the current split belongs to.
     */
    public static class ReduceSideJoinMapper extends Mapper<LongWritable,Text,Text,JoinBean>{
        String fileName =null;
        // Reused for every record; context.write is called before the next overwrite.
        JoinBean bean = new JoinBean();
        Text k=new Text();

        /**
         * Called once before the map task processes any record; remembers the
         * name of the file behind this task's input split.
         *
         * @param context task context supplying the input split
         * @throws IOException declared by the overridden method
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            fileName=inputSplit.getPath().getName();
        }

        /**
         * Emits {@code (userId, bean)} for one input line. Files whose name
         * starts with {@code "order"} are read as {@code orderId,userId};
         * every other file is read as {@code userId,name,age,friend}.
         *
         * <p>Blank lines, lines with too few fields and user lines whose age
         * is not an integer are skipped and counted under the
         * {@code MALFORMED_RECORDS} counter instead of failing the task.
         *
         * @param key     the input key (unused)
         * @param value   one line of input text
         * @param context sink for the tagged record
         * @throws IOException if writing the record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            boolean orderFile = fileName.startsWith("order");
            int requiredFields = orderFile ? 2 : 4;
            if (fields.length < requiredFields) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }

            if (orderFile){
                bean.set(fields[0],fields[1],ABSENT,-1,ABSENT,ORDER_TABLE);
            }else{
                int age;
                try {
                    age = Integer.parseInt(fields[2]);
                } catch (NumberFormatException badAge) {
                    context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                    return;
                }
                bean.set(ABSENT,fields[0],fields[1],age,fields[3],USER_TABLE);
            }
            k.set(bean.getUserId());
            context.write(k,bean);
        }
    }

    /**
     * Joins all records that share one user id: every order record is
     * completed with the user's name, age and friend and written out.
     */
    public static class ReducerSideJoinReduce extends Reducer<Text,JoinBean,JoinBean,NullWritable>{

        /**
         * Writes one joined line per order of the given user.
         *
         * <p>If the group contains no user record, nothing is written for it
         * (inner-join semantics) and the number of dropped orders is added to
         * the {@code ORDERS_WITHOUT_USER} counter. If several user records
         * share the key, the last one iterated is used. Failures from
         * {@code context.write} propagate to the framework rather than being
         * swallowed.
         *
         * @param key     the user id shared by all beans in the group
         * @param beans   order and user records for that user id
         * @param context sink for the joined records
         * @throws IOException if writing a joined record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<JoinBean> beans, Context context) throws IOException, InterruptedException {
            ArrayList<JoinBean> orderList = new ArrayList<JoinBean>();
            JoinBean userBean=null;

            // Separate the two kinds of records. Each bean is copied because
            // the Reducer documentation states that the framework reuses the
            // value object between iterations.
            for (JoinBean bean:beans){
                if(ORDER_TABLE.equals(bean.getTableName())){
                    orderList.add(copyOf(bean));
                }else{
                    userBean=copyOf(bean);
                }
            }

            if (userBean == null) {
                // Orders that reference an unknown user cannot be joined.
                context.getCounter(COUNTER_GROUP, "ORDERS_WITHOUT_USER").increment(orderList.size());
                return;
            }

            // Fill the user's fields into every order and emit it.
            for(JoinBean order:orderList){
                order.setUserAge(userBean.getUserAge());
                order.setUserFriend(userBean.getUserFriend());
                order.setUserName(userBean.getUserName());
                context.write(order,NullWritable.get());
            }
        }

        /** Returns a new bean carrying the same six field values as {@code source}. */
        private static JoinBean copyOf(JoinBean source) {
            JoinBean copy = new JoinBean();
            copy.set(source.getOrderId(), source.getUserId(), source.getUserName(),
                    source.getUserAge(), source.getUserFriend(), source.getTableName());
            return copy;
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args optional: {@code args[0]} is the input directory and
     *             {@code args[1]} the output directory; each falls back to the
     *             built-in default path when absent
     * @throws Exception if the job cannot be set up or fails while running
     */
    public static void main(String[] args)throws Exception {
        String inputDir = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_DIR;
        String outputDir = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_DIR;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Let the framework locate the jar from this class.
        job.setJarByClass(ReduceSideJoin.class);
        // Mapper and reducer implementations used by this job.
        job.setMapperClass(ReduceSideJoinMapper.class);
        job.setReducerClass(ReducerSideJoinReduce.class);
        // Key/value types produced by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(JoinBean.class);
        // Key/value types produced by the reducer.
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path(inputDir));
        FileOutputFormat.setOutputPath(job,new Path(outputDir));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0:-1);
    }
}

測試資料

order.txt.1

order001,u001
order002,u001
order003,u005
order004,u002
order005,u003
order006,u004

order.txt.2

order001,u001
order002,u001
order003,u005
order004,u002
order005,u003
order006,u004

user.txt

u001,senge,18,angelababy
u002,laozhao,48,ruhua
u003,xiaoxu,16,chunge
u004,laoyang,28,zengge
u005,nana,14,huangbo

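結果輸出(order.txt.1與order.txt.2的內容相同,所以每筆訂單會輸出兩次;欄位順序由JoinBean.toString()決定,為orderId,userId,userAge,userName,userFriend,與需求範例中name、age的先後順序不同)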

order002,u001,18,senge,angelababy
order001,u001,18,senge,angelababy
order002,u001,18,senge,angelababy
order001,u001,18,senge,angelababy
order004,u002,48,laozhao,ruhua
order004,u002,48,laozhao,ruhua
order005,u003,16,xiaoxu,chunge
order005,u003,16,xiaoxu,chunge
order006,u004,28,laoyang,zengge
order006,u004,28,laoyang,zengge
order003,u005,14,nana,huangbo
order003,u005,14,nana,huangbo