1. 程式人生 > >《資料演算法-Hadoop/Spark大資料處理技巧》讀書筆記(四)——移動平均

《資料演算法-Hadoop/Spark大資料處理技巧》讀書筆記(四)——移動平均

移動平均:對時序序列按週期取其值的平均值,這種運算被稱為移動平均。典型例子是求股票的n天內的平均值。
移動平均的關鍵是如何求這個平均值,可以使用Queue來實現。

public class MovingAverageDriver {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setAppName("MovingAverageDriver");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("hdfs://hadoop000:8020/dataalgorithminput/MovingAverage.txt"
); List<String> list = lines.mapToPair(new PairFunction<String, CodeDatePair, DatePricePair>() { @Override public Tuple2<CodeDatePair, DatePricePair> call(String line) throws Exception { String[] array = line.split(","); return
new Tuple2<CodeDatePair, DatePricePair>(new CodeDatePair(array[0],array[1]),new DatePricePair(array[1],array[2])); } }).repartitionAndSortWithinPartitions(new CodeDatePartitioner()) .mapToPair(new PairFunction<Tuple2<CodeDatePair,DatePricePair>, String, DatePricePair>() { @Override
public Tuple2<String, DatePricePair> call(Tuple2<CodeDatePair, DatePricePair> tuple2) throws Exception { return new Tuple2<String, DatePricePair>(tuple2._1().getCode(), tuple2._2()); } }) .groupByKey() .flatMapValues(new Function<Iterable<DatePricePair>, Iterable<Tuple2<DatePricePair, Double>>>(){ @Override public Iterable<Tuple2<DatePricePair, Double>> call(Iterable<DatePricePair> v1) throws Exception { int size = 3; List<Tuple2<DatePricePair, Double>> list = new ArrayList<Tuple2<DatePricePair, Double>>(); Queue<Integer> queue= new LinkedList<Integer>(); for(DatePricePair item : v1){ queue.offer(item.getPrice()); if(queue.size() > size){ queue.remove(); } int sum = 0; for(Integer price : queue){ sum += price; } list.add(new Tuple2<DatePricePair, Double>(item, Double.valueOf(sum)/Double.valueOf(queue.size()))); } return list; } }).map(new Function<Tuple2<String,Tuple2<DatePricePair,Double>>, String>() { @Override public String call(Tuple2<String, Tuple2<DatePricePair, Double>> v1) throws Exception { return v1._1() + " " + v1._2()._1().getDate() + " " + v1._2()._1().getPrice() + " " + v1._2()._2(); } }).collect(); for(String item : list){ System.out.println(item); } } }

①對一隻股票按照date來排序
②對資料集根據股票的Code進行groupByKey操作,生成的value是按照時間增大順序排列的date-price對
③在flatMapValues()裡使用Queue實現對連續3天的股價求平均值,並展開。