1. 程式人生 > HDPCD-Java-複習筆記(22) - lab

HDPCD-Java-複習筆記(22)- lab

Java lab booklet

package bloom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

/**
 * Two-job MapReduce driver.
 *
 * Job 1 ("CreateBloomFilter") scans the dividends data set and builds a
 * {@link BloomFilter} of "symbol,date" keys for the requested stock symbol,
 * persisting it to {@code bloom/dividendfilter} on HDFS.
 *
 * Job 2 ("FilterStocksJob") scans the daily stock prices and emits the
 * closing price only for dates that (probably) paid a dividend, using the
 * filter from job 1 to avoid a join. The reducer screens out Bloom-filter
 * false positives.
 */
public class StockDividendFilter extends Configured implements Tool {
	/** HDFS location where job 1 writes, and job 2 reads, the serialized filter. */
	private static final String FILTER_FILE = "bloom/dividendfilter";

	/**
	 * Job-1 mapper: accumulates matching dividend records into a per-task
	 * BloomFilter and emits it once in cleanup().
	 */
	public static class BloomMapper extends Mapper<LongWritable, Text, IntWritable, BloomFilter> {
		// Constant key so every mapper's filter reaches the single reducer.
		private final IntWritable ONE = new IntWritable(1);
		private Stock stock = new Stock();
		private final String COMMA = ",";
		private BloomFilter outputValue;
		private String stockSymbol;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			stockSymbol = context.getConfiguration().get("stockSymbol");
			// Vector size / hash count / hash type must match the filters created
			// in BloomReducer and StockFilterMapper, or readFields() will corrupt.
			outputValue = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// CSV layout: exchange,symbol,date,dividend
			String[] words = value.toString().split(COMMA);
			String currentSymbol = words[1];
			if (stockSymbol.equals(currentSymbol)) {
				stock.setSymbol(currentSymbol);
				stock.setDate(words[2]);
				// Membership key is the "symbol,date" string form of Stock.
				outputValue.add(new Key(stock.toString().getBytes()));
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			// Emit the accumulated filter once per map task, not once per record.
			context.write(ONE, outputValue);
		}
	}

	/**
	 * Job-1 reducer (single task): ORs all per-mapper filters together and
	 * writes the merged filter to FILTER_FILE. Emits no key/value output.
	 */
	public static class BloomReducer extends Reducer<IntWritable, BloomFilter, NullWritable, NullWritable> {
		private BloomFilter allValues;

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			allValues = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
		}

		@Override
		protected void reduce(IntWritable key, Iterable<BloomFilter> values, Context context)
				throws IOException, InterruptedException {
			// BUGFIX: the original tested values.iterator().hasNext() in a while
			// loop, requesting a NEW iterator on every test — per the Iterable
			// contract that never advances and can loop forever. Use for-each.
			for (BloomFilter current : values) {
				allValues.or(current);
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			Configuration conf = context.getConfiguration();
			Path path = new Path(FILTER_FILE);
			// try-with-resources closes the stream even if write() throws.
			try (FSDataOutputStream out = path.getFileSystem(conf).create(path)) {
				allValues.write(out);
			}
		}
	}


	/**
	 * Job-2 mapper: loads the filter built by job 1 and forwards only price
	 * records whose "symbol,date" key is (probably) in the filter.
	 */
	public static class StockFilterMapper extends Mapper<LongWritable, Text, Stock, DoubleWritable> {
		private BloomFilter dividends;
		private Stock outputKey = new Stock();
		private DoubleWritable outputValue = new DoubleWritable();
		private String stockSymbol;
		private final String COMMA = ",";

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			stockSymbol = context.getConfiguration().get("stockSymbol");

