1. 程式人生 > >2018-08-05 期 MapReduce實現每個單詞在每個文件中坐標信息統計

2018-08-05 期 MapReduce實現每個單詞在每個文件中坐標信息統計

line 字符 count throws ase protect clas 行處理 tostring

package cn.sjq.bigdata.inverted.index;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* 實現功能: MapReduce實現每個單詞在每個文件中坐標信息統計

* 輸入文件:a.txt b.txt c.txt d.txt e.txt

* 輸出結果:

* 利用MapReduce實現每個單詞在每個文件中出現行列坐標信息,比如hello在a.txt、b.txt、c.txt中的坐標位置

* word (file1<row,col> file2<row,col>... filen<row,col>)

* hello (a.txt<1,1><2,3><4,8>) (b.txt<2,1><3,5>) (c.txt<2,3><4,9>)

* 輸出描述:

* (a.txt<1,1><2,3><4,8>)表示hello在a.txt文件中第1行第一列、第2行第3列、第4行第8列出現過

* (b.txt<2,1><3,5>) 表示hello在b.txt文件中第2行第1列、第3行第5列出現過

** 實現思路:

* 1、Mapper階段

* 讀入a.txt第1行數據:

* a.txt -->Hello word

* 計算坐標信息

* Hello:a.txt--><1,1>

* word :a.txt--><1,7>

*

* 讀入c.txt第1行數據:

* c.txt -->Hello Java

* 計算坐標信息

* Hello:c.txt--><1,1>

* Java :c.txt--><1,7>

*

* 讀入a.txt第3行數據

* a.txt-->Hello Python Java

* 計算坐標信息

* Hello:a.txt--><3,1>

* Python:a.txt--><3,7>

* Java:a.txt--><3,7>

*

* 通過分析發現如下規律:

* Hello:a.txt--><1,1>

* Hello:c.txt--><1,1>

* Hello:a.txt--><3,1>

*

* Java :c.txt--><1,7>

* Java :a.txt--><3,7>

*

* word :a.txt--><1,7>

*

* Python:a.txt--><3,7>

*

* 通過Mapper處理後,輸出的<k2,v2>格式如下

* <k2> <v2>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* Java :c.txt <1,7>

* Java :a.txt <3,7>

* Python:a.txt <3,4>

* word :a.txt <1,7>

*

* 2、Combiner階段,對Mapper階段輸出數據進行部分合並處理

* Mapper階段輸出數據格式為

* <k2> <v2>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* Combiner處理邏輯

* 對輸入的key進行切分操作

* 比如輸入:Hello:a.txt <1,1>

* 通過Hello:a.txt後可以得出Combiner階段的輸出數據格式

* <k3`> <v3`>

* Hello a.txt--><1,1><3,1>

* Hello c.txt--><1,1>

*

* 3、Reducer階段,Reducer階段主要處理來自Combiner階段輸出,Combiner階段輸出數據格式如下

* <k3> <v3>

* Hello a.txt--><1,1><3,1>

* Hello c.txt--><1,1>

* 通過Reducer處理後,最終輸出數據格式為

*

* <k4> <v4>

* Hello (a.txt<1,1><3,1>),(c.txt<1,1>)

*

* 這樣就實現了每個單詞在每個文件中坐標信息統計。

*

*

* @author songjq

*

*/

public class InvertedIndexCaseTwo {

/**

* Mapper階段

* k1:讀入數據便宜量 LongWritable

* v1:讀入一行數據 Text

* k2:輸出到Combiner階段key Text

* v2:輸出到Combiner階段value Text

* Mapper階段主要實現對輸入數據的分詞計算單詞行列坐標處理,處理後輸出數據格式為:

* <k1> <v1>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* @author songjq

*

*/

static class InvertedIndexCaseTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

//定義單詞所在文件行號,rownum對輸入的當前文件有效,如果重新輸入另外一個文件,則會自動清零

private long rownum = 0;

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

rownum++;

//讀入一行數據

String line = v1.toString();

//獲取讀入文件名稱

FileSplit inputSplit = (FileSplit) context.getInputSplit();

String fileName = inputSplit.getPath().getName();

//分詞

String[] words = line.split(" ");

//遍歷每一行單詞,並獲取該單詞行號和列號,將其傳輸到Combiner

for(String word:words) {

int word_col = line.indexOf(word)+1;

String outKey = word+":"+fileName;

String outValue = "<"+rownum+","+word_col+">";

tkey.set(outKey);

tvalue.set(outValue);

context.write(tkey, tvalue);

}

}

}

