1. 程式人生 > >hadoop第二篇:使用Maven開發Hadoop程式設計進階

hadoop第二篇:使用Maven開發Hadoop程式設計進階

Hadoop第二篇:使用Maven開發Hadoop程式設計進階

如何進行java程式碼開發進行符合自己需求的實踐,先從Wordcount看看它是如何做的,這裡從使用java介面看看一些基本的操作開始。

基礎環境

  • win7
  • JDK1.8
  • maven安裝
  • IDEA
  • Hadoop在centos上部署完成(單機版,請參看前一篇)

java程式碼

IDEA建立一個java專案

pom.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bamboo</groupId>
    <artifactId>hadoop-1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.2</version>
        </dependency>
    </dependencies>

</project>

檔案的建立和讀取

直接本地執行即可

使用mapredace進行資料處理

程式碼重現wordCount的內容,這裡類取名為hadoop2

MapReduce理論

這部分可以跳過直接後面的程式碼,然後再回過頭看看。

MapReduce分成了兩個部分:       1)對映(Mapping)對集合裡的每個目標應用同一個操作。如把大象放進冰箱分幾個步驟,那麼就可以把每個步奏當成一個mapping.       2)化簡(Reducing)遍歷集合中的元素來返回一個綜合的結果。即,每個步奏的輸出結果的過程,這個任務屬於reducing。   你向MapReduce框架提交一個計算作業時 1.它會首先把計算作業拆分成若干個Map任務,然後分配到不同的節點上去執行,每一個Map任務處理輸入資料中的一部分 2.當Map任務完成後,它會經歷Shuffle將多個map結果合併生成一些中間檔案 3.這些中間檔案將會作為Reduce任務的輸入資料,經過Reduce處理之後把結果合併輸出

Shuffle的主要目標就是把前面若干個Map的輸出合併成若干個租(相當於group by)。

Reduce任務的主要目標就是把Shuffle合併後的資料作為輸入資料處理後輸出    MapReduce的偉大之處就在於程式設計人員在不會分散式並行程式設計的情況下,將自己的程式執行在分散式系統上資料參考

特性

mapreduce之mapper、reducer個數

案例講解

在map(v1)->k2函式中,輸入端v1代表的是一行資料,輸出端的k2可以代表是被引用的專利,在一行資料中所有v2和引用的次數。

在reduce函式中,k2還是被引用的專利,而[v2]是一個數據集,這裡是將k2相同的鍵的v2資料合併起來。最後輸出的是自己需要的資料k3代表的是被引用的專利,v3是引用的次數。

以wordcount為例,有如下資料集2份文件 hello word hello hadoop

處理過程如下

1.每個個文件都有一個mapping ·每讀取一行就按空格分割字串為單詞,則每個單詞作為key,值預設為1 2.reduce處理: ·把所有的mapping處理結果合併,如單詞hello的結果是 hello,[1,1] 因為出現了兩次hello,這個過程由框架自己完成 我們不用處理 .按照key遍歷所有的vlue集合需要手工處理,這裡把所有的1相加就是該單詞出現的個數

3.把結果集合輸出出來

程式碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;


/**
 * MapReduce作業:功能和wordcount一樣,在執行jar時設定檔案輸源和結果輸出源
 *
 * 補充:
 * 設定命令列引數變數來程式設計
   這裡需要藉助Hadoop中的一個類Configured、一個介面Tool、ToolRunner(主要用來執行Tool的子類也就是run方法)

 * <p>原始碼出自https://blog.csdn.net/yinbucheng/article/details/70243593</p>
 * 拷貝人
 */


public class Hadoop2 {