			Configuration conf = context.getConfiguration();
			Path path = new Path(FILTER_FILE);
			// BUGFIX: the original never closed this stream (leaked one HDFS
			// input stream per map task).
			try (FSDataInputStream in = path.getFileSystem(conf).open(path)) {
				dividends = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
				dividends.readFields(in);
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// CSV layout: exchange,symbol,date,...,closing price at index 6
			String[] words = value.toString().split(COMMA);
			String currentSymbol = words[1];
			if (currentSymbol.equals(stockSymbol)) {
				outputKey.setSymbol(currentSymbol);
				outputKey.setDate(words[2]);
				Key stockKey = new Key(outputKey.toString().getBytes());
				if (dividends.membershipTest(stockKey)) {
					outputValue.set(Double.parseDouble(words[6]));
					context.write(outputKey, outputValue);
				}
			}
		}
	}

	/**
	 * Job-2 reducer: drops Bloom-filter false positives (wrong symbol) and
	 * writes "symbol,date" / closing-price pairs.
	 */
	public static class StockFilterReducer extends Reducer<Stock, DoubleWritable, Text, DoubleWritable> {
		private String stockSymbol = "";
		private Text outputKey = new Text();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			stockSymbol = context.getConfiguration().get("stockSymbol");
		}

		@Override
		protected void reduce(Stock key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
			//Check for a false positive
			if (!stockSymbol.equals(key.getSymbol())) {
				System.out.println("False positive: " + key.getSymbol());
			} else {
				// BUGFIX: same fresh-iterator-per-test bug as BloomReducer;
				// iterate the Iterable once with for-each.
				for (DoubleWritable closingPrice : values) {
					outputKey.set(key.toString());
					context.write(outputKey, closingPrice);
				}
			}
		}

	}


	/**
	 * Runs job 1 then job 2.
	 *
	 * @param args args[0] is the stock symbol to filter on
	 * @return 0 on success, -1 if either job fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
		job1.setJarByClass(getClass());
		Configuration conf = job1.getConfiguration();
		conf.set("stockSymbol", args[0]);

		FileInputFormat.setInputPaths(job1, new Path("dividends"));

		job1.setMapperClass(BloomMapper.class);
		job1.setReducerClass(BloomReducer.class);
		job1.setInputFormatClass(TextInputFormat.class);
		job1.setOutputFormatClass(NullOutputFormat.class);
		job1.setMapOutputKeyClass(IntWritable.class);
		job1.setMapOutputValueClass(BloomFilter.class);
		job1.setOutputKeyClass(NullWritable.class);
		job1.setOutputValueClass(NullWritable.class);
		// A single reducer so one task owns the merged filter file.
		job1.setNumReduceTasks(1);

		boolean job1success = job1.waitForCompletion(true);
		if (!job1success) {
			System.out.println("The CreateBloomFilter job failed!");
			return -1;
		}

		Job job2 = Job.getInstance(conf, "FilterStocksJob");
		job2.setJarByClass(getClass());
		conf = job2.getConfiguration();

		Path out = new Path("bloomoutput");
		// Remove any previous run's output so the job can start.
		out.getFileSystem(conf).delete(out, true);
		FileInputFormat.setInputPaths(job2, new Path("stocks"));
		FileOutputFormat.setOutputPath(job2, out);

		job2.setMapperClass(StockFilterMapper.class);
		job2.setReducerClass(StockFilterReducer.class);
		job2.setInputFormatClass(TextInputFormat.class);
		job2.setOutputFormatClass(TextOutputFormat.class);
		job2.setMapOutputKeyClass(Stock.class);
		job2.setMapOutputValueClass(DoubleWritable.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(DoubleWritable.class);

		boolean job2success = job2.waitForCompletion(true);
		if (!job2success) {
			System.out.println("The FilterStocksJob failed!");
			return -1;
		}
		// BUGFIX: the original returned 1 on success, making System.exit(result)
		// report failure to the shell/Oozie. 0 is the conventional success code.
		return 0;
	}


	public static void main(String[] args) {
		int result;
		try {
			result = ToolRunner.run(new Configuration(), new StockDividendFilter(), args);
		} catch (Exception e) {
			e.printStackTrace();
			// BUGFIX: the original left result at 0 here, so an exception
			// still produced a success exit status.
			result = 1;
		}
		System.exit(result);
	}

}
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key: a stock symbol plus a trading date.
 * Serialized as two UTF strings; string form is "symbol,date", which is also
 * the byte key used for Bloom-filter membership in StockDividendFilter.
 */
