1. 程式人生 > >Hadoop/MapReduce反轉排序:控制規約器Reducer值的順序

Hadoop/MapReduce反轉排序:控制規約器Reducer值的順序

例子:計算一個給定文件集中單詞的相對頻度。目標是建立一個N*N矩陣M,其中N為所有給定文件的單詞量,每個單元Mij包含一個特定上下文單詞Wi與Wj共同出現的次數。為簡單起見,將這個上下文定義為Wi的鄰域。例如:給定以下單詞:W1,W2,W3,W4,W5,W6

如果定義一個單詞的鄰域為這個單詞的前兩個單詞和後兩個單詞,那麼這6個單詞的鄰域如下:

單詞    領域+-2

W1    W2,W3

W2    W1,W3,W4

W3    W1,W2,W4,W5

W4    W2,W3,W5,W6

W5    W3,W4,W6

W6    W4,W5

反轉排序的MapReduce/Hadoop實現:

我們要生成兩個資料序列。第一個序列三單詞的總領域計數(一個單詞的總共出現次數),用組合鍵(W,*)表示,其中W表示該單詞。

第二個序列是這個單詞和其他特定單詞出現次數,用組合鍵(W,W2)表示

Hadoop的定製分割槽器實現

定製分割槽器必須確保含有相同左詞(即自然鍵)的所有詞對要傳送到相同的規約器。例如,組合鍵{(man,tall),(man,strong),(man,moon)...}都要傳送到同一個規約器。要把這些鍵傳送到相同的規約器,必須定義一個定製分割槽器,它只關心左詞,如例中man。

相對頻度對映器

以W1,W2,W3,W4,W5,W6為例:

鍵    值

(W1,W2) 1

(W1,W3) 1

(W1,*) 2

(W2,W1) 1

(W2,W3) 1

(W2,W4) 1

(W2,*) 3

(W3,W1) 1

(W3,W2) 1

(W3,W4) 1

(W3,W5) 1

(W3,*) 4

(W4,W2) 1

(W4,W3) 1

(W4,W5) 1

(W4,W6) 1

(W4,*) 4

(W5,W3) 1

(W5,W4) 1

(W5,W6) 1

(W5,*) 3

(W6,W4) 1

(W6,W6) 1

(W6,*) 1

相對頻度規約器

規約器的輸入:

鍵    值

(W1,*),(W1,W2) ,(W1,W3)    2,1,1

(W2,*),(W2,W1),(W2,W3),(W2,W4) 3,1,1,1

....

程式碼實現:

自定義組合鍵

package fanzhuanpaixu;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

/**
 * Adapted from Cloud9: A Hadoop toolkit for working with big data
 * edu.umd.cloud9.io.pair.PairOfStrings 
 *
 * WritableComparable representing a pair of Strings. 
 * The elements in the pair are referred to as the left (word) 
 * and right (neighbor) elements. The natural sort order is:
 * first by the left element, and then by the right element.
 * 
 * @author Mahmoud Parsian (added few new convenient methods)
 *
 */
public class PairOfWords implements WritableComparable<PairOfWords> {

    private String leftElement;
    private String rightElement;

    /**
     * Creates a pair.
     */
    public PairOfWords() {
    }

    /**
     * Creates a pair.
     *
     * @param left the left element
     * @param right the right element
     */
    public PairOfWords(String left, String right) {
        set(left, right);
    }