    //輸入資源對映處理
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{

        private Text event = new Text();//字串
        private final static IntWritable one = new IntWritable(1);//預設字串的個數為1
        //字串的分隔
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            int idx = value.toString().indexOf(" ");//空格的位置
            if (idx > 0) {
                String e = value.toString().substring(0, idx);//
                event.set(e);
                context.write(event, one);//反正結果<e,1>
            }
        }
    }

    //對輸出結果集處理
    public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        //把多個map中返回的<e,1>合併<key,values>即<e,[1,1,1]>後遍歷
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {//遍歷相同key的所有結果集,這裡儲存的是int計算結果和
                sum += val.get();
            }
            result.set(sum);//把結果放入int
            context.write(key, result);//注意:在map或reduce上面的列印語句是沒有辦法輸出的,但會記錄到日誌檔案當中。
        }
    }

    /**
     *  args 在執行jar時設定引數
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        //如果args的引數長度小於2直接退出程式
        if (otherArgs.length < 2) {
            System.err.println("Usage: EventCount <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "event count");//構建作業配置job

        //設定該作業所要執行的類
        job.setJarByClass(Hadoop2.class);

        job.setMapperClass(MyMapper.class);//設定map
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setCombinerClass(MyReducer.class);//設定

        //設定自定義的Reducer類以及輸出時的型別
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//設定檔案輸入源的路徑
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//設定結果輸出流的路徑


        System.exit(job.waitForCompletion(true) ? 0 : 1);//提交作業完成後退出
    }
}

執行程式碼

java專案打包

mvn package

把包上傳到centos上

/mnt/hadoop

建立手工資料來源

cd /mnt/hadoop/input/event
[[email protected] input]# touch evenet.log.1
[[email protected] input]# touch evenet.log.2

依次vi開啟貼上如下內容

JOB_NEW ...  
JOB_NEW ...  
JOB_FINISH ...  
JOB_NEW ...  
JOB_FINISH ...  

檢視資料來源,清除不需要的資料來源,清空資料輸出路徑

hdfs dfs -ls /mnt/input
hdfs dfs -rm  /mnt/input/evenet.log*
hadoop fs -rmr  /mnt/out

資料來源放入dfs

hdfs dfs -ls /mnt/input
hadoop fs -mkdir -p /mnt/input
hdfs dfs -put /mnt/hadoop/input/event/*   /mnt/input
hadoop fs -ls /mnt/input/input

執行程式碼

hadoop jar hadoop-1-1.0-SNAPSHOT.jar Hadoop2 /mnt/input /mnt/out

等待它執行結束,檢視結果

hadoop dfs -cat /mnt/out/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

JOB_FINISH	4
JOB_NEW	6

MapReduce深入學習進階樣例

從上面的樣例中我們瞭解了程式碼處理資料的大致過程,那麼如何實現更多業務場景的資料處理呢,畢竟我們不是隻做單表資料的個數統計,現實中可能會有更加複雜的運用,就像我們隊資料庫操作一樣,單表到多表,複合統計等等,列出幾個比喻的例子:

日誌log如下 user.log 112 王小二 113 李小明

trade.log 10:48:20.669 INFO [pool-9-thread-1] c.z.e.a.domain.job.TransSuccessJob [TransSuccessJob.java : 42] 積分交易成功通知 notifyVo={“uid”:112,“coin”:“yxt”,“amount”:10,“price”:13.5555,“oPrice”:13.5555,“transFee”:0.2711,“oTransFee”:0.2711,“tid”:4339,“buy”:true,“ext”:"",“dealTime”:“2018-09-04 10:03:10”}

1.比如會查詢單suer表特定幾個欄位的結果,單表[uid,coin,amount]統計結果,如結果要求如下 23 yxt 200 ETH 10 2.查詢[uaerName,coin,amout]的統計結果,這個就是多表關聯查詢,如結果要求如下 李三 yxt 200 ETH 10

看上去是不是很像資料庫的統計,但是這裡操作的不是資料庫,而且檔案,MapReduce就是要實現從分散式檔案中的超大資料中處理結果。而實現的關鍵就在於兩個實現類Mapper和Reducer

而這兩個實現類都需要使用的一個地方就是MapReduce<key,value>,從上面的處理過程講解我們知道,每一個步奏都是把資料處理成為了key,value鍵值對然後輸出給下一個處理過程,第二個處理過程接收到的依然是(key,vaule)鍵值對的資料型別,如何實現符合我們需要的鍵值型別才是整個資料處理的最重要的部分,畢竟我們不可能只處理一些基本型別的資料。

一些常見的基礎型別資料對<key,value> (Text,IntWritable)前面的wordCount就用到了,學生成績統計也可以使用 (LongWritable,LongWritable)比如手機號的流量統計,單位是KB

MapReduce: <key,value>

MapReduce常用資料型別

ByteWritable:單位元組數值 IntWritable:整型數 LongWritable:長整型數 FloatWritable:浮點數 DoubleWritable:雙位元組數值 BooleanWritable:標準布林型數值 Text:使用UTF8格式儲存的文字 NullWritable:當<key,value>中的key或value為空時使用

以上型別中 IntWritable LongWritable Text NullWritable DoubleWritable較為常用,實際中我們可能遇到複雜的型別,比如上面舉例的user.log特定欄位的統計,vule的值就不是一個基礎型別,而是好幾個欄位的複合型別,這種情況我們該如何處理呢。

MapReduce自定義資料型別

  • key實現WritableComparable
  • value通常只需要實現Writable

關鍵就在這裡, key決定了是不是同一個資料,比如是不是同一個人,同一個手機號,因此Comparable的功能必須有; value則決定了你需要處理的資料是什麼格式的

案例1:手機上網流量統計

資料來源: HTTP_20130313143750 1238 17823 3424 2342 每個電話號碼每上一次網路,就會生成一條日誌資料

需求目標: 手機號碼 上行資料包總數 下行資料包總數 上行總流量 下行總流量 13123412431 1238 17823 3424 2342 13622311238 17223 34214 23421 1231 13123412431 12380 1828 34 23

MapReduce設計: ** key是什麼? –電話號碼 ** value是什麼? –後面四個欄位

key                value list

13123412431 (123,2423,3453,234),(789,1231,4353,234),(1231,231,342,23) … 13622311238 (1233,23,342353,23234),(78239,123231,42353,2234) … … …

最終結果:

 hadoop dfs -cat /mnt/out/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

13123412431 1276    17846   3448    2384
13123412432 1278    17848   3450    2386
13123412433 1280    17850   3452    2388
13123412434 1282    17852   3454    2390
13123412435 1284    17854   3456    2392
13123412436 1286    17856   3458    2394
13123412437 1288    17858   3460    2396
13123412438 1290    17860   3462    2398
13123412439 1292    17862   3464    2400
13123412440 1294    17864   3466    2402

構造複合型別的資料型別

package entity;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DataTotalWritable implements Writable {

    // 上行資料包總數
    private long upPackNum ;
    // 下行資料包總數
    private long downPackNum ;
    // 上行總流量
    private long upPayLoad ;
    // 下行總流量
    private long downPayLoad ;

    public DataTotalWritable() {
    }

    public DataTotalWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
        this.set(upPackNum, downPackNum, upPayLoad, downPayLoad);
    }

    public void set (long upPackNum, long downPackNum, long upPayLoad,long downPayLoad) {
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    public long getUpPackNum() {
        return upPackNum;
    }

    public void setUpPackNum(long upPackNum) {
        this.upPackNum = upPackNum;
    }

    public long getDownPackNum() {
        return downPackNum;
    }

    public void setDownPackNum(long downPackNum) {
        this.downPackNum = downPackNum;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    //^為異或運算, << 帶符號左移, >>帶符號右移, >>> 無符號右移
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (downPackNum ^ (downPackNum >>> 32));
        result = prime * result + (int) (downPayLoad ^ (downPayLoad >>> 32));
        result = prime * result + (int) (upPackNum ^ (upPackNum >>> 32));
        result = prime * result + (int) (upPayLoad ^ (upPayLoad >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataTotalWritable other = (DataTotalWritable) obj;
        if (downPackNum != other.downPackNum)
            return false;
        if (downPayLoad != other.downPayLoad)
            return false;
        if (upPackNum != other.upPackNum)
            return false;
        if (upPayLoad != other.upPayLoad)
            return false;
        return true;
    }

    @Override
    public String toString() {
        return upPackNum + "\t"    + downPackNum + "\t"
                + upPayLoad + "\t"    + downPayLoad ;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong() ;
        this.downPackNum = in.readLong() ;
        this.upPayLoad = in.readLong() ;
        this.downPayLoad = in.readLong() ;
    }

}

map實現類和Reduce實現類

import java.io.IOException;

import entity.DataTotalWritable;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


/**
 * MapReduce複核型別資料處理
 *
  * <p>原始碼出自https://blog.csdn.net/helpless_pain/article/details/7015383</p>
 * 拷貝人 bamoboo
 * 2018-9-14
 */

