(9) MapReduce Custom File Input and Output Components
Purpose
- Custom file input
When reading files, MapReduce by default uses the record reader LineRecordReader, which produces <byte offset of the line start, line content> pairs; each read hands one key and value to the developer-written Mapper component. A custom record reader lets you define how the file is read, and therefore adjust the key and value passed to the Mapper (see the sketch after this list).
- Custom file output
When results are written to a file, the default is TextOutputFormat, a subclass of FileOutputFormat: it separates key and value with a Tab character and separates records with a newline. A custom output format lets you adjust how the results are written.
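For example, given a file whose first two lines are "hello world" and "hello dong", the default and custom components behave roughly as follows (a sketch; the exact byte offsets depend on line lengths and line endings, and $$ / \r\n are the separators chosen in the code below):

// Input -> Mapper, default LineRecordReader:
//   (0,  "hello world")     key = byte offset of the line start
//   (12, "hello dong")
//
// Input -> Mapper, custom AuthRecordReader:
//   (1, "hello world")      key = 1-based line number
//   (2, "hello dong")
//
// Reducer -> file, default TextOutputFormat:  1<TAB>hello world<NEWLINE>
// Reducer -> file, custom AuthRecordWriter:   1$$hello world\r\n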
Requirements
Given the following data, the key and value passed to the Mapper must satisfy:
- key is the line number of the line
- value is the content of that line
When writing the results to a file, the custom output format is used (expected output shown after the sample data).
hello world
hello dong
hello hadoop
hello world
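Run through the custom components below, the result file should look roughly like this (the identity Mapper and the default Reducer pass each pair through, and the line-number keys come out sorted after the shuffle):

1$$hello world
2$$hello dong
3$$hello hadoop
4$$hello world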
Code Implementation
package hadoop02;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Record reader component: decides how the file data is read.
 * Hadoop's default record reader is LineRecordReader, which produces
 * <byte offset of the line start, line content>.
 *
 * initialize is called exactly once; it sets up the split, the file system,
 * and the line reader.
 * nextKeyValue is called repeatedly until it returns false; it reads one line
 * via LineReader and fills in the input key and value.
 * getCurrentKey and getCurrentValue hand the key and value to the Mapper
 * component; they are called once per nextKeyValue call.
 * close does the final resource cleanup.
 *
 * Note: this simple reader opens the whole file and ignores split boundaries,
 * so it is only correct when the file is processed as a single split.
 */
public class AuthRecordReader extends RecordReader<IntWritable, Text> {

    private FileSplit fs;
    private LineReader reader;
    private IntWritable key;
    private Text value;
    // current line number
    private int count;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // initialize the split
        fs = (FileSplit) split;
        Path path = fs.getPath();
        // get the configuration
        Configuration conf = context.getConfiguration();
        // get the file system
        FileSystem system = path.getFileSystem(conf);
        // open an input stream for the file backing this split
        InputStream in = system.open(path);
        // initialize the line reader, which processes the data line by line
        reader = new LineReader(in);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        key = new IntWritable();
        value = new Text();
        Text tmp = new Text();
        // each call reads one line into tmp;
        // the return value is the number of bytes read
        int length = reader.readLine(tmp);
        if (length == 0) {
            // no more data to read: stop the nextKeyValue loop
            return false;
        } else {
            count++;
            key.set(count);  // line number
            value.set(tmp);  // line content
            return true;
        }
    }

    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // progress reporting is not implemented in this example
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            // close the underlying stream instead of just dropping the reference
            reader.close();
            reader = null;
        }
    }
}
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Custom input format component, which defines the Mapper's input key
 * and value types. It must extend the FileInputFormat abstract class.
 */
public class AuthInputFomat extends FileInputFormat<IntWritable, Text> {

    /**
     * The RecordReader decides how the file is processed and hands the
     * input key and value to the Mapper component.
     */
    @Override
    public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // return the custom RecordReader
        return new AuthRecordReader();
    }
}
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Custom record writer: decides how each key/value pair is written
 * to the result file.
 */
public class AuthRecordWriter<K, V> extends RecordWriter<K, V> {

    private FSDataOutputStream out;

    public AuthRecordWriter(FSDataOutputStream out) {
        // initialize the output stream
        this.out = out;
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        out.write(key.toString().getBytes());
        // separator between key and value
        out.write("$$".getBytes());
        out.write(value.toString().getBytes());
        // separator between records
        out.write("\r\n".getBytes());
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if (out != null) {
            out.close();
        }
    }
}
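Because the writer only calls toString() on the key and value, it works with any output types; that is why both this class and AuthOutputFormat below are declared with generics instead of hard-coded key and value types.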
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Custom output format component, used to change the format of the result
 * file. Use generics here; do not hard-code the key and value types.
 */
public class AuthOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // get the output file path via the method provided by the parent class
        Path path = super.getDefaultWorkFile(job, "");
        Configuration conf = job.getConfiguration();
        FileSystem system = path.getFileSystem(conf);
        // open the output stream for the result file; note it must be an
        // FSDataOutputStream, not a plain OutputStream. The RecordWriter
        // writes the results through this stream.
        FSDataOutputStream out = system.create(path);
        return new AuthRecordWriter<K, V>(out);
    }
}
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Identity Mapper: the line-number key and line-content value produced by
 * AuthRecordReader are emitted unchanged.
 */
public class MapperDemo extends Mapper<IntWritable, Text, IntWritable, Text> {

    @Override
    protected void map(IntWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
package hadoop02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // pass conf to the job so the configuration is actually used
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverDemo.class);
        job.setMapperClass(MapperDemo.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        // the default input format calls LineRecordReader under the hood
        // (key = byte offset of the line start, value = line content);
        // replace it with the custom input format
        job.setInputFormatClass(AuthInputFomat.class);
        // set the custom output format component; the default is
        // TextOutputFormat (a subclass of FileOutputFormat), which separates
        // key and value with a Tab and records with a newline
        job.setOutputFormatClass(AuthOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.101.100:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.101.100:9000/result"));
        job.waitForCompletion(true);
    }
}
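To verify the result, one option is to read the output file back from HDFS. A minimal sketch, assuming the default part-file name part-r-00000 produced by getDefaultWorkFile with a single reducer, and the same HDFS address as above; the ResultPrinter class name is ours for illustration:

package hadoop02;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ResultPrinter {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // path assumes the default part-file name; adjust if it differs
        Path path = new Path("hdfs://192.168.101.100:9000/result/part-r-00000");
        FileSystem fs = path.getFileSystem(conf);
        // print each record; expect lines like "1$$hello world"
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}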