HDPCD-Java-複習筆記(13)- lab
Java lab booklet
package compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import parquet.hadoop.codec.SnappyCodec;
public class CompressDemoJob extends Configured implements Tool {
public static class CompressMapper extends Mapper<LongWritable, Text, Text, Text> {
private String searchString;
private Text outputKey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String [] words = value.toString().split(" ");
for(String word: words) {
if(word.contains(searchString)) {
outputKey.set(words[0] + " " + words[2]);
context.write(outputKey, value);
}
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
searchString = context.getConfiguration().get("searchString");
}
}
public static class CompressReducer extends Reducer<Text, Text, NullWritable, Text> {
private NullWritable outputKey = NullWritable.get();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
while(values.iterator().hasNext()) {
context.write(outputKey, values.iterator().next());
}
}
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "CompressJob");
Configuration conf = job.getConfiguration();
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
conf.setClass(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
conf.setBoolean(FileOutputFormat.COMPRESS, true);
conf.setClass(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
conf.set("searchString", args[0]);
job.setJarByClass(CompressDemoJob.class);
Path out = new Path("logresults");
out.getFileSystem(conf).delete(out,true);
FileInputFormat.setInputPaths(job, new Path("logfiles"));
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(CompressMapper.class);
job.setReducerClass(CompressReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) {
int result = 0;
try {
result = ToolRunner.run(new Configuration(), new CompressDemoJob(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(result);
}
}
[email protected]:~/java/workspace/Compression# hdfs dfs -ls -h logresults/part-r-00000-rw-r--r-- 3 root hdfs 69.6 K 2017-10-15 22:53 logresults/part-r-00000
使用壓縮後:
Map-Reduce Framework
Map input records=981
Map output records=566
Map output bytes=80551
Map output materialized bytes=14985
Input split bytes=268
Combine input records=0
Combine output records=0
Reduce input groups=1
Reduce shuffle bytes=14985
Reduce input records=566
Reduce output records=566
Spilled Records=1132
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=597
CPU time spent (ms)=1510
Physical memory (bytes) snapshot=880906240
Virtual memory (bytes) snapshot=3296817152
Total committed heap usage (bytes)=845029376
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=104951
File Output Format Counters
Bytes Written=13776
[email protected]:~/java/workspace/Compression# hdfs dfs -ls -h logresults/
Found 2 items
-rw-r--r-- 3 root hdfs 0 2017-10-15 23:26 logresults/_SUCCESS
-rw-r--r-- 3 root hdfs 13.5 K 2017-10-15 23:26 logresults/part-r-00000.snappy
The .snappy file is in a binary format, so to view its contents you need to use the hadoop fs -text command:
[email protected]:~/java/workspace/Compression# hdfs dfs -text logresults/part-r-00000.snappy
package customsort;
import org.apache.hadoop.io.WritableComparator;
/**
 * Raw-byte comparator for serialized Stock records, ordering by symbol and
 * then by date without deserializing either key.
 */
public class StockComparator extends WritableComparator {

    public StockComparator() {
        super(Stock.class);
    }

    /**
     * A Stock is written as two writeUTF fields (symbol, then date). Each
     * writeUTF field is a 2-byte unsigned length prefix followed by the
     * encoded characters, which is what the offset arithmetic below skips.
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Symbol lengths, read from each record's leading 2-byte prefix.
        int symLen1 = readUnsignedShort(b1, s1);
        int symLen2 = readUnsignedShort(b2, s2);

        // Compare the symbol bytes themselves (start past the prefix).
        int cmp = compareBytes(b1, s1 + 2, symLen1, b2, s2 + 2, symLen2);
        if (cmp != 0) {
            return cmp;
        }

        // Symbols tie: compare the date field. Skip the symbol prefix and
        // bytes plus the date's own 2-byte prefix; dates are always 10 bytes
        // (yyyy-mm-dd).
        return compareBytes(b1, s1 + 2 + symLen1 + 2, 10,
                            b2, s2 + 2 + symLen2 + 2, 10);
    }
}
package customsort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Composite MapReduce key holding a stock symbol and a date, ordered by
 * symbol first and date second. Serialized as two writeUTF fields; see
 * StockComparator for the matching raw-byte comparator.
 */
public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;

    static {
        // Register the raw comparator once per JVM so the shuffle can sort
        // serialized Stock keys without deserializing them.
        WritableComparator.define(Stock.class, new StockComparator());
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public int compareTo(Stock other) {
        // Primary sort on symbol; ties are broken by date.
        int bySymbol = this.symbol.compareTo(other.symbol);
        return (bySymbol != 0) ? bySymbol : this.date.compareTo(other.date);
    }
}