1. 程式人生 > >大數據學習之自定義輸出 13

大數據學習之自定義輸出 13

系統 java pub 什麽 rri args sda stream out

二:自定義輸出

自定義輸出

需求:過濾日誌文件

把包含itstaredu的放在一個文件中 d:/itstaredu.log

把不包含itstaredu的放在另外一個文件 d:/other.log

1:自定義編寫FileOutputFormate

package it.dawn.YARNPra.自定義.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Dawn
 * @date 2019年5月11日23:45:47
 * @version 1.0
 * 類似自定義輸入,根據源碼自己寫一個FileOutputFormat
 * 繼承FileOutputFormat
 */
public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable>{

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {
		FileRecordWriter recordWriter = new FileRecordWriter(job);
		return recordWriter;
	}

}

  

2 : 自定義編寫FileRecordWriter

package it.dawn.YARNPra.自定義.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author Dawn
 * @date 2019年5月11日23:48:31
 * @version 1.0
 * 繼承 RecordWriter
 */
public class FileRecordWriter extends RecordWriter<Text, NullWritable>{
	
	Configuration conf=null;
	FSDataOutputStream itstarlog=null;
	FSDataOutputStream otherlog=null;

	//1.定義數據輸出路徑
	public FileRecordWriter(TaskAttemptContext job) throws IOException {
		//獲取配置信息
		conf=job.getConfiguration();
		
		//獲取文件系統
		FileSystem fs=FileSystem.get(conf);
		
		//定義輸出路徑
		//默認就是那個我們很熟悉的part-r-00000。這裏我們把它自定義成itstar.log  other.log
		itstarlog=fs.create(new Path("f:/temp/outputformateSelf/fileoutSelf1/itstar.log"));
		otherlog=fs.create(new Path("f:/temp/outputformateSelf/fileoutSelf2/other.log"));
	}
	
	//2.數據輸出
	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {
		//判斷的話根據key
		if(key.toString().contains("itstar")) {
			//寫出到文件
			itstarlog.write(key.getBytes());
		}else {
			otherlog.write(key.getBytes());
		}
		
	}

	//3.關閉資源
	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		if(null != itstarlog) {
			itstarlog.close();
		}
		
		if(null != otherlog) {
			otherlog.close();
		}
		
	}

}

 

3:編寫MR

mapper

package it.dawn.YARNPra.自定義.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Dawn
 * @date 2019年5月11日23:58:27
 * @version 1.0
 * 直接代碼一把梭,寫出去
 */
public class FileMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		context.write(value, NullWritable.get());
	}
	
	

}

  

Reduce:

package it.dawn.YARNPra.自定義.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, NullWritable, Text, NullWritable>{

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values,
				Context context) throws IOException, InterruptedException {
		//換個行吧!
		String k = key.toString()+"\n";
		
		context.write(new Text(k), NullWritable.get());
	}
	
	

}

  

Driver類:

package it.dawn.YARNPra.自定義.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Dawn
 * @date 2019年5月12日00:03:03
 * @version 1.0
 * 
 * 這裏大家可能有個小疑問?
 * 就是我們已近在自定義輸出的時候,已經指定了輸出位置。為什麽我們這裏還是要寫輸出位置?
 * 
 * 大家可以這樣想下,就是我們不進行自定義輸出的時候,是不是每次任務之後,
 * 會出現一大堆的文件 ._SUCCESS.crc  .part-r-00000.crc _SUCCESS  part-r-00000這4個的嘛。
 * 而我們再自己寫的自定義輸出的時候,其實只是對part-r-00000文件指定了位置,而其他的什麽 ._SUCCESS.crc ...這些沒做處理啊!!
 * 
 */
public class FileDriver {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 1.獲取job信息
		Configuration conf = new Configuration();
		Job job=Job.getInstance(conf);
		
		// 2.獲取jar包
		job.setJarByClass(FileDriver.class);
		
		// 3.獲取自定義的mapper與reducer類
		job.setMapperClass(FileMapper.class);
		job.setReducerClass(FileReducer.class);
		
		// 4.設置map輸出的數據類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		// 5.設置reduce輸出的數據類型(最終的數據類型)
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		//設置自定outputFormat
		job.setOutputFormatClass(FuncFileOutputFormat.class);
		
		// 6.設置輸入存在的路徑與處理後的結果路徑
		FileInputFormat.setInputPaths(job, new Path("f:/temp/流量日誌.dat"));
		FileOutputFormat.setOutputPath(job, new Path("f:/temp/outputformateSelf"));
		
		// 7.提交任務
		boolean rs = job.waitForCompletion(true);
		System.out.println(rs? "成功":"失敗");
	}

}

  

運行截圖:

輸入:

技術分享圖片

輸出(看好了 路徑根據 FileRecordWriter類中的一樣 ):

技術分享圖片

===============================================================

技術分享圖片

=============================================================================================

技術分享圖片

大數據學習之自定義輸出 13