
Matrix Multiplication with MapReduce
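The walkthrough below starts from the driver program, MainRun.java. As a quick reminder of the scheme the code implements, every cell of the product is

C(i,j) = Σ_k A(i,k) · B(k,j)

so the mapper that reads A emits each A(i,k) under every result key (i,j) of row i, the mapper that reads B emits each B(k,j) under every result key (i,j) of column j, and the reducer for key (i,j) pairs up the entries that share the same k, multiplies them, and sums.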

package org.conan.myhadoop.matrix;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class MainRun {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        martrixMultiply();
    }

    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv");// local data files
        path.put("m2", "logfile/matrix/m2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS directory
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            MartrixMultiply.run(path);// start the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// remote configuration of the Hadoop cluster
        JobConf conf = new JobConf(MainRun.class);
        conf.setJobName("MartrixMultiply");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
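config() assumes core-site.xml, hdfs-site.xml and mapred-site.xml are available on the classpath under /hadoop/. If they are not, a hypothetical variant can set the two key Hadoop 1.x properties directly; the JobTracker address below is an assumption, since only the NameNode address appears in the post:

public static JobConf config() {
    JobConf conf = new JobConf(MainRun.class);
    conf.setJobName("MartrixMultiply");
    // assumed values: fs.default.name reuses the HDFS constant above,
    // the mapred.job.tracker host:port is a guess for this cluster
    conf.set("fs.default.name", HDFS);
    conf.set("mapred.job.tracker", "192.168.1.210:9001");
    return conf;
}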

3). Create the MR program: MartrixMultiply.java

The MapReduce program:

package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class MartrixMultiply {

    public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// file name of the current split: m1 or m2
        private int rowNum = 2;// number of rows of matrix A (hard-coded for this example)
        private int colNum = 2;// number of columns of matrix B
        private int rowIndexA = 1;// current row of matrix A
        private int rowIndexB = 1;// current row of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// decide which dataset this mapper is reading
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 1; j <= tokens.length; j++) {
                        Text v = new Text("A:" + j + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + " " + v.toString());
                    }
                }
                rowIndexA++;
            } else if (flag.equals("m2")) {
                for (int i = 1; i <= tokens.length; i++) {
                    for (int j = 1; j <= colNum; j++) {
                        Text k = new Text(i + "," + j);
                        Text v = new Text("B:" + rowIndexB + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + " " + v.toString());
                    }
                }
                rowIndexB++;
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // collect the A and B entries that belong to the result cell "key"
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();
            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                    // System.out.println("A:" + kv[0] + "," + kv[1]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            // dot product of the collected row of A and column of B
            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();
            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load the two input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }
}
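run() relies on HdfsDAO, the author's HDFS helper from org.conan.myhadoop.hdfs, which is not reproduced in this excerpt. If you do not have that class, a minimal stand-in built on the standard FileSystem API could look like the sketch below, under the assumption that rmr/mkdirs/copyFile simply wrap delete, mkdirs and copyFromLocalFile (the printed messages mirror the "Delete/Create/copy from" lines in the run log):

package org.conan.myhadoop.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal stand-in for the HdfsDAO helper used by MartrixMultiply.run().
public class HdfsDAO {

    private final String hdfsPath;      // e.g. hdfs://192.168.1.210:9000
    private final Configuration conf;

    public HdfsDAO(String hdfsPath, Configuration conf) {
        this.hdfsPath = hdfsPath;
        this.conf = conf;
    }

    public void rmr(String folder) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.delete(new Path(folder), true);          // recursive delete
        System.out.println("Delete: " + folder);
        fs.close();
    }

    public void mkdirs(String folder) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.mkdirs(new Path(folder));
        System.out.println("Create: " + folder);
        fs.close();
    }

    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }
}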

Run log

  1. Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix

  2. Create: hdfs://192.168.1.210:9000/user/hdfs/matrix

  3. copy from: logfile/matrix/m1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1

  4. copy from: logfile/matrix/m2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2

  5. 2014-1-15 10:48:03 org.apache.hadoop.util.NativeCodeLoader <clinit>

  6. 警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

  7. 2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles

  8. 警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

  9. 2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles

  10. 警告: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

  11. 2014-1-15 10:48:03 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus

  12. 資訊: Total input paths to process : 2

  13. 2014-1-15 10:48:03 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>

  14. 警告: Snappy native library not loaded

  15. 2014-1-15 10:48:04 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  16. 資訊: Running job: job_local_0001

  17. 2014-1-15 10:48:04 org.apache.hadoop.mapred.Task initialize

  18. 資訊: Using ResourceCalculatorPlugin : null

  19. 2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  20. 資訊: io.sort.mb = 100

  21. 2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  22. 資訊: data buffer = 79691776/99614720

  23. 2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  24. 資訊: record buffer = 262144/327680

  25. 1,1 A:1,1

  26. 1,1 A:2,0

  27. 1,1 A:3,2

  28. 1,2 A:1,1

  29. 1,2 A:2,0

  30. 1,2 A:3,2

  31. 2,1 A:1,-1

  32. 2,1 A:2,3

  33. 2,1 A:3,1

  34. 2,2 A:1,-1

  35. 2,2 A:2,3

  36. 2,2 A:3,1

  37. 2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush

  38. 資訊: Starting flush of map output

  39. 2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill

  40. 資訊: Finished spill 0

  41. 2014-1-15 10:48:04 org.apache.hadoop.mapred.Task done

  42. 資訊: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

  43. 2014-1-15 10:48:05 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  44. 資訊: map 0% reduce 0%

  45. 2014-1-15 10:48:07 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  46. 資訊:

  47. 2014-1-15 10:48:07 org.apache.hadoop.mapred.Task sendDone

  48. 資訊: Task 'attempt_local_0001_m_000000_0' done.

  49. 2014-1-15 10:48:07 org.apache.hadoop.mapred.Task initialize

  50. 資訊: Using ResourceCalculatorPlugin : null

  51. 2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  52. 資訊: io.sort.mb = 100

  53. 2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  54. 資訊: data buffer = 79691776/99614720

  55. 2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  56. 資訊: record buffer = 262144/327680

  57. 1,1 B:1,3

  58. 2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush

  59. 資訊: Starting flush of map output

  60. 1,2 B:1,1

  61. 2,1 B:1,3

  62. 2,2 B:1,1

  63. 1,1 B:2,2

  64. 1,2 B:2,1

  65. 2,1 B:2,2

  66. 2,2 B:2,1

  67. 1,1 B:3,1

  68. 1,2 B:3,0

  69. 2,1 B:3,1

  70. 2,2 B:3,0

  71. 2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill

  72. 資訊: Finished spill 0

  73. 2014-1-15 10:48:07 org.apache.hadoop.mapred.Task done

  74. 資訊: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

  75. 2014-1-15 10:48:08 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  76. 資訊: map 100% reduce 0%

  77. 2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  78. 資訊:

  79. 2014-1-15 10:48:10 org.apache.hadoop.mapred.Task sendDone

  80. 資訊: Task 'attempt_local_0001_m_000001_0' done.

  81. 2014-1-15 10:48:10 org.apache.hadoop.mapred.Task initialize

  82. 資訊: Using ResourceCalculatorPlugin : null

  83. 2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  84. 資訊:

  85. 2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge

  86. 資訊: Merging 2 sorted segments

  87. 2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge

  88. 資訊: Down to the last merge-pass, with 2 segments left of total size: 294 bytes

  89. 2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  90. 資訊:

  91. 1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2)

  92. 1,2:(A:1,1)(A:2,0)(A:3,2)(B:1,1)(B:2,1)(B:3,0)

  93. 2,1:(B:1,3)(B:2,2)(B:3,1)(A:1,-1)(A:2,3)(A:3,1)

  94. 2,2:(A:1,-1)(A:2,3)(A:3,1)(B:1,1)(B:2,1)(B:3,0)

  95. 2014-1-15 10:48:10 org.apache.hadoop.mapred.Task done

  96. 資訊: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

  97. 2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  98. 資訊:

  99. 2014-1-15 10:48:10 org.apache.hadoop.mapred.Task commit

  100. 資訊: Task attempt_local_0001_r_000000_0 is allowed to commit now

  101. 2014-1-15 10:48:10 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask

  102. 資訊: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output

  103. 2014-1-15 10:48:13 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  104. 資訊: reduce > reduce

  105. 2014-1-15 10:48:13 org.apache.hadoop.mapred.Task sendDone

  106. 資訊: Task 'attempt_local_0001_r_000000_0' done.

  107. 2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  108. 資訊: map 100% reduce 100%

  109. 2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  110. 資訊: Job complete: job_local_0001

  111. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  112. 資訊: Counters: 19

  113. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  114. 資訊: File Output Format Counters

  115. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  116. 資訊: Bytes Written=24

  117. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  118. 資訊: FileSystemCounters

  119. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  120. 資訊: FILE_BYTES_READ=1713

  121. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  122. 資訊: HDFS_BYTES_READ=75

  123. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  124. 資訊: FILE_BYTES_WRITTEN=125314

  125. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  126. 資訊: HDFS_BYTES_WRITTEN=114

  127. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  128. 資訊: File Input Format Counters

  129. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  130. 資訊: Bytes Read=30

  131. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  132. 資訊: Map-Reduce Framework

  133. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  134. 資訊: Map output materialized bytes=302

  135. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  136. 資訊: Map input records=5

  137. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  138. 資訊: Reduce shuffle bytes=0

  139. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  140. 資訊: Spilled Records=48

  141. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  142. 資訊: Map output bytes=242

  143. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  144. 資訊: Total committed heap usage (bytes)=764215296

  145. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  146. 資訊: SPLIT_RAW_BYTES=220

  147. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  148. 資訊: Combine input records=0

  149. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  150. 資訊: Reduce input records=24

  151. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  152. 資訊: Reduce input groups=4

  153. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  154. 資訊: Combine output records=0

  155. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  156. 資訊: Reduce output records=4

  157. 2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log

  158. 資訊: Map output records=24
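The log shows the grouped reducer input (the lines such as 1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2) above) but not the final output file. Working the four dot products out by hand from those groups, the part file under /user/hdfs/matrix/output should contain values along these lines:

1,1: 1·3 + 0·2 + 2·1 = 5
1,2: 1·1 + 0·1 + 2·0 = 1
2,1: (-1)·3 + 3·2 + 1·1 = 4
2,2: (-1)·1 + 3·1 + 1·0 = 2

i.e. the expected product for this small example is [[5, 1], [4, 2]].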

4. Sparse matrix multiplication with MapReduce

When we apply matrices to real data, they are usually very sparse, so to save storage space we normally keep only the non-zero entries.

Let's build a sparse-matrix example:

(figure: spraseMatrix)

  • Matrix multiplication in R
  • Create the two sparse-matrix data files sm1.csv and sm2.csv
  • Modify the driver program: MainRun.java
  • Create the MR program: SparseMartrixMultiply.java

1). Matrix multiplication in R

R program

> m1<-matrix(c(1,0,0,3,2,5,0,4,0,0,0,1,4,7,1,2),nrow=4,byrow=TRUE);m1
     [,1] [,2] [,3] [,4]
[1,]    1    0    0    3
[2,]    2    5    0    4
[3,]    0    0    0    1
[4,]    4    7    1    2
> m2<-matrix(c(5,0,0,2,0,0,3,1),nrow=4,byrow=TRUE);m2
     [,1] [,2]
[1,]    5    0
[2,]    0    2
[3,]    0    0
[4,]    3    1
> m3<-m1 %*% m2;m3
     [,1] [,2]
[1,]   14    3
[2,]   22   14
[3,]    3    1
[4,]   26   16

2). Create the two sparse-matrix data files sm1.csv and sm2.csv

Only the non-zero entries are stored, using three columns: the first column is the row index in the original matrix, the second the column index, and the third the value. (A small conversion sketch follows the two listings below.)

sm1.csv

1,1,1
1,4,3
2,1,2
2,2,5
2,4,4
3,4,1
4,1,4
4,2,7
4,3,1
4,4,2

sm2.csv

1,1,5
2,2,2
4,1,3
4,2,1
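To tie this triple format back to the dense matrices from the R session, here is a minimal sketch (assuming the dense m1 from above) that prints only the non-zero entries as row,column,value triples; fed with m1 it reproduces the sm1.csv lines:

// Prints the non-zero entries of a dense matrix as "row,col,value" triples.
public class DenseToTriples {
    public static void main(String[] args) {
        int[][] m1 = { {1, 0, 0, 3}, {2, 5, 0, 4}, {0, 0, 0, 1}, {4, 7, 1, 2} };
        for (int i = 0; i < m1.length; i++) {
            for (int j = 0; j < m1[i].length; j++) {
                if (m1[i][j] != 0) {
                    // 1-based indices, matching sm1.csv
                    System.out.println((i + 1) + "," + (j + 1) + "," + m1[i][j]);
                }
            }
        }
    }
}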

3). Modify the driver program: MainRun.java

Add a launch method for SparseMartrixMultiply:

public static void main(String[] args) {
    sparseMartrixMultiply();
}

public static void sparseMartrixMultiply() {
    Map<String, String> path = new HashMap<String, String>();
    path.put("m1", "logfile/matrix/sm1.csv");// local data files
    path.put("m2", "logfile/matrix/sm2.csv");
    path.put("input", HDFS + "/user/hdfs/matrix");// HDFS directory
    path.put("input1", HDFS + "/user/hdfs/matrix/m1");
    path.put("input2", HDFS + "/user/hdfs/matrix/m2");
    path.put("output", HDFS + "/user/hdfs/matrix/output");

    try {
        SparseMartrixMultiply.run(path);// start the job
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.exit(0);
}

4). Create the MR program: SparseMartrixMultiply.java

(figure: spareseMatrix2)

  • The map function changes; the reduce function is essentially unchanged (it only adds a default of 0 for entries missing from B)
  • The variables that tracked the current row of A and B are removed
package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class SparseMartrixMultiply {

    public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// file name of the current split: m1 or m2
        private int rowNum = 4;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// decide which dataset this mapper is reading
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];
                // A(row, col) is needed by every result cell in row "row"
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(row + "," + i);
                    Text v = new Text("A:" + col + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            } else if (flag.equals("m2")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];
                // B(row, col) is needed by every result cell in column "col"
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(i + "," + col);
                    Text v = new Text("B:" + row + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            }
        }
    }

    public static class SparseMatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();
            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                    // System.out.println("A:" + kv[0] + "," + kv[1]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                // entries missing from B are zero in a sparse matrix
                String bVal = mapB.containsKey(mapk) ? mapB.get(mapk) : "0";
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(bVal);
            }
            context.write(key, new IntWritable(result));
            System.out.println();
            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SparseMatrixMapper.class);
        job.setReducerClass(SparseMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load the two input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }
}
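As a sanity check, the sparse job's output should match the R result m3 shown above. A minimal local sketch, assuming the same sm1.csv/sm2.csv triples, expands them into dense arrays and multiplies them with a plain triple loop:

// Plain local check of the expected result; the triples are copied from sm1.csv and sm2.csv.
public class SparseCheck {
    public static void main(String[] args) {
        int[][] t1 = { {1,1,1},{1,4,3},{2,1,2},{2,2,5},{2,4,4},{3,4,1},{4,1,4},{4,2,7},{4,3,1},{4,4,2} };
        int[][] t2 = { {1,1,5},{2,2,2},{4,1,3},{4,2,1} };
        int[][] a = new int[4][4];
        int[][] b = new int[4][2];
        for (int[] t : t1) a[t[0] - 1][t[1] - 1] = t[2];   // expand A triples
        for (int[] t : t2) b[t[0] - 1][t[1] - 1] = t[2];   // expand B triples
        for (int i = 0; i < 4; i++) {
            for (int j = 0; j < 2; j++) {
                int c = 0;
                for (int k = 0; k < 4; k++) c += a[i][k] * b[k][j];
                // prints the same "row,col <tab> value" pairs the reducer writes;
                // expected to match R's m3: 14 3 / 22 14 / 3 1 / 26 16
                System.out.println((i + 1) + "," + (j + 1) + "\t" + c);
            }
        }
    }
}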

Run output:

  1. Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix

  2. Create: hdfs://192.168.1.210:9000/user/hdfs/matrix

  3. copy from: logfile/matrix/sm1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1

  4. copy from: logfile/matrix/sm2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2

  5. 2014-1-15 11:57:31 org.apache.hadoop.util.NativeCodeLoader <clinit>

  6. 警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

  7. 2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles

  8. 警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

  9. 2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles

  10. 警告: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

  11. 2014-1-15 11:57:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus

  12. 資訊: Total input paths to process : 2

  13. 2014-1-15 11:57:31 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>

  14. 警告: Snappy native library not loaded

  15. 2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  16. 資訊: Running job: job_local_0001

  17. 2014-1-15 11:57:31 org.apache.hadoop.mapred.Task initialize

  18. 資訊: Using ResourceCalculatorPlugin : null

  19. 2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  20. 資訊: io.sort.mb = 100

  21. 2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  22. 資訊: data buffer = 79691776/99614720

  23. 2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  24. 資訊: record buffer = 262144/327680

  25. 1,1 A:1,1

  26. 1,2 A:1,1

  27. 1,1 A:4,3

  28. 1,2 A:4,3

  29. 2,1 A:1,2

  30. 2,2 A:1,2

  31. 2,1 A:2,5

  32. 2,2 A:2,5

  33. 2,1 A:4,4

  34. 2,2 A:4,4

  35. 3,1 A:4,1

  36. 3,2 A:4,1

  37. 4,1 A:1,4

  38. 4,2 A:1,4

  39. 4,1 A:2,7

  40. 4,2 A:2,7

  41. 4,1 A:3,1

  42. 4,2 A:3,1

  43. 4,1 A:4,2

  44. 4,2 A:4,2

  45. 2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush

  46. 資訊: Starting flush of map output

  47. 2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill

  48. 資訊: Finished spill 0

  49. 2014-1-15 11:57:31 org.apache.hadoop.mapred.Task done

  50. 資訊: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

  51. 2014-1-15 11:57:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  52. 資訊: map 0% reduce 0%

  53. 2014-1-15 11:57:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  54. 資訊:

  55. 2014-1-15 11:57:34 org.apache.hadoop.mapred.Task sendDone

  56. 資訊: Task 'attempt_local_0001_m_000000_0' done.

  57. 2014-1-15 11:57:34 org.apache.hadoop.mapred.Task initialize

  58. 資訊: Using ResourceCalculatorPlugin : null

  59. 2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  60. 資訊: io.sort.mb = 100

  61. 2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  62. 資訊: data buffer = 79691776/99614720

  63. 2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>

  64. 資訊: record buffer = 262144/327680

  65. 1,1 B:1,5

  66. 2,1 B:1,5

  67. 3,1 B:1,5

  68. 4,1 B:1,5

  69. 2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush

  70. 資訊: Starting flush of map output

  71. 1,2 B:2,2

  72. 2,2 B:2,2

  73. 3,2 B:2,2

  74. 4,2 B:2,2

  75. 1,1 B:4,3

  76. 2,1 B:4,3

  77. 3,1 B:4,3

  78. 4,1 B:4,3

  79. 1,2 B:4,1

  80. 2,2 B:4,1

  81. 3,2 B:4,1

  82. 4,2 B:4,1

  83. 2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill

  84. 資訊: Finished spill 0

  85. 2014-1-15 11:57:34 org.apache.hadoop.mapred.Task done

  86. 資訊: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

  87. 2014-1-15 11:57:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  88. 資訊: map 100% reduce 0%

  89. 2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  90. 資訊:

  91. 2014-1-15 11:57:37 org.apache.hadoop.mapred.Task sendDone

  92. 資訊: Task 'attempt_local_0001_m_000001_0' done.

  93. 2014-1-15 11:57:37 org.apache.hadoop.mapred.Task initialize

  94. 資訊: Using ResourceCalculatorPlugin : null

  95. 2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  96. 資訊:

  97. 2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge

  98. 資訊: Merging 2 sorted segments

  99. 2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge

  100. 資訊: Down to the last merge-pass, with 2 segments left of total size: 436 bytes

  101. 2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  102. 資訊:

  103. 1,1:(B:1,5)(B:4,3)(A:1,1)(A:4,3)

  104. 1,2:(A:1,1)(A:4,3)(B:2,2)(B:4,1)

  105. 2,1:(B:1,5)(B:4,3)(A:1,2)(A:2,5)(A:4,4)

  106. 2,2:(A:1,2)(A:2,5)(A:4,4)(B:4,1)(B:2,2)

  107. 3,1:(B:1,5)(B:4,3)(A:4,1)

  108. 3,2:(A:4,1)(B:2,2)(B:4,1)

  109. 4,1:(B:4,3)(B:1,5)(A:1,4)(A:2,7)(A:3,1)(A:4,2)

  110. 4,2:(A:1,4)(A:2,7)(A:3,1)(A:4,2)(B:2,2)(B:4,1)

  111. 2014-1-15 11:57:37 org.apache.hadoop.mapred.Task done

  112. 資訊: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

  113. 2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  114. 資訊:

  115. 2014-1-15 11:57:37 org.apache.hadoop.mapred.Task commit

  116. 資訊: Task attempt_local_0001_r_000000_0 is allowed to commit now

  117. 2014-1-15 11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask

  118. 資訊: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output

  119. 2014-1-15 11:57:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate

  120. 資訊: reduce > reduce

  121. 2014-1-15 11:57:40 org.apache.hadoop.mapred.Task sendDone

  122. 資訊: Task 'attempt_local_0001_r_000000_0' done.

  123. 2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  124. 資訊: map 100% reduce 100%

  125. 2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob

  126. 資訊: Job complete: job_local_0001

  127. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  128. 資訊: Counters: 19

  129. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  130. 資訊: File Output Format Counters

  131. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  132. 資訊: Bytes Written=53

  133. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  134. 資訊: FileSystemCounters

  135. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  136. 資訊: FILE_BYTES_READ=2503

  137. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  138. 資訊: HDFS_BYTES_READ=266

  139. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  140. 資訊: FILE_BYTES_WRITTEN=126274

  141. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  142. 資訊: HDFS_BYTES_WRITTEN=347

  143. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  144. 資訊: File Input Format Counters

  145. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  146. 資訊: Bytes Read=98

  147. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  148. 資訊: Map-Reduce Framework

  149. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  150. 資訊: Map output materialized bytes=444

  151. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  152. 資訊: Map input records=14

  153. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  154. 資訊: Reduce shuffle bytes=0

  155. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  156. 資訊: Spilled Records=72

  157. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  158. 資訊: Map output bytes=360

  159. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  160. 資訊: Total committed heap usage (bytes)=764215296

  161. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  162. 資訊: SPLIT_RAW_BYTES=220

  163. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  164. 資訊: Combine input records=0

  165. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  166. 資訊: Reduce input records=36

  167. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  168. 資訊: Reduce input groups=8

  169. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  170. 資訊: Combine output records=0

  171. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  172. 資訊: Reduce output records=8

  173. 2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log

  174. 資訊: Map output records=36

With this, we have implemented matrix multiplication as a MapReduce program. With this matrix-computation building block in place, we can go on to do much more!