2018-08-03: MapReduce Inverted Index Programming Case 1 (Combiner approach)

package cn.sjq.bigdata.inverted.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;

/**
 * Uses MapReduce to count how many times each word appears in each of several
 * input files. The output format looks like:
 *   hello (a.txt 2,b.txt 1,c.txt 4)
 *   tom (a.txt 5,b.txt 3)
 * Implementation: an inverted-index algorithm combined with a MapReduce Combiner.
 * Note that inserting a Combiner must not change the job's existing logic, nor the
 * data types flowing from the Mapper to the Reducer.
 *
 * The Mapper and Reducer in this case are implemented as static nested classes,
 * and the Job is submitted from a JUnit test method.
 * @author songjq
 */
public class InvertedIndexCaseOne {

    /**
     * Mapper stage
     * k1: input key    LongWritable  byte offset of the line being read
     * v1: input value  Text          one line of input data
     * k2: output key   Text          formatted as <hello:a.txt>, <hello:b.txt>
     * v2: output value Text          formatted as <1>, <1>
     * @author songjq
     */
    static class InvertedIndexCaseOneMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text tkey = new Text();
        private Text tvalue = new Text();

        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Read one line of input
            String line = v1.toString();
            // Tokenize: split the line on spaces
            String[] words = line.split(" ");
            // Get the name of the file this split was read from
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();
            // Emit one <word:fileName, "1"> pair per word via the context
            for (String word : words) {
                tkey.set(word + ":" + fileName);
                tvalue.set("1");
                context.write(tkey, tvalue);
            }
        }
    }
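    // Worked example (hypothetical input, for illustration only): if a.txt contained
    // the line "Hello word", the Mapper above would emit
    //   <"Hello:a.txt", "1">
    //   <"word:a.txt", "1">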

    /**
     * Combiner stage
     * A Combiner is a special kind of Reducer, so this class extends Reducer.
     * Its job is to partially aggregate the Mapper's output before it is sent on
     * to the Reducer stage.
     * Input arriving from the Mapper looks like:
     *   <k2>          <v2>
     *   <hello:a.txt> <"1","1">
     *   <hello:b.txt> <"1">
     * After the Combiner runs, the data sent to the Reducer looks like:
     *   <k3>    <v3>
     *   <hello> <a.txt:"2">
     *   <hello> <b.txt:"1">
     * @author songjq
     */
    static class InvertedIndexCaseOneCombiner extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text k31, Iterable<Text> v31, Context ctx) throws IOException, InterruptedException {
            int total = 0;
            for (Text val : v31) {
                // Sum this word's occurrences within one file
                total += Integer.parseInt(val.toString());
            }
            // Split k3, which has the format hello:a.txt
            String[] split = k31.toString().split(":");
            String word = split[0];
            String fileName = split[1];
            // Emit k3:<hello> v3:<a.txt:"2">
            ctx.write(new Text(word), new Text(fileName + ":" + total));
        }
    }
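    // Caveat: Hadoop treats a Combiner as an optional optimization that may run zero,
    // one, or several times over the map output. Because this Combiner rewrites the
    // key (hello:a.txt -> hello) and the value format, the Reducer depends on it
    // having run exactly once per key group; that holds in this small local test,
    // but the pattern is fragile on a real cluster.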

    /**
     * Reducer stage
     * The Reducer processes the data emitted by the Combiner stage.
     * Combiner output looks like:
     *   <k3>    <v3>
     *   <hello> <a.txt:"2",b.txt:"1">
     * After the Reducer runs, the final output looks like:
     *   <k4>    <v4>
     *   <hello> <a.txt 2,b.txt 1>
     * @author songjq
     */
    static class InvertedIndexCaseOneReducer extends Reducer<Text, Text, Text, Text> {

        /*
         * setup() is called only once per reduce task, so the file header can be
         * written here.
         * (non-Javadoc)
         * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
         */
        @Override
        protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            context.write(new Text(formatStr("Word", 20)), new Text("Frequency statistics [eg:(a.txt 2,b.txt 1)]"));
        }

        @Override
        protected void reduce(Text k3, Iterable<Text> v3, Context ctx) throws IOException, InterruptedException {
            // Buffer that accumulates the output value
            StringBuffer result = new StringBuffer();
            for (Text val : v3) {
                // Each <v3> value looks like <a.txt:"2">
                String[] split = val.toString().split(":");
                String fileName = split[0];
                String count = split[1];
                result.append(fileName).append(" ").append(count).append(",");
            }
            // Write <k4,v4> to HDFS, dropping the trailing comma.
            // Final output line format: hello    a.txt 2,b.txt 1
            ctx.write(new Text(formatStr(k3.toString(), 20)),
                    new Text(result.deleteCharAt(result.length() - 1).toString()));
        }

        /**
         * Right-pads a string with spaces to the given length in bytes;
         * longer strings are truncated.
         * @param str
         * @param length
         * @return
         */
        public static String formatStr(String str, int length) {
            if (str == null) {
                str = "";
            }
            int strLen = str.getBytes().length;
            if (strLen == length) {
                return str;
            } else if (strLen < length) {
                int temp = length - strLen;
                String tem = "";
                for (int i = 0; i < temp; i++) {
                    tem = tem + " ";
                }
                return str + tem;
            } else {
                return str.substring(0, length);
            }
        }
    }

    /**
     * Submit the job
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    @Test
    public void InvertedIndexCaseOneJob() throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(InvertedIndexCaseOne.class);

        job.setMapperClass(InvertedIndexCaseOneMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Register the Combiner class
        job.setCombinerClass(InvertedIndexCaseOneCombiner.class);

        job.setReducerClass(InvertedIndexCaseOneReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Local paths used for single-machine testing; the output directory
        // must not exist before the job runs.
        FileInputFormat.setInputPaths(job, new Path("D:\\test\\InvertedIndex\\srcdata"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\test\\InvertedIndex\\output2"));

        job.waitForCompletion(true);
    }
}
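The job above is launched from a JUnit @Test method with hard-coded local Windows paths, which is convenient for single-machine debugging. To submit the same job to a cluster with hadoop jar, a conventional main() driver inside InvertedIndexCaseOne is the usual pattern. The sketch below is an illustration rather than part of the original case: the command-line argument handling, jar name, and exit-code convention are assumptions.

public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(InvertedIndexCaseOne.class);
    job.setMapperClass(InvertedIndexCaseOneMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setCombinerClass(InvertedIndexCaseOneCombiner.class);
    job.setReducerClass(InvertedIndexCaseOneReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Hypothetical HDFS paths taken from the command line, e.g.
    //   hadoop jar inverted-index.jar cn.sjq.bigdata.inverted.index.InvertedIndexCaseOne /in /out
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Exit nonzero if the job fails so calling scripts can detect the failure
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}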

Execution result (contents of the job's part-r-00000 output file; the header line is written by the Reducer's setup() method):

Word                 Frequency statistics [eg:(a.txt 2,b.txt 1)]
Are                  d.txt 1
China                b.txt 1
Do                   e.txt 1
Hello                c.txt 1,a.txt 1
I                    e.txt 1,b.txt 1
Java                 c.txt 2
We                   a.txt 1
You                  d.txt 1
a                    c.txt 1
are                  d.txt 1,a.txt 1
boys                 d.txt 1
china                e.txt 2
come                 e.txt 1
country              b.txt 2
friend               a.txt 1
from                 e.txt 1
good                 d.txt 1,c.txt 1,a.txt 1
greatest             b.txt 1
in                   b.txt 1
is                   b.txt 1,c.txt 1
language             c.txt 1
love                 b.txt 1
my                   b.txt 1
ok                   d.txt 1
the                  b.txt 2
to                   e.txt 1
want                 e.txt 1
word                 a.txt 1
world                b.txt 1
you                  d.txt 1,e.txt 1

