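OutputFormat Overview
OutputFormat specifies the final output format of an MR (MapReduce) program.
The default is TextOutputFormat, which writes one record per line (key and value separated by a tab) and puts the results in the specified output directory, in files named part-r-xxxxx, where xxxxx is a number. By default, the job produces one result file per ReduceTask.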
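As a rough illustration, the plain-Java sketch below mimics the default layout without Hadoop: one part-r-NNNNN name per reduce task, with the partition number zero-padded to five digits, and one key<TAB>value line per record. DefaultOutputLayout and its methods are hypothetical names; in Hadoop the naming happens inside FileOutputFormat, and the real TextOutputFormat also omits NullWritable keys or values, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical plain-Java model of the default output layout (not Hadoop code).
final class DefaultOutputLayout {

    private DefaultOutputLayout() {
    }

    // File written by reduce task number `partition`, e.g. 0 -> "part-r-00000".
    static String partFileName(int partition) {
        return String.format(Locale.ROOT, "part-r-%05d", partition);
    }

    // With the default format, N reduce tasks produce N result files.
    static List<String> expectedFiles(int numReduceTasks) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            names.add(partFileName(i));
        }
        return names;
    }

    // One record per line: key, tab separator, value, newline.
    static String textLine(Object key, Object value) {
        return key + "\t" + value + "\n";
    }
}
```

For example, a job with three ReduceTasks would leave part-r-00000, part-r-00001 and part-r-00002 in its output directory.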
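Custom OutputFormat
Steps for writing a custom OutputFormat:
- Define MyOutputFormat extending FileOutputFormat<K, V>, where the type parameters are the Reducer's output key and value types.
- Override its getRecordWriter() method, which must return a RecordWriter object. This is where you decide where the final output goes.
- Create a class extending RecordWriter<K, V> and override its two methods, write() and close(). The framework calls write() once for every key-value pair the Reducer emits, so this method defines both the output destination and the output format of the data; close() is called once when the task finishes and should release any streams or connections.
- In the Driver, call job.setOutputFormatClass() to specify which OutputFormat implementation to use.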
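To make this division of labor concrete without a cluster, here is a toy model in plain Java. ToyOutputFormat, ToyRecordWriter, ToyReduceTask and InMemoryOutputFormat are hypothetical stand-ins, not Hadoop classes; they only imitate the call order described above: one writer per task, write() for every reducer output pair, then close() once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the role of Hadoop's RecordWriter.
abstract class ToyRecordWriter<K, V> {
    // Called once for every key-value pair the reducer emits.
    abstract void write(K key, V value);

    // Called once when the task is done; release files or connections here.
    abstract void close();
}

// Hypothetical stand-in for the role of OutputFormat.getRecordWriter().
interface ToyOutputFormat<K, V> {
    ToyRecordWriter<K, V> getRecordWriter(int taskId);
}

final class ToyReduceTask {
    private ToyReduceTask() {
    }

    // Roughly what the framework does with the format set via setOutputFormatClass().
    static <K, V> void run(ToyOutputFormat<K, V> format, int taskId, List<Map.Entry<K, V>> reducerOutput) {
        ToyRecordWriter<K, V> writer = format.getRecordWriter(taskId);
        try {
            for (Map.Entry<K, V> pair : reducerOutput) {
                writer.write(pair.getKey(), pair.getValue());
            }
        } finally {
            writer.close();
        }
    }
}

// A custom format that keeps "key<TAB>value" lines in memory instead of writing files.
final class InMemoryOutputFormat implements ToyOutputFormat<String, Long> {
    final List<String> lines = new ArrayList<>();
    int closeCount = 0;

    @Override
    public ToyRecordWriter<String, Long> getRecordWriter(int taskId) {
        return new ToyRecordWriter<String, Long>() {
            @Override
            void write(String key, Long value) {
                lines.add(key + "\t" + value);
            }

            @Override
            void close() {
                closeCount++;
            }
        };
    }
}
```

In real Hadoop code the same roles are played by FileOutputFormat.getRecordWriter() and RecordWriter.write()/close(), with the framework doing what ToyReduceTask.run() imitates here.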
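[Note] If you configure partitioning and set the number of ReduceTasks, the job normally produces one result file per ReduceTask. That is the behavior of the default TextOutputFormat. With a custom OutputFormat, results are written only to the locations your RecordWriter chooses, although every ReduceTask still creates its own RecordWriter. Because the custom class extends FileOutputFormat, the Driver must still set an output directory with FileOutputFormat.setOutputPath(), and Hadoop leaves markers such as the _SUCCESS file there.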
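Hands-on Examples
Case 1: Saving data to MySQL
Requirement: write the phone traffic data to a MySQL database, sorted by total flow in ascending order.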
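The ascending order does not come from the OutputFormat: MapReduce sorts map output keys before the reduce phase, so with FlowBean as the key, its compareTo() decides the order in which each RecordWriter receives records. FlowBean itself is not shown in this article, so the sketch below is a hypothetical plain-Java stand-in (FlowRecord) that reuses the getters called later, assumes sumFlow = upFlow + downFlow, and leaves out the Writable serialization the real class needs. The phone-number tie-breaker is one possible choice: it keeps records with equal totals from comparing as equal and being grouped into a single reduce() call.

```java
// Hypothetical stand-in for FlowBean: same getters, plain Java, no Writable code.
final class FlowRecord implements Comparable<FlowRecord> {
    private final String phone;
    private final int upFlow;
    private final int downFlow;
    private final int sumFlow;

    FlowRecord(String phone, int upFlow, int downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow; // assumed: total = upstream + downstream
    }

    String getPhone() { return phone; }
    int getUpFlow() { return upFlow; }
    int getDownFlow() { return downFlow; }
    int getSumFlow() { return sumFlow; }

    // Ascending by total flow; ties broken by phone number.
    @Override
    public int compareTo(FlowRecord other) {
        int bySum = Integer.compare(sumFlow, other.sumFlow);
        return bySum != 0 ? bySum : phone.compareTo(other.phone);
    }
}
```

Sorting a list of these with Collections.sort() gives the same order the shuffle would hand to a single ReduceTask.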
Code:
- FlowOutputFormat.java
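import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowOutputFormat extends FileOutputFormat<FlowBean, NullWritable> {
    @Override
    public RecordWriter<FlowBean, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new MyRecordWriter();
    }
}
- MyRecordWriter.java
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<FlowBean, NullWritable> {
    // Opened once per task and reused for every record (the MySQL JDBC driver must be on the task classpath)
    private final Connection connection;
    private final PreparedStatement preparedStatement;

    public MyRecordWriter() throws IOException {
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/sx_bigdata?serverTimezone=UTC", "root", "root");
            preparedStatement = connection.prepareStatement("insert into phone_flow values (?, ?, ?, ?)");
        } catch (SQLException e) {
            throw new IOException("Cannot prepare the MySQL insert", e);
        }
    }

    // Defines the output format and destination: one row per Reduce output key (the value is unused)
    @Override
    public void write(FlowBean flowBean, NullWritable nullWritable) throws IOException, InterruptedException {
        try {
            preparedStatement.setString(1, flowBean.getPhone());
            preparedStatement.setInt(2, flowBean.getUpFlow());
            preparedStatement.setInt(3, flowBean.getDownFlow());
            preparedStatement.setInt(4, flowBean.getSumFlow());
            int i = preparedStatement.executeUpdate();
            System.out.println(i > 0 ? "Insert succeeded!" : "Insert failed!");
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        try {
            // Close the statement before the connection that created it
            preparedStatement.close();
            connection.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}
- FlowDriver.java
job.setOutputFormatClass(FlowOutputFormat.class);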
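Case 2: Saving data to specified files on HDFS
Requirement: write the results of the word-count example to HDFS, putting words whose first letter is uppercase into upper.txt and words whose first letter is lowercase into lower.txt (both under /test/school/ in the code below).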
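The routing rule comes down to a check on the first character of each key. The plain-Java sketch below pulls it out of MyRecordWriter.write() so it can be tried without HDFS. WordRouter is a hypothetical name, and sending an empty key to lower.txt is an assumption (the original code would throw on charAt(0)). Note that Character.isUpperCase() returns false for digits and punctuation, so such words also land in lower.txt.

```java
// Hypothetical helper isolating the routing rule from MyRecordWriter.write().
final class WordRouter {
    static final String UPPER = "/test/school/upper.txt";
    static final String LOWER = "/test/school/lower.txt";

    private WordRouter() {
    }

    // Uppercase first letter -> upper.txt; anything else (lowercase, digit, symbol) -> lower.txt.
    static String targetFile(String word) {
        if (word.isEmpty()) {
            return LOWER; // assumption: the original code would throw on an empty key
        }
        return Character.isUpperCase(word.charAt(0)) ? UPPER : LOWER;
    }

    // Same line layout as the original: word, tab, count, CRLF.
    static String formatLine(String word, long count) {
        return word + "\t" + count + "\r\n";
    }
}
```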
Code:
- MyOutputFormat.java
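import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, LongWritable> {
    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new MyRecordWriter(taskAttemptContext);
    }
}
- MyRecordWriter.java
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<Text, LongWritable> {
    // One stream per target file: words with an uppercase first letter, and all others
    private FSDataOutputStream fsDataOutputStream1;
    private FSDataOutputStream fsDataOutputStream2;

    public MyRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.218.55:9000"), configuration, "root");
        Path out1 = new Path("/test/school/upper.txt");
        Path out2 = new Path("/test/school/lower.txt");
        // Remove results left over from a previous run
        if (fs.exists(out1)) {
            fs.delete(out1, true);
        }
        if (fs.exists(out2)) {
            fs.delete(out2, true);
        }
        fsDataOutputStream1 = fs.create(out1);
        fsDataOutputStream2 = fs.create(out2);
    }

    @Override
    public void write(Text text, LongWritable longWritable) throws IOException, InterruptedException {
        char firstChar = text.toString().charAt(0);
        String line = text + "\t" + longWritable.get() + "\r\n";
        // Route each word by the case of its first character
        if (Character.isUpperCase(firstChar)) {
            fsDataOutputStream1.write(line.getBytes(StandardCharsets.UTF_8));
        } else {
            fsDataOutputStream2.write(line.getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (fsDataOutputStream1 != null) {
            fsDataOutputStream1.close();
        }
        if (fsDataOutputStream2 != null) {
            fsDataOutputStream2.close();
        }
    }
}
- FlowDriver.java
job.setOutputFormatClass(MyOutputFormat.class);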
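MyRecordWriter writes to fixed paths, which works when the job runs a single ReduceTask. With several ReduceTasks, each task builds its own writer and would delete and recreate the same two files, so their outputs can collide. One possible fix, sketched here as an assumption rather than the article's method, is to make each path unique per task by appending the partition number; in Hadoop that number can be read with taskAttemptContext.getTaskAttemptID().getTaskID().getId(). PerTaskPaths is a hypothetical plain-Java helper.

```java
import java.util.Locale;

// Hypothetical helper: derive one output file per reduce task from a base path.
final class PerTaskPaths {

    private PerTaskPaths() {
    }

    // ("/test/school/upper", 1) -> "/test/school/upper-r-00001.txt"
    static String forTask(String basePath, int partition) {
        return String.format(Locale.ROOT, "%s-r-%05d.txt", basePath, partition);
    }
}
```

In the MyRecordWriter constructor, new Path(PerTaskPaths.forTask("/test/school/upper", id)) would then take the place of the fixed upper.txt path, and likewise for lower.txt.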