HDPCD-Java-複習筆記(22)- lab
阿新 • • 發佈:2018-12-26
Java lab booklet
package bloom; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.hash.Hash; public class StockDividendFilter extends Configured implements Tool { private static final String FILTER_FILE = "bloom/dividendfilter"; public static class BloomMapper extends Mapper<LongWritable, Text, IntWritable, BloomFilter> { private final IntWritable ONE = new IntWritable(1); Stock stock = new Stock(); private final String COMMA = ","; private BloomFilter outputValue; private String stockSymbol; @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get("stockSymbol"); outputValue = new BloomFilter(10000, 2, Hash.MURMUR_HASH); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String [] words = value.toString().split(COMMA); String currentSymbol = words[1]; if(stockSymbol.equals(currentSymbol)) { 
stock.setSymbol(currentSymbol); stock.setDate(words[2]); outputValue.add(new Key(stock.toString().getBytes())); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(ONE, outputValue); } } public static class BloomReducer extends Reducer<IntWritable, BloomFilter, NullWritable, NullWritable> { private BloomFilter allValues; @Override protected void setup(Context context) throws IOException, InterruptedException { allValues = new BloomFilter(10000, 2, Hash.MURMUR_HASH); } @Override protected void reduce(IntWritable key, Iterable<BloomFilter> values, Context context) throws IOException, InterruptedException { while(values.iterator().hasNext()) { BloomFilter current = values.iterator().next(); allValues.or(current); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); Path path = new Path(FILTER_FILE); FSDataOutputStream out = path.getFileSystem(conf).create(path); allValues.write(out); out.close(); } } public static class StockFilterMapper extends Mapper<LongWritable, Text, Stock, DoubleWritable> { private BloomFilter dividends; private Stock outputKey = new Stock(); private DoubleWritable outputValue = new DoubleWritable(); private String stockSymbol; private final String COMMA = ","; @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get("stockSymbol"); Configuration conf = context.getConfiguration(); Path path = new Path(FILTER_FILE); FSDataInputStream in = path.getFileSystem(conf).open(path); dividends = new BloomFilter(10000,2,Hash.MURMUR_HASH); dividends.readFields(in); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String [] words = value.toString().split(COMMA); String currentSymbol = words[1]; if(currentSymbol.equals(stockSymbol)) { 
outputKey.setSymbol(currentSymbol); outputKey.setDate(words[2]); Key stockKey = new Key(outputKey.toString().getBytes()); if(dividends.membershipTest(stockKey)) { outputValue.set(Double.parseDouble(words[6])); context.write(outputKey, outputValue); } } } } public static class StockFilterReducer extends Reducer<Stock, DoubleWritable, Text, DoubleWritable> { private String stockSymbol = ""; private Text outputKey = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get("stockSymbol"); } @Override protected void reduce(Stock key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { //Check for a false positive if(!stockSymbol.equals(key.getSymbol())) { System.out.println("False positive: " + key.getSymbol()); } else { while(values.iterator().hasNext()) { DoubleWritable closingPrice = values.iterator().next(); outputKey.set(key.toString()); context.write(outputKey, closingPrice); } } } } @Override public int run(String[] args) throws Exception { Job job1 = Job.getInstance(getConf(), "CreateBloomFilter"); job1.setJarByClass(getClass()); Configuration conf = job1.getConfiguration(); conf.set("stockSymbol", args[0]); FileInputFormat.setInputPaths(job1, new Path("dividends")); job1.setMapperClass(BloomMapper.class); job1.setReducerClass(BloomReducer.class); job1.setInputFormatClass(TextInputFormat.class); job1.setOutputFormatClass(NullOutputFormat.class); job1.setMapOutputKeyClass(IntWritable.class); job1.setMapOutputValueClass(BloomFilter.class); job1.setOutputKeyClass(NullWritable.class); job1.setOutputValueClass(NullWritable.class); job1.setNumReduceTasks(1); boolean job1success = job1.waitForCompletion(true); if(!job1success) { System.out.println("The CreateBloomFilter job failed!"); return -1; } Job job2 = Job.getInstance(conf, "FilterStocksJob"); job2.setJarByClass(getClass()); conf = job2.getConfiguration(); Path out = new 
Path("bloomoutput"); out.getFileSystem(conf).delete(out,true); FileInputFormat.setInputPaths(job2, new Path("stocks")); FileOutputFormat.setOutputPath(job2, out); job2.setMapperClass(StockFilterMapper.class); job2.setReducerClass(StockFilterReducer.class); job2.setInputFormatClass(TextInputFormat.class); job2.setOutputFormatClass(TextOutputFormat.class); job2.setMapOutputKeyClass(Stock.class); job2.setMapOutputValueClass(DoubleWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(DoubleWritable.class); boolean job2success = job2.waitForCompletion(true); if(!job2success) { System.out.println("The FilterStocksJob failed!"); return -1; } return 1; } public static void main(String[] args) { int result = 0; try { result = ToolRunner.run(new Configuration(), new StockDividendFilter(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(result); } }
workflow.xml

package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key: a (symbol, date) pair identifying one stock on
 * one trading day. Serialized as two UTF strings; ordered by symbol, then
 * date, consistent with equals().
 */
public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;
    private static final String COMMA = ",";

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Stock)) {
            return false;
        }
        Stock other = (Stock) obj;
        // Objects.equals is null-safe: the old symbol.equals(...) threw NPE
        // for a freshly constructed Stock whose fields were never set.
        return Objects.equals(symbol, other.symbol) && Objects.equals(date, other.date);
    }

    @Override
    public int hashCode() {
        // Objects.hash is null-safe and avoids the concatenation collisions
        // of (symbol + date).hashCode(), where ("AB","C") == ("A","BC").
        return Objects.hash(symbol, date);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    /** Orders by symbol first, breaking ties on date. */
    @Override
    public int compareTo(Stock arg0) {
        int response = this.symbol.compareTo(arg0.symbol);
        if (response == 0) {
            response = this.date.compareTo(arg0.date);
        }
        return response;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    /** "symbol,date" — also used as the Bloom-filter key text. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(symbol).append(COMMA).append(date);
        return sb.toString();
    }
}
job.properties<workflow-app xmlns="uri:oozie:workflow:0.2" name="dividendstockfilter-workflow"> <start to="build-bloomfilter" /> <action name="build-bloomfilter"> <map-reduce> <job-tracker>${resourceManager}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${nameNode}/user/${wf:user()}/bloom/temp" /> </prepare> <configuration> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> <property> <name>mapred.mapper.new-api</name> <value>true</value> </property> <property> <name>mapred.reducer.new-api</name> <value>true</value> </property> <property> <name>mapreduce.job.map.class</name> <value>bloom.StockDividendFilter$BloomMapper</value> </property> <property> <name>mapreduce.job.reduce.class</name> <value>bloom.StockDividendFilter$BloomReducer</value> </property> <property> <name>mapreduce.job.inputformat.class</name> <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat </value> </property> <property> <name>mapreduce.job.outputformat.class</name> <value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat </value> </property> <property> <name>mapreduce.map.output.key.class</name> <value>org.apache.hadoop.io.IntWritable</value> </property> <property> <name>mapreduce.map.output.value.class</name> <value>org.apache.hadoop.util.bloom.BloomFilter</value> </property> <property> <name>mapreduce.job.output.key.class</name> <value>org.apache.hadoop.io.NullWritable</value> </property> <property> <name>mapreduce.job.output.value.class</name> <value>org.apache.hadoop.io.NullWritable</value> </property> <property> <name>mapreduce.job.reduces</name> <value>1</value> </property> <property> <name>mapreduce.input.fileinputformat.inputdir</name> <value>${nameNode}/user/${wf:user()}/bloom/dividends</value> </property> <property> <name>mapreduce.output.fileoutputformat.outputdir</name> <value>${nameNode}/user/${wf:user()}/bloom/temp</value> </property> <property> <name>stockSymbol</name> <value>${stockSymbol}</value> 
</property> </configuration> </map-reduce> <ok to="filter-stocks" /> <error to="fail" /> </action> <action name="filter-stocks"> <map-reduce> <job-tracker>${resourceManager}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${nameNode}/user/${wf:user()}/bloom/bloomoutput" /> </prepare> <configuration> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> <property> <name>mapred.mapper.new-api</name> <value>true</value> </property> <property> <name>mapred.reducer.new-api</name> <value>true</value> </property> <property> <name>mapreduce.job.map.class</name> <value>bloom.StockDividendFilter$StockFilterMapper</value> </property> <property> <name>mapreduce.job.reduce.class</name> <value>bloom.StockDividendFilter$StockFilterReducer</value> </property> <property> <name>mapreduce.job.inputformat.class</name> <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat </value> </property> <property> <name>mapreduce.job.outputformat.class</name> <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat </value> </property> <property> <name>mapreduce.job.output.key.class</name> <value>bloom.Stock</value> </property> <property> <name>mapreduce.job.output.value.class</name> <value>org.apache.hadoop.io.DoubleWritable</value> </property> <property> <name>mapreduce.job.reduces</name> <value>1</value> </property> <property> <name>mapreduce.output.fileoutputformat.outputdir</name> <value>${nameNode}/user/${wf:user()}/bloom/bloomoutput</value> </property> <property> <name>mapreduce.input.fileinputformat.inputdir</name> <value>${nameNode}/user/${wf:user()}/bloom/stocks</value> </property> <property> <name>stockSymbol</name> <value>${stockSymbol}</value> </property> </configuration> </map-reduce> <ok to="end" /> <error to="fail" /> </action> <kill name="fail"> <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end" /> </workflow-app>
# hadoop fs -mkdir bloom
# hadoop fs -mkdir bloom/lib
# hadoop fs -mkdir bloom/dividends
# hadoop fs -put ~/java/labs/data/stock_dividends/NYSE_dividends_A.csv bloom/dividends
# hadoop fs -mkdir bloom/stocks
# hadoop fs -put ~/java/labs/data/stock_prices/NYSE_daily_prices_A.csv bloom/stocks
# oozie job -config job.properties -run