public class Stock implements WritableComparable<Stock> {

	private String symbol;
	private String date;
	private static final String COMMA = ",";

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof Stock) {
			Stock other = (Stock) obj;
			// BUGFIX: Objects.equals is null-safe — a freshly constructed Stock
			// has null fields and the original symbol.equals(...) threw NPE.
			return Objects.equals(symbol, other.symbol) && Objects.equals(date, other.date);
		}
		return false;
	}

	@Override
	public int hashCode() {
		// BUGFIX: the original hashed the raw concatenation (symbol + date),
		// so e.g. ("AB","C") and ("A","BC") collided; it also NPE'd on null
		// fields. Objects.hash keeps the fields distinct and is null-safe.
		return Objects.hash(symbol, date);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		symbol = in.readUTF();
		date = in.readUTF();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(symbol);
		out.writeUTF(date);
	}

	/**
	 * Orders by symbol, then by date (ISO-style date strings sort correctly
	 * lexicographically). Consistent with equals for non-null fields.
	 */
	@Override
	public int compareTo(Stock arg0) {
		int response = this.symbol.compareTo(arg0.symbol);
		if (response == 0) {
			response = this.date.compareTo(arg0.date);
		}
		return response;
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

	public String getDate() {
		return date;
	}

	public void setDate(String date) {
		this.date = date;
	}

	/** Returns "symbol,date" — the exact byte key fed to the Bloom filter. */
	@Override
	public String toString() {
		return symbol + COMMA + date;
	}
}
workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="dividendstockfilter-workflow">
	<start to="build-bloomfilter" />
	<!-- Job 1: build the Bloom filter of dividend (symbol,date) keys. -->
	<action name="build-bloomfilter">
		<map-reduce>
			<job-tracker>${resourceManager}</job-tracker>
			<name-node>${nameNode}</name-node>
			<prepare>
				<delete path="${nameNode}/user/${wf:user()}/bloom/temp" />
				<!-- FIX: also remove the filter file from any previous run, since
				     BloomReducer recreates it at bloom/dividendfilter. -->
				<delete path="${nameNode}/user/${wf:user()}/bloom/dividendfilter" />
			</prepare>
			<configuration>
				<property>
					<name>mapreduce.job.queuename</name>
					<value>${queueName}</value>
				</property>
				<!-- The driver classes use the new org.apache.hadoop.mapreduce API. -->
				<property>
					<name>mapred.mapper.new-api</name>
					<value>true</value>
				</property>
				<property>
					<name>mapred.reducer.new-api</name>
					<value>true</value>
				</property>
				<property>
					<name>mapreduce.job.map.class</name>
					<value>bloom.StockDividendFilter$BloomMapper</value>
				</property>
				<property>
					<name>mapreduce.job.reduce.class</name>
					<value>bloom.StockDividendFilter$BloomReducer</value>
				</property>
				<property>
					<name>mapreduce.job.inputformat.class</name>
					<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat
					</value>
				</property>
				<property>
					<name>mapreduce.job.outputformat.class</name>
					<value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
					</value>
				</property>
				<property>
					<name>mapreduce.map.output.key.class</name>
					<value>org.apache.hadoop.io.IntWritable</value>
				</property>
				<property>
					<name>mapreduce.map.output.value.class</name>
					<value>org.apache.hadoop.util.bloom.BloomFilter</value>
				</property>
				<property>
					<name>mapreduce.job.output.key.class</name>
					<value>org.apache.hadoop.io.NullWritable</value>
				</property>
				<property>
					<name>mapreduce.job.output.value.class</name>
					<value>org.apache.hadoop.io.NullWritable</value>
				</property>
				<!-- One reducer so a single task owns the merged filter file. -->
				<property>
					<name>mapreduce.job.reduces</name>
					<value>1</value>
				</property>
				<property>
					<name>mapreduce.input.fileinputformat.inputdir</name>
					<value>${nameNode}/user/${wf:user()}/bloom/dividends</value>
				</property>
				<property>
					<name>mapreduce.output.fileoutputformat.outputdir</name>
					<value>${nameNode}/user/${wf:user()}/bloom/temp</value>
				</property>
				<property>
					<name>stockSymbol</name>
					<value>${stockSymbol}</value>
				</property>
			</configuration>
		</map-reduce>
		<ok to="filter-stocks" />
		<error to="fail" />
	</action>
	<!-- Job 2: filter daily prices through the Bloom filter. -->
	<action name="filter-stocks">
		<map-reduce>
			<job-tracker>${resourceManager}</job-tracker>
			<name-node>${nameNode}</name-node>
			<prepare>
				<delete path="${nameNode}/user/${wf:user()}/bloom/bloomoutput" />
			</prepare>
			<configuration>
				<property>
					<name>mapreduce.job.queuename</name>
					<value>${queueName}</value>
				</property>
				<property>
					<name>mapred.mapper.new-api</name>
					<value>true</value>
				</property>
				<property>
					<name>mapred.reducer.new-api</name>
					<value>true</value>
				</property>
				<property>
					<name>mapreduce.job.map.class</name>
					<value>bloom.StockDividendFilter$StockFilterMapper</value>
				</property>
				<property>
					<name>mapreduce.job.reduce.class</name>
					<value>bloom.StockDividendFilter$StockFilterReducer</value>
				</property>
				<property>
					<name>mapreduce.job.inputformat.class</name>
					<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat
					</value>
				</property>
				<property>
					<name>mapreduce.job.outputformat.class</name>
					<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
					</value>
				</property>
				<!-- FIX: the intermediate (map output) classes were missing; the
				     mapper emits Stock/DoubleWritable (see the Java driver's
				     setMapOutputKeyClass/setMapOutputValueClass calls). -->
				<property>
					<name>mapreduce.map.output.key.class</name>
					<value>bloom.Stock</value>
				</property>
				<property>
					<name>mapreduce.map.output.value.class</name>
					<value>org.apache.hadoop.io.DoubleWritable</value>
				</property>
				<!-- FIX: was bloom.Stock, but StockFilterReducer writes Text keys
				     (the driver calls job2.setOutputKeyClass(Text.class)). -->
				<property>
					<name>mapreduce.job.output.key.class</name>
					<value>org.apache.hadoop.io.Text</value>
				</property>
				<property>
					<name>mapreduce.job.output.value.class</name>
					<value>org.apache.hadoop.io.DoubleWritable</value>
				</property>
				<property>
					<name>mapreduce.job.reduces</name>
					<value>1</value>
				</property>
				<property>
					<name>mapreduce.output.fileoutputformat.outputdir</name>
					<value>${nameNode}/user/${wf:user()}/bloom/bloomoutput</value>
				</property>
				<property>
					<name>mapreduce.input.fileinputformat.inputdir</name>
					<value>${nameNode}/user/${wf:user()}/bloom/stocks</value>
				</property>
				<property>
					<name>stockSymbol</name>
					<value>${stockSymbol}</value>
				</property>
			</configuration>
		</map-reduce>
		<ok to="end" />
		<error to="fail" />
	</action>
	<kill name="fail">
		<message>Job failed, error
			message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>
	<end name="end" />
</workflow-app>
job.properties

# hadoop fs -mkdir bloom

# hadoop fs -mkdir bloom/lib

# hadoop fs -mkdir bloom/dividends

# hadoop fs -put ~/java/labs/data/stock_dividends/NYSE_dividends_A.csv bloom/dividends

# hadoop fs -mkdir bloom/stocks

# hadoop fs -put ~/java/labs/data/stock_prices/NYSE_daily_prices_A.csv bloom/stocks

# oozie job -config job.properties -run