OutputFormat overview

OutputFormat specifies the format of the final output data of a MapReduce (MR) program.

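The default is TextOutputFormat, which writes one record per line and places the result files in the specified output directory. The files are named part-r-xxxxx, where xxxxx is a number. By default, one result file is produced per ReduceTask.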
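
As a small illustration of the naming rule above, the following sketch lists the file names a job would leave behind for a given number of ReduceTasks. `PartFileNames` and `forReducers` are hypothetical names made up for this example, not part of Hadoop; the sketch only reproduces the `part-r-` prefix followed by a zero-padded five-digit partition number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: lists the result file names
// expected from a job with the given number of ReduceTasks.
class PartFileNames {
    static List<String> forReducers(int numReduceTasks) {
        List<String> names = new ArrayList<>();
        for (int partition = 0; partition < numReduceTasks; partition++) {
            // "r" marks reduce output; the partition number is zero-padded to five digits
            names.add(String.format("part-r-%05d", partition));
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [part-r-00000, part-r-00001, part-r-00002]
        System.out.println(forReducers(3));
    }
}
```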

Custom OutputFormat

The detailed steps for a custom OutputFormat:

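  1. Define MyOutputFormat so that it extends FileOutputFormat<K, V>. The two type parameters are the Reducer's output key and value types.
  2. Override its getRecordWriter() method, which must return a RecordWriter object.

    The RecordWriter returned here decides where the final output ends up.

  3. Create a class that extends RecordWriter<K, V> and override its two methods, write() and close(). write() defines where each record is written and in what format; close() releases whatever resources the writer opened.
  4. In the Driver, call job.setOutputFormatClass() to specify which OutputFormat implementation to use.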

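Note: if partitioning is configured and a number of ReduceTasks is set, the job produces one result file per ReduceTask, as covered earlier. That happens because the default implementation, TextOutputFormat, writes one file per ReduceTask. With a custom OutputFormat, the results go only to the destination that our RecordWriter writes to, and no part-r-xxxxx result files are produced.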
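
For context on the note above: in the default setup, the ReduceTask (and therefore the result file) a key lands in is chosen by the partitioner. The sketch below reproduces the formula used by Hadoop's default HashPartitioner on plain Java objects. `DefaultPartitionSketch` and `partitionFor` are hypothetical names, and real jobs hash Writable keys such as Text, whose hash codes differ from those of String, so the partition numbers printed here are illustrative only.

```java
// Hypothetical helper mirroring the formula of Hadoop's default HashPartitioner.
class DefaultPartitionSketch {
    static int partitionFor(Object key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3;
        for (String word : new String[] {"Hadoop", "hdfs", "Yarn"}) {
            System.out.println(word + " -> ReduceTask " + partitionFor(word, reducers));
        }
    }
}
```

Every key with the same hash code goes to the same ReduceTask, which is why the default output has exactly one file per ReduceTask.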

Worked examples

Case 1: saving data to MySQL

Requirement: write the phone traffic data to a MySQL database in ascending order of total flow.

Code:

  1. FlowOutputFormat.java

    public class FlowOutputFormat extends FileOutputFormat<FlowBean, NullWritable> {
        @Override
        public RecordWriter<FlowBean, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new MyRecordWriter();
        }
    }
  2. MyRecordWriter.java

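    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class MyRecordWriter extends RecordWriter<FlowBean, NullWritable> {
        private static final String URL = "jdbc:mysql://localhost:3306/sx_bigdata?serverTimezone=UTC";
        private static final String SQL = "insert into phone_flow values (?, ?, ?, ?)";

        /**
         * Defines the output format and the output destination:
         * each record becomes one row in the phone_flow table.
         * @param flowBean     key emitted by the Reduce phase
         * @param nullWritable value emitted by the Reduce phase
         */
        @Override
        public void write(FlowBean flowBean, NullWritable nullWritable) throws IOException, InterruptedException {
            // A connection is opened per record, as in the original example; simple, but slow on large inputs.
            // try-with-resources closes the statement first, then the connection.
            try (Connection connection = DriverManager.getConnection(URL, "root", "root");
                 PreparedStatement preparedStatement = connection.prepareStatement(SQL)) {
                preparedStatement.setString(1, flowBean.getPhone());
                preparedStatement.setInt(2, flowBean.getUpFlow());
                preparedStatement.setInt(3, flowBean.getDownFlow());
                preparedStatement.setInt(4, flowBean.getSumFlow());
                int rows = preparedStatement.executeUpdate();
                System.out.println(rows > 0 ? "Insert succeeded!" : "Insert failed!");
            } catch (SQLException e) {
                // Report database errors through the exception type the method declares
                throw new IOException("Failed to insert record into MySQL", e);
            }
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            // Nothing to release: every write() closes its own connection.
        }
    }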
  3. FlowDriver.java

    job.setOutputFormatClass(FlowOutputFormat.class);
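
The ascending order asked for in the requirement does not come from the OutputFormat: MapReduce sorts records by key before they reach the Reducer, so the order depends on how the key compares. The source does not show FlowBean, so the following is only a sketch under assumptions. `FlowBeanSketch` is a hypothetical stand-in, the total flow is assumed to be upFlow + downFlow, and the real bean would presumably implement Hadoop's WritableComparable and add serialization methods, which are left out here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FlowBean: only the ordering is modelled.
class FlowBeanSketch implements Comparable<FlowBeanSketch> {
    private final String phone;
    private final int upFlow;
    private final int downFlow;

    FlowBeanSketch(String phone, int upFlow, int downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    String getPhone() { return phone; }

    // Assumption: total flow is the sum of upstream and downstream flow
    int getSumFlow() { return upFlow + downFlow; }

    @Override
    public int compareTo(FlowBeanSketch other) {
        // Ascending by total flow; Integer.compare avoids the overflow risk of subtraction
        return Integer.compare(getSumFlow(), other.getSumFlow());
    }

    // Returns the phone identifiers in the order the sorted beans would be written
    static List<String> phonesInOutputOrder(List<FlowBeanSketch> beans) {
        List<FlowBeanSketch> sorted = new ArrayList<>(beans);
        Collections.sort(sorted);
        List<String> phones = new ArrayList<>();
        for (FlowBeanSketch bean : sorted) {
            phones.add(bean.getPhone());
        }
        return phones;
    }
}
```

With made-up beans A (100 up, 200 down), B (10, 20) and C (50, 60), the totals are 300, 30 and 110, so the rows would be inserted in the order B, C, A. Note that the sort is per ReduceTask, so a single global order only holds when the job runs with one ReduceTask.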

Case 2: saving data to specified files on HDFS

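Requirement: write the word-count results to HDFS, with words whose first letter is uppercase saved in upper.txt and words whose first letter is lowercase saved in lower.txt (the code below places both files under /test/school/).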

Code:

  1. MyOutputFormat.java

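    public class MyOutputFormat extends FileOutputFormat<Text, LongWritable> {
        @Override
        public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            try {
                return new MyRecordWriter(taskAttemptContext);
            } catch (Exception e) {
                // Wrap constructor failures instead of hiding them behind @SneakyThrows
                throw new IOException("Could not open the output streams", e);
            }
        }
    }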
  2. MyRecordWriter.java

    import java.io.IOException;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Every ReduceTask would open the same two paths, so this writer assumes a single ReduceTask.
    public class MyRecordWriter extends RecordWriter<Text, LongWritable> {
        private final FSDataOutputStream upperOut;
        private final FSDataOutputStream lowerOut;

        public MyRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            Configuration configuration = taskAttemptContext.getConfiguration();
            // Connect to HDFS as user "root"
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.218.55:9000"), configuration, "root");
            // overwrite = true replaces files left over from an earlier run
            upperOut = fs.create(new Path("/test/school/upper.txt"), true);
            lowerOut = fs.create(new Path("/test/school/lower.txt"), true);
        }

        @Override
        public void write(Text text, LongWritable longWritable) throws IOException, InterruptedException {
            String word = text.toString();
            byte[] line = (word + "\t" + longWritable.get() + "\r\n").getBytes(StandardCharsets.UTF_8);
            // Uppercase first letter goes to upper.txt; everything else, including an empty key, goes to lower.txt
            if (!word.isEmpty() && Character.isUpperCase(word.charAt(0))) {
                upperOut.write(line);
            } else {
                lowerOut.write(line);
            }
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            // Close both streams even if closing the first one fails
            try {
                upperOut.close();
            } finally {
                lowerOut.close();
            }
        }
    }
  3. FlowDriver.java

    job.setOutputFormatClass(MyOutputFormat.class);
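
The routing rule inside write() can be tried out without a cluster by pulling it into plain Java. The sketch below is one way to do that; `WordRouter`, `targetFor` and `lineFor` are hypothetical names introduced for this example, and sending an empty word to lower.txt is an assumption of the sketch rather than something the requirement states.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that isolates the two decisions made in write():
// which file a word belongs to, and what bytes are written for it.
class WordRouter {
    static String targetFor(String word) {
        // Only an uppercase first letter selects upper.txt; digits, symbols,
        // lowercase letters and (by assumption) empty words go to lower.txt
        boolean startsUpper = !word.isEmpty() && Character.isUpperCase(word.charAt(0));
        return startsUpper ? "upper.txt" : "lower.txt";
    }

    static byte[] lineFor(String word, long count) {
        // One record per line: word, tab, count, then CRLF as in the example above
        return (word + "\t" + count + "\r\n").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (String word : new String[] {"Hadoop", "hdfs", "9lives"}) {
            System.out.println(word + " -> " + targetFor(word));
        }
    }
}
```

Keeping the rule in a small pure function like this makes it easy to check edge cases such as words that start with a digit before running the job.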