
HDPCD Java Review Notes (13) - Lab

Java lab booklet

package compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.compress.SnappyCodec;


public class CompressDemoJob extends Configured implements Tool {

	public static class CompressMapper extends Mapper<LongWritable, Text, Text, Text> {
		private String searchString;
		private Text outputKey = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Emit the full line once for every word that contains the search
			// string, keyed by the first and third fields of the log entry.
			String[] words = value.toString().split(" ");
			for (String word : words) {
				if (word.contains(searchString)) {
					outputKey.set(words[0] + " " + words[2]);
					context.write(outputKey, value);
				}
			}
		}

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			searchString = context.getConfiguration().get("searchString");
		}
	}


	public static class CompressReducer extends Reducer<Text, Text, NullWritable, Text> {
		private NullWritable outputKey = NullWritable.get();
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// Strip the key and emit each matching line once.
			for (Text value : values) {
				context.write(outputKey, value);
			}
		}

	}
	

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), "CompressJob");
		Configuration conf = job.getConfiguration();
		
		conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
		conf.setClass(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
		conf.setBoolean(FileOutputFormat.COMPRESS, true);
		conf.setClass(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
		conf.set("searchString", args[0]);

		job.setJarByClass(CompressDemoJob.class);

		Path out = new Path("logresults");
		out.getFileSystem(conf).delete(out, true);
		FileInputFormat.setInputPaths(job, new Path("logfiles"));
		FileOutputFormat.setOutputPath(job, out);

		job.setMapperClass(CompressMapper.class);
		job.setReducerClass(CompressReducer.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) {
		int result = 0;
		try {
			result = ToolRunner.run(new Configuration(), new CompressDemoJob(), args);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.exit(result);
	}
}
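
The driver enables Snappy in two places: MRJobConfig.MAP_OUTPUT_COMPRESS compresses the intermediate map output before it is shuffled to the reducers, while FileOutputFormat.COMPRESS compresses the job's final output, producing the .snappy result file shown further below. The search string is taken from args[0] and passed to the mappers through the Configuration.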

Without output compression, the job's result file is about 69.6 K:

~/java/workspace/Compression# hdfs dfs -ls -h logresults/part-r-00000
-rw-r--r--   3 root hdfs     69.6 K 2017-10-15 22:53 logresults/part-r-00000

After enabling compression:

Map-Reduce Framework
Map input records=981
Map output records=566
Map output bytes=80551
Map output materialized bytes=14985
Input split bytes=268
Combine input records=0
Combine output records=0
Reduce input groups=1
Reduce shuffle bytes=14985
Reduce input records=566
Reduce output records=566
Spilled Records=1132
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=597
CPU time spent (ms)=1510
Physical memory (bytes) snapshot=880906240
Virtual memory (bytes) snapshot=3296817152
Total committed heap usage (bytes)=845029376
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=104951
File Output Format Counters 
Bytes Written=13776
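
Comparing the counters: the mappers produced 80,551 bytes of output, but only 14,985 bytes were materialized and shuffled, so Snappy shrank the intermediate data by roughly 80%. The final output shows a similar gain, dropping from 69.6 K to about 13.5 K: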

~/java/workspace/Compression# hdfs dfs -ls -h logresults/
Found 2 items
-rw-r--r--   3 root hdfs          0 2017-10-15 23:26 logresults/_SUCCESS
-rw-r--r--   3 root hdfs     13.5 K 2017-10-15 23:26 logresults/part-r-00000.snappy

The .snappy file is in a binary format, so to view its contents you need to use the hdfs dfs -text (or hadoop fs -text) command:

~/java/workspace/Compression# hdfs dfs -text logresults/part-r-00000.snappy
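
For reference, the same decompression can be done programmatically. Below is a minimal sketch (not part of the lab; the class name is mine): CompressionCodecFactory resolves the codec from the .snappy extension, just as hdfs dfs -text does, and wraps the raw stream in a decompressing stream.

package compress;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadSnappyOutput {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path path = new Path("logresults/part-r-00000.snappy");
		FileSystem fs = path.getFileSystem(conf);

		// Pick the codec based on the file extension (.snappy).
		CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);

		// Wrap the raw HDFS stream in a decompressing stream before reading text.
		try (InputStream in = codec.createInputStream(fs.open(path));
				BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
			String line;
			while ((line = reader.readLine()) != null) {
				System.out.println(line);
			}
		}
	}
}

The next lab, in the customsort package, implements a custom key with a raw comparator: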

package customsort;

import org.apache.hadoop.io.WritableComparator;

public class StockComparator extends WritableComparator {
	
	public StockComparator() {
		super(Stock.class);
	}
	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		/**
		 * To compare the two strings, you need to compute their length first, 
		 * which is conveniently stored as part of a UTF string as a short prefixed to the string.
		 */
		int strLength1 = readUnsignedShort(b1,s1);
		int strLength2 = readUnsignedShort(b2,s2);
		/**
		 * the “+ 2” is an offset because of the short at the beginning of each string
		 */
		int response = compareBytes(b1, s1 + 2, strLength1,
					 b2, s2 + 2, strLength2);
		if(response != 0) return response;
		else {
			/**
			 * We know the length of every date is 10 (the dates look like yyyy-mm-dd)
			 */
			response = compareBytes(b1, s1 + 2 + strLength1 + 2, 10,
					b2, s2 + 2 + strLength2 + 2, 10);

		}

		return response;
	}
}
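
Because StockComparator works directly on the serialized bytes, the framework can sort Stock keys during the shuffle without deserializing them into objects, which is the main benefit of a raw comparator over the plain compareTo() path.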
package customsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class Stock implements WritableComparable<Stock> {
	private String symbol;
	private String date;
	static{
		/**
		 * A RawComparator needs to be registered using the static define method of WritableComparator, 
		 * and this only needs to occur once within the application.
		 */
		WritableComparator.define(Stock.class, new StockComparator());
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

	public String getDate() {
		return date;
	}

	public void setDate(String date) {
		this.date = date;
	}

	@Override
	public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
	}

	@Override
	public int compareTo(Stock stock) {
		int response = this.symbol.compareTo(stock.symbol);
		if (response != 0) {
			return response;
		}else {
			response = this.date.compareTo(stock.date);
			return response;
		}
	}

}
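
To sanity-check that the byte-level comparison agrees with compareTo(), something like the following could be used (a minimal sketch; the class name is mine, and DataOutputBuffer stands in for the framework's key serialization):

package customsort;

import org.apache.hadoop.io.DataOutputBuffer;

public class StockComparatorCheck {

	public static void main(String[] args) throws Exception {
		Stock a = new Stock();
		a.setSymbol("AAPL");
		a.setDate("2017-10-15");

		Stock b = new Stock();
		b.setSymbol("AAPL");
		b.setDate("2017-10-16");

		// Serialize both keys the same way the framework would.
		DataOutputBuffer bufA = new DataOutputBuffer();
		a.write(bufA);
		DataOutputBuffer bufB = new DataOutputBuffer();
		b.write(bufB);

		// Compare the raw bytes and the deserialized objects.
		int raw = new StockComparator().compare(bufA.getData(), 0, bufA.getLength(),
				bufB.getData(), 0, bufB.getLength());

		// Both results should have the same sign (negative here, since a sorts first).
		System.out.println("raw=" + raw + ", compareTo=" + a.compareTo(b));
	}
}

Alternatively, instead of relying on the static define() block, the comparator can be attached to a specific job with job.setSortComparatorClass(StockComparator.class).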