關於大資料統計大量股票開盤平均價和收盤平均價的簡單程式
阿新 • 發佈:2018-11-10
1.需要統計的檔案
2.單個檔案的內容(程式假設:每個檔案的第一行以股票編碼開頭,第二行略過不處理,之後每筆資料為以空白分隔的 7 個欄位,其中第 2 欄為開盤價、第 5 欄為收盤價;欄位數不是 7 的行會被忽略)
3.程式內容
package com.test4; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class CodeX { /** * @param args * @throws IOException * @throws InterruptedException * @throws ClassNotFoundException */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://localhost:9000"); String[] otherArgs = (new GenericOptionsParser(conf,args)).getRemainingArgs(); if(otherArgs.length<2){ System.err.println("Usage:CodeX<in><out>"); System.exit(2); } Job job = Job.getInstance(conf,"CodeX"); job.setJarByClass(CodeX.class); job.setMapperClass(CodeX.Map.class); System.out.println("Mapper over"); // job.setCombinerClass(CodeX.Reduce.class); job.setReducerClass(CodeX.Reduce.class); System.out.println("Reduce over"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.out.println("all over"); for(int i = 0;i<otherArgs.length-1;i++){ FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); } public static class Map extends Mapper<Object,Text,Text,Text>{ private Text text = new Text(); private Text keys = new Text(); private int no = 0; public Map(){ } public void map(Object key,Text value,Context context)throws IOException,InterruptedException{ String line = value.toString(); this.no +=1; System.out.println(this.no+line); String[] lines = line.split("\\s+"); 
for(int i =0;i<lines.length;i++){ System.out.print(lines[i]+" ~~"); } if(this.no == 1){ this.keys.set("股票編碼:"+lines[0]); } if(this.no > 2){ if(lines.length == 7){ this.text.set(lines[0]+"+"+lines[1]+"+"+lines[4]); System.out.println(this.no+"---->"+lines[0]+"+"+lines[1]+"+"+lines[4]); context.write(this.keys, this.text); } } } } public static class Reduce extends Reducer<Text,Text,Text,Text>{ private Text text = new Text(); public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{ double sum1 = 0.0; double sum2 = 0.0; int n = 0; System.out.println("...................start"+key.toString()); Iterator<Text> $it = values.iterator(); while($it.hasNext()){ String record =$it.next().toString(); System.out.println(n); System.out.println("原始資料:"+record); n++; System.out.println("第"+n+"次迴圈"); String []result = record.split("[+]"); System.out.println(Double.valueOf(result[1])+" "+Double.valueOf(result[2])); sum1 +=(Double.valueOf(result[1])*100); sum2 +=(Double.valueOf(result[2])*100); System.out.println(sum1/100+" "+sum2/100); } System.out.println("最後的結果:"+sum1/100+" "+sum2/100); double openPrise = sum1/(100*n); double closePrise = sum2/(100*n); openPrise = (double)Math.round(openPrise*100)/100; closePrise = (double)Math.round(closePrise*100)/100; System.out.println("平均值:"+openPrise+" "+closePrise); Double.toString(closePrise); String result ="開盤平均價:"+Double.toString(openPrise)+", 收盤平均價:"+Double.toString(closePrise); this.text.set(result); context.write(key, this.text); } } }