public class Hadoop3  extends Configured implements Tool {

    //輸入資源對映處理
    public static class DataTotalMapper extends
            Mapper<LongWritable, Text, Text, DataTotalWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            //split by '\t'
            String[] splits = value.toString().split("\t") ;

            //以手機號碼作為output key
            String phoneNum = splits[0];
            Text mapOutputKey = new Text();
            mapOutputKey.set(phoneNum);

            // set map output value
            long upPackNum = Long.parseLong(splits[1]) ;
            long downPackNum = Long.parseLong(splits[2]) ;
            long upPayLoad = Long.parseLong(splits[3]) ;
            long downPayLoad = Long.parseLong(splits[4]) ;
            DataTotalWritable mapOutputValue = new DataTotalWritable() ;
            mapOutputValue.set(upPackNum, downPackNum, upPayLoad, downPayLoad);

            //map output
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    public static class DataTotalReducer extends
            Reducer<Text, DataTotalWritable, Text, DataTotalWritable> {

        @Override
        protected void reduce(Text key, Iterable<DataTotalWritable> values,
                              Context context) throws IOException, InterruptedException {

            long upPackNumSum = 0;
            long downPackNumSum = 0;
            long upPayLoadSum = 0;
            long downPayLoadSum = 0;

            //iterator
            for(DataTotalWritable value : values){
                upPackNumSum += value.getUpPackNum() ;
                downPackNumSum += value.getDownPackNum() ;
                upPayLoadSum += value.getUpPayLoad() ;
                downPayLoadSum += value.getDownPayLoad()  ;
            }

            // set output value
            DataTotalWritable outputValue = new DataTotalWritable() ;
            outputValue.set(upPackNumSum, downPackNumSum, upPayLoadSum, downPayLoadSum);

            // output
            context.write(key, outputValue);
        }
    }

