程式人生 > MapReduce實現矩陣轉置和矩陣相乘(Hadoop 2.7)

MapReduce實現矩陣轉置和矩陣相乘(Hadoop2.7)

矩陣轉置的輸入格式為每行 `row<Tab>col_val,col_val,...`，輸出格式為每行 `col<Tab>row_val,row_val,...`（即轉置後矩陣的一行）。其餘輸入輸出格式與註釋待補充……

矩陣轉置

package com.cy.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * MapReduce job that transposes a sparse matrix stored as text.
 *
 * <p>Input: one matrix row per line, formatted as {@code row<TAB>col_val,col_val,...}
 * (for example {@code 1<TAB>1_3,2_5}). Output: one line per column of the input, formatted as
 * {@code col<TAB>row_val,row_val,...}, i.e. one row of the transposed matrix. The order of the
 * {@code row_val} entries within an output line is not guaranteed, because MapReduce does not
 * sort the values handed to a reducer.
 *
 * <p>Usage: {@code MatrixTrans <in> <out>}. The output directory is deleted before the job runs.
 */
public class MatrixTrans {

    /** Counter group used to report input that could not be parsed. */
    private static final String COUNTER_GROUP = "MatrixTrans";

    /**
     * Emits one {@code (col, row_val)} pair per matrix entry so that all entries of the same
     * input column meet at the same reducer.
     */
    public static class MTMapper extends Mapper<Object, Text, Text, Text> {

        private final Text outKey = new Text();
        private final Text outVal = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Skip blank lines (e.g. a trailing newline at the end of the input file) instead of
            // failing the whole task with ArrayIndexOutOfBoundsException.
            if (line.trim().isEmpty()) {
                return;
            }

            String[] fields = line.split("\t");
            if (fields.length < 2) {
                // No "<TAB>col_val,..." part: count it so bad input is visible in the job report.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }

            String row = fields[0].trim();
            for (String colVal : fields[1].split(",")) {
                // Each entry is "col_val"; split on the first '_' only.
                int sep = colVal.indexOf('_');
                if (sep < 0) {
                    context.getCounter(COUNTER_GROUP, "MALFORMED_ENTRIES").increment(1);
                    continue;
                }
                String col = colVal.substring(0, sep).trim();
                String val = colVal.substring(sep + 1).trim();

                // Swap roles: the column index becomes the key, the row index travels with the value.
                outKey.set(col);
                outVal.set(row + "_" + val);
                context.write(outKey, outVal);
            }
        }
    }

    /**
     * Joins all {@code row_val} entries of one input column into a single comma-separated
     * line, producing one row of the transposed matrix.
     */
    public static class MTReducer extends Reducer<Text, Text, Text, Text> {

        private final Text outVal = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text value : values) {
                if (joined.length() > 0) {
                    joined.append(',');
                }
                joined.append(value.toString());
            }
            outVal.set(joined.toString());
            // The incoming key (the input column index) is the row index of the transposed matrix.
            context.write(key, outVal);
        }
    }

    /**
     * Configures and submits the transpose job.
     *
     * @param args {@code <in> <out>}, optionally preceded by generic Hadoop options
     * @throws Exception if the job cannot be configured, submitted or awaited
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: matrix transport <in> <out> ");
            System.exit(2);
        }

        Path inPath = new Path(otherArgs[0]);
        Path outPath = new Path(otherArgs[1]);

        // Resolve the file system from the output path itself so the delete targets the right
        // file system in both pseudo-distributed and cluster mode (FileSystem.get(conf) only
        // returns the default file system). The output directory is removed recursively because
        // FileOutputFormat refuses to write into an existing directory. WARNING: this discards
        // whatever is already at <out>.
        FileSystem fs = outPath.getFileSystem(conf);
        fs.delete(outPath, true);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "matrix transport");
        job.setJarByClass(MatrixTrans.class);
        job.setMapperClass(MTMapper.class);
        job.setReducerClass(MTReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

矩陣相乘