(9) MapReduce: Custom File Input and Output Components

Purpose

  • Custom file reading

When a file is read, the default record reader is LineRecordReader, which produces <byte offset of the line start, line content>. Each time it reads a record, it hands the key and value to the developer's Mapper component. By writing a custom record reader you control how the file is read, and therefore which key and value are passed to the Mapper.
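For comparison, here is a minimal sketch (the class name is illustrative, not part of the project) of the Mapper signature you get with the default reader: the input key is the byte offset at which each line starts, not a line number.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative only: with the default TextInputFormat/LineRecordReader,
// the input key is the byte offset of the line start (LongWritable)
// and the input value is the line content (Text).
public class DefaultReaderMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// key = byte offset, value = line content
		context.write(key, value);
	}
}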

  • Custom file output

When results are written to a file, the default is TextOutputFormat, a subclass of FileOutputFormat. It separates key and value with a Tab character and terminates each record with a newline. A custom output format lets you adjust how the results are written out.
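As an aside (an assumption about the stock API, not covered in the original post): if the only change you need is the key/value separator, TextOutputFormat can usually be configured through a job property instead of writing a custom OutputFormat; a full custom OutputFormat, as built below, is only required when you also want to control the record terminator or the output stream itself. A minimal sketch, to be placed in the driver before the Job is created (property name as used in Hadoop 2.x; verify it against your version):

// In DriverDemo.main(), before Job.getInstance(conf):
Configuration conf = new Configuration();
// read by TextOutputFormat; this changes only the key/value separator, not the record terminator
conf.set("mapreduce.output.textoutputformat.separator", "$$");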

Requirements

Given the following data, the key and value passed to the Mapper must satisfy these requirements:

  • key is the line number of each line
  • value is the content of that line

When writing the results to a file, use a custom output format.

hello world
hello dong
hello hadoop
hello world

Code Implementation

package hadoop02;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * This component is the record reader; it decides how the file data is read.
 * Hadoop's default record reader is LineRecordReader, producing <byte offset of the line start, line content>.
 * initialize() is the component's setup method and is called only once; it initializes the split, the file system object and the line reader.
 * nextKeyValue() is called repeatedly until it returns false; it processes the file through LineReader and populates the input key and value.
 * getCurrentKey() and getCurrentValue() hand the key and value to the Mapper; they are called once for every call to nextKeyValue().
 * close() performs the final resource cleanup.
 * @author Administrator
 *
 */
public class AuthRecordReader extends RecordReader<IntWritable, Text>{
	
	private FileSplit fs;
	private LineReader reader;
	
	private IntWritable key;
	private Text value;
	// current line number
	private int count;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		
		// initialize the split
		fs = (FileSplit)split;
		Path path = fs.getPath();
		// get the job configuration
		Configuration conf = context.getConfiguration();
		// get the file system object
		FileSystem system = path.getFileSystem(conf);
		// open an input stream for the file handled by this split
		InputStream in = system.open(path);
		// initialize the line reader, which processes the data line by line
		reader = new LineReader(in);
		
	}

	
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		key = new IntWritable();
		value = new Text();
		Text tmp = new Text();
		
		// each call reads one line of data into tmp;
		// the return value is the number of bytes read for that line
		int length = reader.readLine(tmp);
		
		if(length == 0) {
			// no data left to read; returning false stops further calls to nextKeyValue
			return false;
		}else {
			count++;
			key.set(count); // line number
			value.set(tmp); // line content
			
			return true;
		}
	}

	@Override
	public IntWritable getCurrentKey() throws IOException, InterruptedException {
		
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// progress reporting is not implemented in this demo;
		// a real reader could return bytes consumed / split length
		return 0;
	}

	@Override
	public void close() throws IOException {
		if(reader != null) {
			// release the underlying input stream
			reader.close();
			reader = null;
		}
		
	}

}
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Custom input format component; it determines the types of the input key and value handed to the Mapper.
 * It must extend the FileInputFormat abstract class.
 * @author Administrator
 *
 */
public class AuthInputFomat extends FileInputFormat<IntWritable, Text>{

	/**
	 * The RecordReader decides how the file is processed and how the input key and value are handed to the Mapper.
	 */
	@Override
	public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		
		// return the custom RecordReader
		return new AuthRecordReader();
	}
	
}	
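One caveat not covered in the original post: the line number in AuthRecordReader is counted per split, so a file large enough to be divided into several splits would restart numbering in each split. A hedged sketch of one way to keep numbering consistent per file is to make the input format non-splittable (isSplitable is a protected hook of FileInputFormat in the Hadoop 2.x mapreduce API; the subclass name here is hypothetical):

package hadoop02;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;

// Hypothetical variant: returning false forces each file to be read by a single
// mapper, so AuthRecordReader counts line numbers over the whole file rather
// than per split. The trade-off is losing parallelism within a single file.
public class NonSplittableAuthInputFomat extends AuthInputFomat {

	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
}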
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class AuthRecordWriter<K,V> extends RecordWriter<K, V>{

	FSDataOutputStream out;
	
	public AuthRecordWriter(FSDataOutputStream out) {
		// keep a reference to the output stream
		this.out = out;
	}

	@Override
	public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
		if(out != null) {
			out.close();
		}
	}

	@Override
	public void write(K key, V value) throws IOException, InterruptedException {
		
		out.write(key.toString().getBytes());
		// separator between key and value
		out.write("$$".getBytes());
		
		out.write(value.toString().getBytes());
		// record (line) terminator
		out.write("\r\n".getBytes());
	} 

}
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Custom output format component; it changes the format of the result file.
 * Use generics here so the key and value types are not hard-coded.
 */
public class AuthOutputFormat<K,V> extends FileOutputFormat<K, V>{

	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		
		// obtain the output file path through the helper provided by the parent class
		Path path = super.getDefaultWorkFile(job, "");
		
		Configuration conf = job.getConfiguration();
		FileSystem system = path.getFileSystem(conf);
		// create the output stream for the result file; keep it as FSDataOutputStream rather than a plain OutputStream.
		// The RecordWriter writes the results through this stream.
		FSDataOutputStream out = system.create(path);
		
		return new AuthRecordWriter<K,V>(out);
	}

}
package hadoop02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperDemo extends Mapper<IntWritable, Text, IntWritable, Text>{
	
	@Override
	protected void map(IntWritable key, Text value, Mapper<IntWritable, Text, IntWritable, Text>.Context context)
			throws IOException, InterruptedException {
		
		// pass the <line number, line content> pair straight through
		context.write(key, value);
	}
}
package hadoop02;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class DriverDemo {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(DriverDemo.class);
		job.setMapperClass(MapperDemo.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		// the default input format is TextInputFormat, whose underlying reader is LineRecordReader (key = byte offset of the line start, value = line content)
		job.setInputFormatClass(AuthInputFomat.class);
		// set the custom output format component; the default is TextOutputFormat, a subclass of FileOutputFormat,
		// which separates key and value with a Tab and terminates each record with a newline
		job.setOutputFormatClass(AuthOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.101.100:9000/input"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.101.100:9000/result"));
		
		job.waitForCompletion(true);
	}
}
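For reference: since no Reducer class is set, the default identity Reducer runs with a single reduce task, so the records come out sorted by line number into a file such as part-r-00000 under /result. With the four sample lines above as a single input file, the result would be expected to look like this (line number, the $$ separator, then the line content, each record ended with \r\n):

1$$hello world
2$$hello dong
3$$hello hadoop
4$$hello world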