    public int run(String[] args) throws Exception {

        //Job
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());

        //Mapper
        job.setMapperClass(DataTotalMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataTotalWritable.class);

        //Reducer
        job.setReducerClass(DataTotalReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataTotalWritable.class);

        //輸入路徑
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        //輸出路徑
        Path outPath = new Path(args[1]);
        FileSystem dfs = FileSystem.get(conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        //Submit Job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }


    //這裡的執行輸入和輸出已經在程式中寫死了不需要在執行時指定引數
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();


        //執行時設定輸入 輸出
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        //如果args的引數長度小於2直接退出程式
        if (otherArgs.length < 2) {
            System.err.println("Usage: EventCount <in> <out>");
            System.exit(2);
        }

        //也可以直接寫死
        //args = new String[] {"hdfs://blue01.mydomain:8020/input2", "hdfs://blue01.mydomain:8020/output2"};

        // run job
        int status = ToolRunner.run(conf,new Hadoop3(),args);

        System.exit(status);

       /* Long p=13123412431L;
        for (int i=0;i<10;i++){
            System.out.printf("%d\t%d\t%d\t%d\t%d\n",p++ ,38+i, 23+i,  24+i, 42+i);
        }*/
    }
}

執行資料

把程式中/resources下的phone.log.1,phone.log.1上傳到/mnt/hadoop/input/phone/資料夾下

把打包後的程式hadoop-1-1.0-SNAPSHOT.jar上傳到/mnt/hadoop路徑下

清空資料來源和結果集輸出源

