1. 程式人生 > 使用MapReduce以pairs演算法計算單詞的共現矩陣

使用 MapReduce 以 pairs 演算法計算單詞的共現矩陣

詞頻共現矩陣的用途很廣泛，例如個性化推薦系統、基於物品的協同過濾等。

什麼是共現矩陣？

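例如，對於句子「I am a good boy good boy」，可以得到如下的共現矩陣（空白處為 0）：

|      | I | am | a | good | boy |
|------|---|----|---|------|-----|
| I    |   | 1  |   |      |     |
| am   |   |    | 1 |      |     |
| a    |   |    |   | 1    |     |
| good |   |    |   |      | 2   |
| boy  |   |    |   | 2    |     |

注意：下文的程式碼把 (w1, w2) 與 (w2, w1) 視為同一對。在視窗大小為 2 時，它算出的 good 與 boy 的共現次數是 3（「good boy」出現兩次，「boy good」出現一次）。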

也就是說，共現矩陣記錄的是在一篇文件中，兩個單詞一起出現的次數。

如何用 MapReduce 實現這個功能呢？

1. 我們使用 pairs 演算法：設定一個視窗，將視窗中的第一個單詞依次與視窗中其後的每個單詞組成一對，並輸出形如 ((I, am), 1) 的鍵值對。

2. 我們需要重寫 FileInputFormat，將一個檔案作為整體讀入而不允許分割；key 為檔名，value 為檔案內容的位元組（bytes）。

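3. 我們需要自定義 key 的型別：一個單詞對 (word1, word2) 作為一個 key，因此需要實作（implements）WritableComparable<WordPair>，並實現以下方法：

equals（判斷兩個物件是否相等，(a, b) 與 (b, a) 視為相等）、hashCode（保證同一對單詞不會因為順序不同而被分到不同的 Reduce 上）、compareTo（比較兩個物件的大小，且必須與 equals 一致，否則 (a, b) 與 (b, a) 排序後不會相鄰，無法被歸入同一組）、readFields() 與 write()（反序列化與序列化）。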

下面是具體的程式碼:

package WordConCurrence;


import java.io.DataInput;


/**
 * An unordered pair of words used as a MapReduce key.
 *
 * <p>(a, b) and (b, a) are treated as the same pair. {@link #equals}, {@link #hashCode}
 * and {@link #compareTo} all work on the canonical form (lexicographically smaller
 * word first), so the two orders hash to the same reducer. They also sort next to
 * each other during the shuffle, which lets them land in the same reduce group.
 */
public class WordPair implements WritableComparable<WordPair> {
    private String wordA;
    private String wordB;

    /** No-arg constructor required by Hadoop; fields are filled in by {@link #readFields}. */
    public WordPair() {
    }

    public WordPair(String wordA, String wordB) {
        this.wordA = wordA;
        this.wordB = wordB;
    }

    public String getWordA() {
        return this.wordA;
    }

    public String getWordB() {
        return this.wordB;
    }

    /** Serializes both words, in their stored order, as modified-UTF-8 strings. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(wordA);
        out.writeUTF(wordB);
    }

    /** Reads back the two words written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        wordA = in.readUTF();
        wordB = in.readUTF();
    }

    /** Returns "wordA,wordB" in stored order; this is the key text TextOutputFormat writes. */
    @Override
    public String toString() {
        return wordA + "," + wordB;
    }

    /**
     * Orders pairs by their canonical form: first by the smaller word, then by the larger.
     *
     * <p>Returns 0 exactly when {@link #equals} is true. Comparing the raw concatenation
     * {@code wordA + wordB} would break this in two ways. (a, b) and (b, a) would not be
     * adjacent after sorting, so one logical pair would reach the reducer as several
     * groups. Distinct pairs such as ("ab", "c") and ("a", "bc") would also compare as 0.
     */
    @Override
    public int compareTo(WordPair o) {
        int cmp = lowWord().compareTo(o.lowWord());
        return cmp != 0 ? cmp : highWord().compareTo(o.highWord());
    }

    /** Unordered equality: (a, b) equals both (a, b) and (b, a). */
    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (!(o instanceof WordPair))
            return false;
        WordPair w = (WordPair) o;
        return (this.wordA.equals(w.wordA) && this.wordB.equals(w.wordB))
                || (this.wordB.equals(w.wordA) && this.wordA.equals(w.wordB));
    }

    /** Order-insensitive hash, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return 31 * lowWord().hashCode() + highWord().hashCode();
    }

    /** The lexicographically smaller of the two words. */
    private String lowWord() {
        return wordA.compareTo(wordB) <= 0 ? wordA : wordB;
    }

    /** The lexicographically larger of the two words. */
    private String highWord() {
        return wordA.compareTo(wordB) <= 0 ? wordB : wordA;
    }
}

package WordConCurrence;


import java.io.IOException;


/**
 * Input format that never splits files: each input file becomes exactly one record,
 * keyed by the file's name, with the file's bytes as the value.
 */
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    /** Files are always read whole, so splitting is disabled for every path. */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        final boolean splittable = false;
        return splittable;
    }

    /** Creates the reader that turns the (whole-file) split into its single record. */
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit wholeFile = (FileSplit) split;
        return new SingleFileNameReader(wholeFile, context.getConfiguration());
    }
}

package WordConCurrence;


import java.io.IOException;


/**
 * Record reader that emits a single record for the whole split: the key is the
 * file's name and the value is the split's bytes.
 *
 * <p>Meant to be used with a non-splittable input format, so the split covers the
 * whole file.
 */
public class SingleFileNameReader extends RecordReader<Text, BytesWritable> {

    private FileSplit fileSplit;
    /** True once the single record has been produced. */
    private boolean processed = false;
    private Text key = null;
    private BytesWritable value = null;
    /** Opened in initialize(); closed after the record is read, or by close(). */
    private FSDataInputStream fis = null;

    /**
     * @param fileSplit split to read; replaced by the split passed to {@link #initialize}
     * @param conf      job configuration; not used directly, because
     *                  {@link #initialize} takes the configuration from the task context
     */
    public SingleFileNameReader(FileSplit fileSplit, Configuration conf) {
        this.fileSplit = fileSplit;
    }

    /** Opens the split's file for reading. */
    @Override
    public void initialize(InputSplit split,
            org.apache.hadoop.mapreduce.TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) split;
        Configuration job = context.getConfiguration();
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(job);
        fis = fs.open(file);
    }

    /**
     * Produces the one record on the first call and returns false on every later call.
     *
     * @throws IOException if the file cannot be read, or is too large for a byte array
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (processed) {
            return false; // the single record has already been handed out
        }
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new BytesWritable();
        }
        Path file = fileSplit.getPath();
        long length = fileSplit.getLength();
        // A Java array cannot hold more than Integer.MAX_VALUE bytes; fail loudly
        // instead of letting the (int) cast wrap around.
        if (length > Integer.MAX_VALUE) {
            throw new IOException("File too large to read as a single record: "
                    + file + " (" + length + " bytes)");
        }
        byte[] content = new byte[(int) length];
        key.set(file.getName());
        try {
            // Let read failures propagate so the task fails, rather than emitting a
            // zero-filled record as if the read had succeeded.
            IOUtils.readFully(fis, content, 0, content.length);
        } finally {
            IOUtils.closeStream(fis);
            fis = null;
        }
        value.set(content, 0, content.length);
        processed = true;
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    /** 0 before the single record is read, 1 afterwards. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    /** Closes the input stream if nextKeyValue() has not already done so. */
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(fis);
        fis = null;
    }
}

package WordConCurrence;


import java.io.IOException;


/**
 * Counts how often two English words occur together within a sliding window of
 * words across a set of documents, using the "pairs" approach.
 *
 * <p>The mapper slides a window of {@code window} words over each document. Each time
 * the window is full, it pairs the word at its head with every other word in the
 * window and emits each pair with count 1. It then drops the head. At the end of the
 * document, the remaining words are paired the same way. The reducer (also used as
 * the combiner) sums the counts for each {@link WordPair} key.
 *
 * <p>With TextOutputFormat, each output line is the pair, a tab, and the count:
 * <pre>
 * we,should   2
 * we,the      2
 * which,food  1
 * </pre>
 *
 * <p>Usage: {@code WordConcurrnce <input path> <output path> <window size>}
 */
public class WordConcurrnce {
    /** Upper bound on the co-occurrence window, whatever size the job requests. */
    private static final int MAX_WINDOW = 20;
    /** Matches simple English words made only of ASCII letters (no hyphens or digits). */
    private static final String wordRegex = "([a-zA-Z]{1,})";
    private static final Pattern wordPattern = Pattern.compile(wordRegex);
    /** Shared count of 1; never mutated. */
    private static final IntWritable one = new IntWritable(1);

    public static class WordConcurrenceMapper extends
            Mapper<Text, BytesWritable, WordPair, IntWritable> {
        private int windowSize;
        private Queue<String> windowQueue = new LinkedList<String>();

        /**
         * Reads the window size from the "window" job property (default 2), capped at
         * MAX_WINDOW. A window smaller than 2 produces no pairs.
         */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            windowSize = Math.min(context.getConfiguration()
                    .getInt("window", 2), MAX_WINDOW);
        }

        /**
         * Emits ((w1, w2), 1) for every pair of words that fall within one window.
         *
         * @param docName    the document's file name
         * @param docContent the whole document's bytes, decoded as UTF-8
         */
        @Override
        public void map(Text docName, BytesWritable docContent, Context context)
                throws IOException, InterruptedException {
            // The queue is an instance field: start each document with an empty window
            // so a word left over from a previous call never pairs with this document.
            windowQueue.clear();
            // getBytes() returns the backing buffer, which may be longer than the data;
            // only the first getLength() bytes belong to the document.
            String text = new String(docContent.getBytes(), 0,
                    docContent.getLength(), "UTF-8");
            Matcher matcher = wordPattern.matcher(text);
            while (matcher.find()) {
                windowQueue.add(matcher.group());
                if (windowQueue.size() >= windowSize) {
                    // Window [q1, q2, ... qn] -> emit (q1,q2), (q1,q3), ... (q1,qn).
                    emitHeadPairs(context);
                }
            }
            // Flush the tail of the document: each remaining word pairs with those after it.
            while (windowQueue.size() > 1) {
                emitHeadPairs(context);
            }
        }

        /** Emits ((head, w), 1) for every word w queued behind the head, then drops the head. */
        private void emitHeadPairs(Context context)
                throws IOException, InterruptedException {
            Iterator<String> it = windowQueue.iterator();
            String head = it.next();
            while (it.hasNext()) {
                context.write(new WordPair(head, it.next()), one);
            }
            windowQueue.remove();
        }
    }

    /** Sums the counts for each word pair. Safe to use as a combiner, since addition is associative. */
    public static class WordConcurrenceReducer extends
            Reducer<WordPair, IntWritable, WordPair, IntWritable> {
        /** Reused output value; the framework serializes it on each write. */
        private final IntWritable total = new IntWritable();

        @Override
        public void reduce(WordPair wordPair, Iterable<IntWritable> frequence,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : frequence) {
                sum += val.get();
            }
            total.set(sum);
            context.write(wordPair, total);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args input path, output path, window size
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        if (args.length < 3) {
            System.err.println(
                    "Usage: WordConcurrnce <input path> <output path> <window size>");
            System.exit(2);
        }
        Job wordConcurrenceJob = new Job();
        wordConcurrenceJob.setJobName("wordConcurrenceJob");
        wordConcurrenceJob.setJarByClass(WordConcurrnce.class);
        wordConcurrenceJob.getConfiguration().setInt("window",
                Integer.parseInt(args[2]));

        wordConcurrenceJob.setMapperClass(WordConcurrenceMapper.class);
        wordConcurrenceJob.setMapOutputKeyClass(WordPair.class);
        wordConcurrenceJob.setMapOutputValueClass(IntWritable.class);

        // Pre-aggregate counts on the map side to shrink shuffle traffic.
        wordConcurrenceJob.setCombinerClass(WordConcurrenceReducer.class);
        wordConcurrenceJob.setReducerClass(WordConcurrenceReducer.class);
        wordConcurrenceJob.setOutputKeyClass(WordPair.class);
        wordConcurrenceJob.setOutputValueClass(IntWritable.class);

        wordConcurrenceJob.setInputFormatClass(WholeFileInputFormat.class);
        wordConcurrenceJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(wordConcurrenceJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(wordConcurrenceJob, new Path(args[1]));

        boolean succeeded = wordConcurrenceJob.waitForCompletion(true);
        System.out.println("finished!");
        // Report job failure to the caller through the exit status.
        System.exit(succeeded ? 0 : 1);
    }
}