    /**
     * Deserializes the pair.
     *
     * @param in source for raw byte representation
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        leftElement = Text.readString(in);
        rightElement = Text.readString(in);
    }

    /**
     * Serializes this pair.
     *
     * @param out where to write the raw byte representation
     */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, leftElement);
        Text.writeString(out, rightElement);
    }

    public void setLeftElement(String leftElement) {
        this.leftElement = leftElement;
    }

    public void setWord(String leftElement) {
        setLeftElement(leftElement);
    }

    /**
     * Returns the left element.
     *
     * @return the left element
     */
    public String getWord() {
        return leftElement;
    }

    /**
     * Returns the left element.
     *
     * @return the left element
     */
    public String getLeftElement() {
        return leftElement;
    }

    public void setRightElement(String rightElement) {
        this.rightElement = rightElement;
    }

    public void setNeighbor(String rightElement) {
        setRightElement(rightElement);
    }

    /**
     * Returns the right element.
     *
     * @return the right element
     */
    public String getRightElement() {
        return rightElement;
    }

    public String getNeighbor() {
        return rightElement;
    }

    /**
     * Returns the key (left element).
     *
     * @return the key
     */
    public String getKey() {
        return leftElement;
    }

    /**
     * Returns the value (right element).
     *
     * @return the value
     */
    public String getValue() {
        return rightElement;
    }

    /**
     * Sets the right and left elements of this pair.
     *
     * @param left the left element
     * @param right the right element
     */
    public void set(String left, String right) {
        leftElement = left;
        rightElement = right;
    }

    /**
     * Checks two pairs for equality.
     *
     * @param obj object for comparison
     * @return <code>true</code> if <code>obj</code> is equal to this object, <code>false</code> otherwise
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        //
        if (!(obj instanceof PairOfWords)) {
            return false;
        }
        //
        PairOfWords pair = (PairOfWords) obj;
        return leftElement.equals(pair.getLeftElement())
                && rightElement.equals(pair.getRightElement());
    }

    /**
     * Defines a natural sort order for pairs. Pairs are sorted first by the left element, and then by the right
     * element.
     *
     * @return a value less than zero, a value greater than zero, or zero if this pair should be sorted before, sorted
     * after, or is equal to <code>obj</code>.
     */
    @Override
    public int compareTo(PairOfWords pair) {
        String pl = pair.getLeftElement();
        String pr = pair.getRightElement();

        if (leftElement.equals(pl)) {
            return rightElement.compareTo(pr);
        }

        return leftElement.compareTo(pl);
    }

    /**
     * Returns a hash code value for the pair.
     *
     * @return hash code for the pair
     */
    @Override
    public int hashCode() {
        return leftElement.hashCode() + rightElement.hashCode();
    }

    /**
     * Generates human-readable String representation of this pair.
     *
     * @return human-readable String representation of this pair
     */
    @Override
    public String toString() {
        return "(" + leftElement + ", " + rightElement + ")";
    }

    /**
     * Clones this object.
     *
     * @return clone of this object
     */
    @Override
    public PairOfWords clone() {
        return new PairOfWords(this.leftElement, this.rightElement);
    }

    /**
     * Comparator optimized for <code>PairOfWords</code>.
     */
    public static class Comparator extends WritableComparator {

        /**
         * Creates a new Comparator optimized for <code>PairOfWords</code>.
         */
        public Comparator() {
            super(PairOfWords.class);
        }

        /**
         * Optimization hook.
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                int firstVIntL1 = WritableUtils.decodeVIntSize(b1[s1]);
                int firstVIntL2 = WritableUtils.decodeVIntSize(b2[s2]);
                int firstStrL1 = readVInt(b1, s1);
                int firstStrL2 = readVInt(b2, s2);
                int cmp = compareBytes(b1, s1 + firstVIntL1, firstStrL1, b2, s2 + firstVIntL2, firstStrL2);
                if (cmp != 0) {
                    return cmp;
                }

                int secondVIntL1 = WritableUtils.decodeVIntSize(b1[s1 + firstVIntL1 + firstStrL1]);
                int secondVIntL2 = WritableUtils.decodeVIntSize(b2[s2 + firstVIntL2 + firstStrL2]);
                int secondStrL1 = readVInt(b1, s1 + firstVIntL1 + firstStrL1);
                int secondStrL2 = readVInt(b2, s2 + firstVIntL2 + firstStrL2);
                return compareBytes(b1, s1 + firstVIntL1 + firstStrL1 + secondVIntL1, secondStrL1, b2,
                        s2 + firstVIntL2 + firstStrL2 + secondVIntL2, secondStrL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    static { // register this comparator
        WritableComparator.define(PairOfWords.class, new Comparator());
    }
}


對映器

package fanzhuanpaixu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//
import org.apache.commons.lang.StringUtils;
//
import java.io.IOException;

/**
 * RelativeFrequencyMapper implements the map() function for Relative Frequency of words.
 *
 * @author Mahmoud Parsian
 *
 */
public class RelativeFrequencyMapper
        extends Mapper<LongWritable, Text, PairOfWords, IntWritable> {

    private int neighborWindow = 2;
    // pair = (leftElement, rightElement)
    private final PairOfWords pair = new PairOfWords();
    private final IntWritable totalCount = new IntWritable();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void setup(Context context) {
        this.neighborWindow = context.getConfiguration().getInt("neighbor.window", 2);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = StringUtils.split(value.toString(), " ");
        //String[] tokens = StringUtils.split(value.toString(), "\\s+");
        if ((tokens == null) || (tokens.length < 2)) {
            return;
        }

        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = tokens[i].replaceAll("\\W+", "");

            if (tokens[i].equals("")) {
                continue;
            }

            pair.setWord(tokens[i]);

            int start = (i - neighborWindow < 0) ? 0 : i - neighborWindow;
            int end = (i + neighborWindow >= tokens.length) ? tokens.length - 1 : i + neighborWindow;
            for (int j = start; j <= end; j++) {
                if (j == i) {
                    continue;
                }
                pair.setNeighbor(tokens[j].replaceAll("\\W", ""));
                context.write(pair, ONE);
            }
            //
            pair.setNeighbor("*");
            totalCount.set(end - start);
            context.write(pair, totalCount);
        }
    }
}


