2018-08-05 期 MapReduce實現每個單詞在每個文件中坐標信息統計
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* 實現功能: MapReduce實現每個單詞在每個文件中坐標信息統計
* 輸入文件:a.txt b.txt c.txt d.txt e.txt
* 輸出結果:
* 利用MapReduce實現每個單詞在每個文件中出現行列坐標信息,比如hello在a.txt、b.txt、c.txt中的坐標位置
* word (file1<row,col> file2<row,col>... filen<row,col>)
* hello (a.txt<1,1><2,3><4,8>) (b.txt<2,1><3,5>) (c.txt<2,3><4,9>)
* 輸出描述:
* (a.txt<1,1><2,3><4,8>)表示hello在a.txt文件中第1行第一列、第2行第3列、第4行第8列出現過
* (b.txt<2,1><3,5>) 表示hello在b.txt文件中第2行第1列、第3行第5列出現過
** 實現思路:
* 1、Mapper階段
* 讀入a.txt第1行數據:
* a.txt -->Hello word
* 計算坐標信息
* Hello:a.txt--><1,1>
* word :a.txt--><1,7>
*
* 讀入c.txt第1行數據:
* c.txt -->Hello Java
* 計算坐標信息
* Hello:c.txt--><1,1>
* Java :c.txt--><1,7>
*
* 讀入a.txt第3行數據
* a.txt-->Hello Python Java
* 計算坐標信息
* Hello:a.txt--><3,1>
* Python:a.txt--><3,7>
* Java:a.txt--><3,7>
*
* 通過分析發現如下規律:
* Hello:a.txt--><1,1>
* Hello:c.txt--><1,1>
* Hello:a.txt--><3,1>
*
* Java :c.txt--><1,7>
* Java :a.txt--><3,7>
*
* word :a.txt--><1,7>
*
* Python:a.txt--><3,7>
*
* 通過Mapper處理後,輸出的<k2,v2>格式如下
* <k2> <v2>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* Java :c.txt <1,7>
* Java :a.txt <3,7>
* Python:a.txt <3,4>
* word :a.txt <1,7>
*
* 2、Combiner階段,對Mapper階段輸出數據進行部分合並處理
* Mapper階段輸出數據格式為
* <k2> <v2>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* Combiner處理邏輯
* 對輸入的key進行切分操作
* 比如輸入:Hello:a.txt <1,1>
* 通過Hello:a.txt後可以得出Combiner階段的輸出數據格式
* <k3`> <v3`>
* Hello a.txt--><1,1><3,1>
* Hello c.txt--><1,1>
*
* 3、Reducer階段,Reducer階段主要處理來自Combiner階段輸出,Combiner階段輸出數據格式如下
* <k3> <v3>
* Hello a.txt--><1,1><3,1>
* Hello c.txt--><1,1>
* 通過Reducer處理後,最終輸出數據格式為
*
* <k4> <v4>
* Hello (a.txt<1,1><3,1>),(c.txt<1,1>)
*
* 這樣就實現了每個單詞在每個文件中坐標信息統計。
*
*
* @author songjq
*
*/
public class InvertedIndexCaseTwo {
/**
* Mapper階段
* k1:讀入數據便宜量 LongWritable
* v1:讀入一行數據 Text
* k2:輸出到Combiner階段key Text
* v2:輸出到Combiner階段value Text
* Mapper階段主要實現對輸入數據的分詞計算單詞行列坐標處理,處理後輸出數據格式為:
* <k1> <v1>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* @author songjq
*
*/
static class InvertedIndexCaseTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
//定義單詞所在文件行號,rownum對輸入的當前文件有效,如果重新輸入另外一個文件,則會自動清零
private long rownum = 0;
private Text tkey = new Text();
private Text tvalue = new Text();
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
rownum++;
//讀入一行數據
String line = v1.toString();
//獲取讀入文件名稱
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fileName = inputSplit.getPath().getName();
//分詞
String[] words = line.split(" ");
//遍歷每一行單詞,並獲取該單詞行號和列號,將其傳輸到Combiner
for(String word:words) {
int word_col = line.indexOf(word)+1;
String outKey = word+":"+fileName;
String outValue = "<"+rownum+","+word_col+">";
tkey.set(outKey);
tvalue.set(outValue);
context.write(tkey, tvalue);
}
}
}
/**
* Combiner是一種特殊的Reducer,因此同樣需要繼承Reducer類,使用Combiner不能改變原有業務邏輯及傳輸數據類型,慎用。
* 這裏使用Combiner來對Mapper階段輸入的數據進行部分合並處理
* Mapper輸出的數據格式
* <k2> <v2>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* 通過Combiner處理後,輸出數據格式
* <k3> <v3>
* Hello a.txt<1,1><3,1>
* Hello c.txt<1,1>
* @author songjq
*
*/
static class InvertedIndexCaseTwoCombiner extends Reducer<Text, Text, Text, Text> {
private Text tkey = new Text();
private Text tvalue = new Text();
@Override
protected void reduce(Text k3_, Iterable<Text> v3_, Context ctx) throws IOException, InterruptedException {
//定義存放Combiner輸出的數據
StringBuffer v3str_ = new StringBuffer();
Iterator<Text> iterator = v3_.iterator();
//輸出tvalue處理
while(iterator.hasNext()) {
Text row_col = iterator.next();
v3str_.append(row_col.toString());
}
String key3 = k3_.toString();
String[] split = key3.split(":");
String fileName = split[1];
tvalue.set(fileName+v3str_.toString());
//tkey處理
String word = split[0];
tkey.set(word);
//通過ctx將tkey tvalue傳輸到Reducer端
ctx.write(tkey, tvalue);
}
}
/**
* Reducer端
* Reducer端主要對Combiner端輸出數據進行處理
* Combiner端輸出數據格式為:
* <k3> <v3>
* Hello a.txt--><1,1><3,1>
* Hello c.txt--><1,1>
* 通過Reducer處理後,輸出數據格式為
* <k4> <v4>
* hello a.txt--><1,1><3,1>,c.txt--><1,1>
* @author songjq
*
*/
static class InvertedIndexCaseTwoReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text k3, Iterable<Text> v3, Context ctx) throws IOException, InterruptedException {
StringBuffer result = new StringBuffer();
for(Text value:v3) {
result.append(value.toString()).append(" ");
}
ctx.write(new Text(formatStr(k3.toString(), 10)), new Text(result.toString()));
}
//輸出文件頭信息
@Override
protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
context.write(new Text(formatStr("單詞", 10)), new Text("文件行列信息(file<row,col>"));
}
/**
* 字符串填充空格
* @param str
* @param length
* @return
*/
public static String formatStr(String str, int length) {
if (str == null) {
str = "";
}
int strLen = str.getBytes().length;
if (strLen == length) {
return str;
} else if (strLen < length) {
int temp = length - strLen;
String tem = "";
for (int i = 0; i < temp; i++) {
tem = tem + " ";
}
return str + tem;
} else {
return str.substring(0, length);
}
}
}
/**
* 提交Job到hadoop集群執行
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
@Test
public void InvertedIndexCaseTwoJob() throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(InvertedIndexCaseTwo.class);
job.setMapperClass(InvertedIndexCaseTwoMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//設置combiner類
job.setCombinerClass(InvertedIndexCaseTwoCombiner.class);
job.setReducerClass(InvertedIndexCaseTwoReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("D:\\test\\InvertedIndex\\srcdata"));
FileOutputFormat.setOutputPath(job, new Path("D:\\test\\InvertedIndex\\output5"));
job.waitForCompletion(true);
}
}
執行結果:
單詞 文件行列信息(file<row,col>
Are d.txt<1,1>
China b.txt<2,1>
Do e.txt<2,1>
Hello a.txt<3,1><1,1> c.txt<1,1>
I e.txt<1,1> b.txt<1,1>
Java c.txt<2,1><1,7> a.txt<3,14>
Python a.txt<3,7>
We a.txt<2,1>
You d.txt<2,1>
a c.txt<2,2>
are d.txt<2,5> a.txt<2,4>
boys d.txt<2,14>
china e.txt<2,16><1,13>
come e.txt<1,3>
country b.txt<2,23><1,11>
friend a.txt<2,13>
from e.txt<1,8>
good a.txt<2,8> c.txt<2,11> d.txt<2,9>
greatest b.txt<2,14>
in b.txt<2,3>
is b.txt<2,7> c.txt<2,6>
language c.txt<2,16>
love b.txt<1,3>
my b.txt<1,8>
ok d.txt<1,9>
the b.txt<2,10><2,10>
to e.txt<2,13>
want e.txt<2,8>
word a.txt<1,7>
world b.txt<2,38>
you e.txt<2,4> d.txt<1,5>
2018-08-05 期 MapReduce實現每個單詞在每個文件中坐標信息統計