MapReduce實現矩陣轉置和矩陣相乘(Hadoop2.7)
阿新 • • 發佈:2018-12-10
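輸入格式:每行為 `row<TAB>col_val,col_val,...`(例如 `1<TAB>1_1,2_2,3_-2`);轉置後的輸出格式相同,為 `col<TAB>row_val,row_val,...`。更詳細的說明與註釋待補充。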
矩陣轉置
package com.cy.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; public class MatrixTrans { public static class MTMapper extends Mapper<Object, Text, Text, Text> { private Text outKey = new Text(); private Text outVal = new Text(); @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String row = value.toString().split("\t")[0]; String[] col_val_array = value.toString().split("\t")[1].split(","); for (String col_val : col_val_array) { String col = col_val.split("_")[0]; String val = col_val.split("_")[1]; outKey.set(col); outVal.set(row + "_" + val); context.write(outKey, outVal); } } } public static class MTReducer extends Reducer<Text, Text, Text, Text> { private Text outKey = new Text(); private Text outVal = new Text(); @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder res = new StringBuilder(); for (Text value : values) { res.append(value + ","); } String result = null; if (res.toString().endsWith(",")) { result = res.toString().substring(0, res.length() - 1); } outKey.set(key); outVal.set(result); context.write(outKey, outVal); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: matrix transport <in> <out> "); System.exit(2); } Path outPath = new Path(otherArgs[1]); FileSystem fs = 
outPath.getFileSystem(conf); // 偽分散式需使用此句 //FileSystem fs = FileSystem.get(conf); // 提交叢集可使用此句 fs.delete(outPath,true); Job job = new Job(conf, "matrix transport"); job.setJarByClass(MatrixTrans.class); job.setMapperClass(MTMapper.class); job.setReducerClass(MTReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
矩陣相乘