/**

* Combiner是一種特殊的Reducer,因此同樣需要繼承Reducer類,使用Combiner不能改變原有業務邏輯及傳輸數據類型,慎用。

* 這裏使用Combiner來對Mapper階段輸入的數據進行部分合並處理

* Mapper輸出的數據格式

* <k2> <v2>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* 通過Combiner處理後,輸出數據格式

* <k3> <v3>

* Hello a.txt<1,1><3,1>

* Hello c.txt<1,1>

* @author songjq

*

*/

static class InvertedIndexCaseTwoCombiner extends Reducer<Text, Text, Text, Text> {

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void reduce(Text k3_, Iterable<Text> v3_, Context ctx) throws IOException, InterruptedException {

//定義存放Combiner輸出的數據

StringBuffer v3str_ = new StringBuffer();

Iterator<Text> iterator = v3_.iterator();

//輸出tvalue處理

while(iterator.hasNext()) {

Text row_col = iterator.next();

v3str_.append(row_col.toString());

}

String key3 = k3_.toString();

String[] split = key3.split(":");

String fileName = split[1];

tvalue.set(fileName+v3str_.toString());

//tkey處理

String word = split[0];

tkey.set(word);

//通過ctx將tkey tvalue傳輸到Reducer端

ctx.write(tkey, tvalue);

}

}

/**

* Reducer端

* Reducer端主要對Combiner端輸出數據進行處理

* Combiner端輸出數據格式為:

* <k3> <v3>

* Hello a.txt--><1,1><3,1>

* Hello c.txt--><1,1>

* 通過Reducer處理後,輸出數據格式為

* <k4> <v4>

* hello a.txt--><1,1><3,1>,c.txt--><1,1>

* @author songjq

*

*/

static class InvertedIndexCaseTwoReducer extends Reducer<Text, Text, Text, Text> {

@Override

protected void reduce(Text k3, Iterable<Text> v3, Context ctx) throws IOException, InterruptedException {

StringBuffer result = new StringBuffer();

for(Text value:v3) {

result.append(value.toString()).append(" ");

}

ctx.write(new Text(formatStr(k3.toString(), 10)), new Text(result.toString()));

}

//輸出文件頭信息

@Override

protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {

context.write(new Text(formatStr("單詞", 10)), new Text("文件行列信息(file<row,col>"));

}

/**

* 字符串填充空格

* @param str

* @param length

* @return

*/

public static String formatStr(String str, int length) {

if (str == null) {

str = "";

}

int strLen = str.getBytes().length;

if (strLen == length) {

return str;

} else if (strLen < length) {

int temp = length - strLen;

String tem = "";

for (int i = 0; i < temp; i++) {

tem = tem + " ";

}

return str + tem;

} else {

return str.substring(0, length);

}

}

}

/**

* 提交Job到hadoop集群執行

* @throws IOException

* @throws ClassNotFoundException

* @throws InterruptedException

*/

@Test

public void InvertedIndexCaseTwoJob() throws IOException, ClassNotFoundException, InterruptedException {

Job job = Job.getInstance(new Configuration());

job.setJarByClass(InvertedIndexCaseTwo.class);

job.setMapperClass(InvertedIndexCaseTwoMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

//設置combiner類

job.setCombinerClass(InvertedIndexCaseTwoCombiner.class);

job.setReducerClass(InvertedIndexCaseTwoReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, new Path("D:\\test\\InvertedIndex\\srcdata"));

FileOutputFormat.setOutputPath(job, new Path("D:\\test\\InvertedIndex\\output5"));

job.waitForCompletion(true);

}

}

執行結果:

單詞 文件行列信息(file<row,col>

Are d.txt<1,1>

China b.txt<2,1>

Do e.txt<2,1>

Hello a.txt<3,1><1,1> c.txt<1,1>

I e.txt<1,1> b.txt<1,1>

Java c.txt<2,1><1,7> a.txt<3,14>

Python a.txt<3,7>

We a.txt<2,1>

You d.txt<2,1>

a c.txt<2,2>

are d.txt<2,5> a.txt<2,4>

boys d.txt<2,14>

china e.txt<2,16><1,13>

come e.txt<1,3>

country b.txt<2,23><1,11>

friend a.txt<2,13>

from e.txt<1,8>

good a.txt<2,8> c.txt<2,11> d.txt<2,9>

greatest b.txt<2,14>

in b.txt<2,3>

is b.txt<2,7> c.txt<2,6>

language c.txt<2,16>

love b.txt<1,3>

my b.txt<1,8>

ok d.txt<1,9>

the b.txt<2,10><2,10>

to e.txt<2,13>

want e.txt<2,8>

word a.txt<1,7>

world b.txt<2,38>

you e.txt<2,4> d.txt<1,5>


2018-08-05 期 MapReduce實現每個單詞在每個文件中坐標信息統計