分割槽器

package fanzhuanpaixu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/** 
 * This is an plug-in class.
 * The OrderInversionPartitioner class indicates how to partition data
 * based on the word only (the natural key).
 * 
 * @author Mahmoud Parsian
 *
 */
public class OrderInversionPartitioner 
   extends Partitioner<PairOfWords, IntWritable> {

    @Override
    public int getPartition(PairOfWords key, 
                            IntWritable value, 
                            int numberOfPartitions) {
        // key = (leftWord, rightWord) = (word, neighbor)
        String leftWord = key.getLeftElement();
        return Math.abs( ((int) hash(leftWord)) % numberOfPartitions);
    }
    
    /**
     * Return a hashCode() of a given String object.   
     * This is adapted from String.hashCode()
     *
     * @param str a string object
     *
     */
    private static long hash(String str) {
       long h = 1125899906842597L; // prime
       int length = str.length();
       for (int i = 0; i < length; i++) {
          h = 31*h + str.charAt(i);
       }
       return h;
    }	
    
}


組合器

package fanzhuanpaixu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/** 
 * RelativeFrequencyCombiner implements the combine() [in Hadoop, 
 * we use reduce() for implementing the combine() function] function 
 * for MapReduce/Hadoop.
 *
 * @author Mahmoud Parsian
 *
 */
public class RelativeFrequencyCombiner
        extends Reducer<PairOfWords, IntWritable, PairOfWords, IntWritable> {

    @Override
    protected void reduce(PairOfWords key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        //
        int partialSum = 0;
        for (IntWritable value : values) {
            partialSum += value.get();
        }
        //
        context.write(key, new IntWritable(partialSum));
    }
}


規約器

package fanzhuanpaixu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * RelativeFrequencyReducer implements the reduce() function for Relative Frequency of words.
 *
 * @author Mahmoud Parsian
 *
 */