hdfs dfs -rm  /mnt/input/*
hadoop fs -rmr  /mnt/out

資料提交到HDFS

hdfs dfs -put /mnt/hadoop/input/phone/*   /mnt/input
hdfs dfs -ls /mnt/input

執行程式並檢視結果集

hadoop jar hadoop-1-1.0-SNAPSHOT.jar Hadoop3 /mnt/input /mnt/out
hadoop dfs -cat /mnt/out/*

結果就和上面中的結果集一模一樣

多型別資料行轉列

資料檔案: customer檔案 使用者CID,使用者姓名,聯絡方式 1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose Madriz,281-330-8004 4,David Stork,408-555-0000

order檔案 使用者CID,訂單號,價格,日期 3,A,12.95,02-Jun-2008 1,B,88.25,20-May-2008 2,C,32.00,30-Nov-2007 3,D,25.02,22-Jan-2009

目標: 使用者CID,訂單號,價格,時間,使用者姓名,聯絡方式 1,B,88.25,20-May-2008,Stephanie Leung,555-555-5555 2,C,32.00,30-Nov-2007,Edward Kim,123-456-7890 3,D,25.02,22-Jan-2009,Jose Madriz,281-330-8004 3,A,12.95,02-Jun-2008,Jose Madriz,281-330-8004

思路: 選用: Join

map階段: ** map task依次讀取兩個檔案,切割,並設定key和value,取cid為key,同時給來自不同的檔案的value打一個標籤 value == flag + value

key                value list
3        customer('Jose Madriz,281-330-8004')
3       order('A,12.95,02-Jun-2008')
3       order('D,25.02,22-Jan-2009')

Shuffle階段:

key                value list
3        customer('Jose Madriz,281-330-8004'),order('A,12.95,02-Jun-2008'),order('D,25.02,22-Jan-2009')

reduce階段: ** Join 資料根據flag得不同擺放好次序合併 3 A,12.95,02-Jun-2008,Jose Madriz,281-330-8004 3 D,25.02,22-Jan-2009,Jose Madriz,281-330-8004

資料型別DataJoinWritable

 private String flag;
 private String data;
  public void set(String flag, String data) {
        this.flag = flag;
        this.data = data;
    }

資料拆分合並DataJoinMapper 1.按空格或者tab切割 2.splits.length長度不同則設定不同的flag key,value存放格式為cid,(str1+str2)+str3的方式字串拼接

     public static class DataJoinMapper extends
            Mapper<LongWritable, Text, Text, DataJoinWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] splits = value.toString().split(",");

            //output key,以連線值為key
            String cid = splits[0];
            Text mapOutputKey = new Text();
            mapOutputKey.set(cid);

            //output value
            DataJoinWritable mapOutputValue = new DataJoinWritable();

            // length == 3 ==> customer
            if (splits.length == 3) {
                String name = splits[1];
                String phoneNum = splits[2];
                mapOutputValue.set("customer", name + "," + phoneNum);
            }

            // length == 4 ==> order
            if (splits.length == 4) {
                String name = splits[1];
                String price = splits[2];
                String date = splits[3];
                mapOutputValue.set("order", name + "," + price + "," + date);
            }
            
            context.write(mapOutputKey, mapOutputValue);
        }
    }

reduce資料在進行遍歷時,由於cid相同的資料會成為一個list,根據不同的flag進行合併即可,這裡customerInfo字串 和所有OderList中的資料遍歷合併,每合併一次就寫出一行資料。

 public static class DataJoinReducer extends
            Reducer<Text, DataJoinWritable, NullWritable, Text> {

        @Override
        protected void reduce(Text key, Iterable<DataJoinWritable> values, Context context) 
                throws IOException, InterruptedException {

            String customerInfo = null;
            List<String> orderList = new ArrayList<String>();

            for (DataJoinWritable value : values) {
                if ("customer".equals(value.getFlag())) {
                    customerInfo = value.getData();
                } else if ("order".equals(value.getFlag())) {
                    orderList.add(value.getData());
                }
            }

            //遍歷訂單列表和客戶資訊合併為一行資料並寫出
            Text outputValue = new Text();
            for (String order : orderList) {
                outputValue.set(key.toString() + "," + order + ","    + customerInfo);
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

參考資料

如上程式碼已經上傳到github,請參考我的git倉庫地址