public class RelativeFrequencyReducer
        extends Reducer<PairOfWords, IntWritable, PairOfWords, DoubleWritable> {

    private double totalCount = 0;
    private final DoubleWritable relativeCount = new DoubleWritable();
    private String currentWord = "NOT_DEFINED";

    @Override
    protected void reduce(PairOfWords key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        if (key.getNeighbor().equals("*")) {
            if (key.getWord().equals(currentWord)) {
                totalCount += totalCount + getTotalCount(values);
            } else {
                currentWord = key.getWord();
                totalCount = getTotalCount(values);
            }
        } else {
            int count = getTotalCount(values);
            relativeCount.set((double) count / totalCount);
            context.write(key, relativeCount);
        }

    }

    private int getTotalCount(Iterable<IntWritable> values) {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        return sum;
    }
}
驅動
package fanzhuanpaixu;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//
import org.apache.log4j.Logger;

/**
 * RelativeFrequencyDriver is driver class for computing relative frequency of words.
 *
 * @author Mahmoud Parsian
 *
 */
public class RelativeFrequencyDriver
        extends Configured implements Tool {

    private static final Logger THE_LOGGER = Logger.getLogger(RelativeFrequencyDriver.class);

    /**
     * Dispatches command-line arguments to the tool by the ToolRunner.
     */
    public static void main(String[] args) throws Exception {
    	args = new String[1];
    	args[0] = "2";
        int status = ToolRunner.run(new RelativeFrequencyDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        int neighborWindow = Integer.parseInt(args[0]);
        Path inputPath = new Path("input/paper.txt");
        Path outputPath = new Path("output/fanzhuanpaixu");

        Job job = new Job(new Configuration(), "RelativeFrequencyDriver");
        job.setJarByClass(RelativeFrequencyDriver.class);
        job.setJobName("RelativeFrequencyDriver");

        // Delete the output directory if it exists already
        FileSystem.get(getConf()).delete(outputPath, true);

        job.getConfiguration().setInt("neighbor.window", neighborWindow);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // (key,value) generated by map()
        job.setMapOutputKeyClass(PairOfWords.class);
        job.setMapOutputValueClass(IntWritable.class);

        // (key,value) generated by reduce()
        job.setOutputKeyClass(PairOfWords.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setMapperClass(RelativeFrequencyMapper.class);
        job.setReducerClass(RelativeFrequencyReducer.class);
        job.setCombinerClass(RelativeFrequencyCombiner.class);
        job.setPartitionerClass(OrderInversionPartitioner.class);
        job.setNumReduceTasks(3);

        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        THE_LOGGER.info("Job Finished in milliseconds: " + (System.currentTimeMillis() - startTime));
        return 0;
    }

}


執行結果:



相關推薦

Hadoop/MapReduce反轉排序控制規約Reducer順序

例子:計算一個給定文件集中單詞的相對頻度。目標是建立一個N*N矩陣M,其中N為所有給定文件的單詞量,每個單元Mij包含一個特定上下文單詞Wi與Wj共同出現的次數。為簡單起見,將這個上下文定義為Wi的鄰域。例如:給定以下單詞:W1,W2,W3,W4,W5,W6 如果定義一個單

零基礎讀懂視頻播放控制原理 ffplay 播放源代碼分析

5.4 編碼方式 是否播放 都對 enum 其中 mat 源碼 開始 https://www.qcloud.com/community/article/535574001486630869 視頻播放器原理其實大抵相同,都是對音視頻幀序列的控制。只是一些播放器在音視頻同步上可

Hadoop學習之路(十九)MapReduce框架排序

ati ioe extends 一個用戶 必須 idt 構造 sta gpo 流量統計項目案例 樣本示例 需求 1、 統計每一個用戶(手機號)所耗費的總上行流量、總下行流量,總流量 2、 得出上題結果的基礎之上再加一個需求:將統計結果按照總流量倒序排序 3

Hadoop之手寫原生態MapReduce排序

測試資料: 2030 59 1976 68 2030 19 1997 5 年與溫度的文字,資料可以用java程式碼生成。 生成10000條資料程式碼: public void makeData() throws IOException { Fil

DIM-00014 無法開啟windows nt服務控制管理

在windows上安裝oracle 11g時,dbca圖形化介面出來後,前面步驟已配置好,最後安裝時出現了 DIM-00014: 無法開啟windows nt服務控制管理器      這個錯誤其實是和許可權有關   解決方法

Hadoop-MapReduce計算案例1WordCount

案例描述:計算一個檔案中每個單詞出現的數量 程式碼: package com.jeff.mr.wordCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem;

Hadoop MapReduce二次排序演算法與實現之演算法解析

MapReduce二次排序的原理     1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split public abstract class Input

Python Hadoop Mapreduce 實現Hadoop Streaming分組和二次排序

需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列 一 需求一:按市場對門店進行分組 分組(partition) Hadoop streaming框架預設情況下會以’/t

ASP.NET Core中的依賴注入(1)控制反轉(IoC)

ASP.NET Core在啟動以及後續針對每個請求的處理過程中的各個環節都需要相應的元件提供相應的服務,為了方便對這些元件進行定製,ASP.NET通過定義介面的方式對它們進行了“標準化”,我們將這些標準化的元件稱為服務,ASP.NET在內部專門維護了一個DI容器來提供所需的服務。要了解這個DI容器以及現實其中

wordcount報錯org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist:

Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://192.168.25.128:9000/export/y

Spring學習(1)控制反轉(IoC)和依賴注入(DI)的詳解以及註解(annotation)開發入門案例

前言 以往的java學習中,我們要想得到一個物件,就把它new出來。如:Apple apple = new Apple(); 在一些複雜的系統中,一個物件A可能依賴於物件B,C等(程式碼表現為A類持有B,C類的物件作為A類的屬性)。以往來說,我們想要使用B,

Hadoop二次排序MapReduce處理流程例項詳解

一、概述 MapReduce框架對處理結果的輸出會根據key值進行預設的排序,這個預設排序可以滿足一部分需求,但是也是十分有限的,在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網路上已經有很多人分享過了,但是對二次排序的實現原理

hadoop初識之三搭建hadoop環境(配置HDFS,Yarn及mapreduce 執行在yarn)上及三種執行模式(本地模式,偽分散式和分散式介)

--===============安裝jdk(解壓版)================== --root 使用者登入 --建立檔案層級目錄    /opt下分別 建 modules/softwares/datas/tools 資料夾 --檢視是否安裝jdk    rpm -

Hadoop 學習研究(四)MapReduce shuffle過程剖詳解及引數配置調優

MapReduce簡介    在Hadoop  MapReduce中,框架會確保reduce收到的輸入資料是根據key排序過的。資料從Mapper輸出到Reducer接收,是一個很複雜的過程,框架

Hadoop Mapreduce分割槽、分組、二次排序過程詳解[轉]

徐海蛟 教學用途 1、MapReduce中資料流動    (1)最簡單的過程:  map - reduce    (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce    (3)增加了在本地先進性一次reduce(優化)過程: 

Spark超越Hadoop MapReduce

和 Hadoop 一樣,Spark 提供了一個 Map/Reduce API(分散式計算)和分散式儲存。二者主要的不同點是,Spark 在叢集的記憶體中儲存資料,而 Hadoop 在叢集的磁碟中儲存資料。 大資料對一些資料科學團隊來說是 主要的挑戰,因為在要求的可擴充套件性方面單機沒有能

Hadoop MapReduce開發--升序排序資料,且資料不去重

測試資料: file1.txt 2 32 654 32 15 756 65223 file2.txt 5956 22 650 92 file3.txt 26 54 6 Mapper程式碼: import org.apache.

spring的兩大核心技術之一控制反轉

Spring框架的核心就是控制反轉(Inversion of Control)和依賴注入(Dependency Injection),通過這兩方面來實現鬆耦合。    使用IoC,物件是被動的接受依賴類,而不是自己主動的去找。容器在例項化的時候主動將它的依賴類注入給它。可以這

IOS學習 iPad控制元件POP控制元件,分割視窗、浮動視窗、模態檢視的使用

@implementation AppDelegate - (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptio

Hadoop Mapreduce分割槽、分組、連線以及輔助排序(也叫二次排序)過程詳解

package com.hadoop; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import or