程式人生 > Spark自定義累加器的實現需要注意的細節(java版)

Spark自定義累加器的實現需要注意的細節(java版)

可以參考下面博文

!!!!!!

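需要注意的是,Spark 原始碼中對 AccumulatorParam 的兩個方法給出了不同的說明:addAccumulator(r, t) 是把一個新的元素 t 累加到目前的累加值 r 上;addInPlace(r1, r2) 則是把兩個已經累加好的值(例如來自不同 task 的部分結果)合併成一個。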

也就是說兩個方法的實現是不一樣的。

下面是我的實現

import constant.Constants;
import org.apache.spark.AccumulatorParam;
import util.StringUtils;

/**
 * Spark {@link AccumulatorParam} that keeps one counter per time range inside a
 * single concatenated string of the form {@code key1=count1|key2=count2|...}.
 *
 * <p>The two "add" methods have different roles and therefore different
 * implementations:
 * <ul>
 *   <li>{@link #addAccumulator(String, String)} receives the current accumulated
 *       string and the <em>name</em> of one time range, and increments that
 *       range's counter by one;</li>
 *   <li>{@link #addInPlace(String, String)} receives two accumulated strings and
 *       sums them counter by counter.</li>
 * </ul>
 *
 * <p>{@link #zero(String)} and {@link #addInPlace(String, String)} are both
 * driven by {@link #TIME_RANGE_KEYS}, so they always emit the same set of keys
 * in the same order and in the same format.
 */
public class TimeAccumulator implements AccumulatorParam<String> {

    /** Field delimiter in regex form, as passed to the {@code StringUtils} helpers. */
    private static final String FIELD_DELIMITER_REGEX = "\\|";

    /** Literal field delimiter written between {@code key=value} pairs. */
    private static final char FIELD_DELIMITER = '|';

    /** Separator between a key and its counter. */
    private static final char KEY_VALUE_SEPARATOR = '=';

    /** Every time-range key tracked by this accumulator, in output order. */
    private static final String[] TIME_RANGE_KEYS = {
        Constants.S_1s_3s,
        Constants.S_4s_6s,
        Constants.S_7s_9s,
        Constants.S_10s_30s,
        Constants.S_30s_60s,
        Constants.S_1m_3m,
        Constants.S_3m_10m,
        Constants.S_10m_30m,
        Constants.S_30m
    };

    /**
     * Increments by one the counter named by {@code s2} inside the accumulated
     * string {@code s1}.
     *
     * @param s1 the current accumulated string
     * @param s2 the name of the time range whose counter should be incremented
     * @return the updated accumulated string, or {@code s1} unchanged when
     *         {@code s2} is empty or no field named {@code s2} is found in {@code s1}
     * @throws NumberFormatException if the existing counter is not a valid integer
     */
    @Override
    public String addAccumulator(String s1, String s2) {
        // Nothing to add: keep the accumulated value as it is.
        if (StringUtils.isEmpty(s2)) {
            return s1;
        }
        // Look up the current counter for the requested range and bump it by one.
        String oldValue = StringUtils.getFieldFromConcatString(s1, FIELD_DELIMITER_REGEX, s2);
        if (oldValue != null) {
            int newValue = Integer.parseInt(oldValue) + 1;
            return StringUtils.setFieldInConcatString(
                    s1, FIELD_DELIMITER_REGEX, s2, String.valueOf(newValue));
        }
        return s1;
    }

    /**
     * Merges two accumulated strings by summing the counters of every key in
     * {@link #TIME_RANGE_KEYS}.
     *
     * <p>The result is written as {@code key=value} pairs joined by {@code |},
     * the same layout {@link #zero(String)} produces, so it can be fed back into
     * {@link #addAccumulator(String, String)} or merged again.
     *
     * @param r1 the first accumulated string
     * @param r2 the second accumulated string
     * @return {@code r2} when {@code r1} is empty, {@code r1} when {@code r2} is
     *         empty, otherwise a new string holding the per-key sums
     * @throws NumberFormatException if a counter that is present is not a valid integer
     */
    @Override
    public String addInPlace(String r1, String r2) {
        // If either side carries no data, the other side is already the merged result.
        if (StringUtils.isEmpty(r1)) {
            return r2;
        }
        if (StringUtils.isEmpty(r2)) {
            return r1;
        }

        StringBuilder merged = new StringBuilder();
        for (int i = 0; i < TIME_RANGE_KEYS.length; i++) {
            String key = TIME_RANGE_KEYS[i];
            int sum = readCount(r1, key) + readCount(r2, key);
            if (i > 0) {
                merged.append(FIELD_DELIMITER);
            }
            merged.append(key).append(KEY_VALUE_SEPARATOR).append(sum);
        }
        return merged.toString();
    }

    /**
     * Builds the initial accumulated string, in which every tracked time range
     * has a counter of zero.
     *
     * @param initialValue ignored; the zero value does not depend on it
     * @return {@code key=0} for every key in {@link #TIME_RANGE_KEYS}, joined by {@code |}
     */
    @Override
    public String zero(String initialValue) {
        StringBuilder zeros = new StringBuilder();
        for (int i = 0; i < TIME_RANGE_KEYS.length; i++) {
            if (i > 0) {
                zeros.append(FIELD_DELIMITER);
            }
            zeros.append(TIME_RANGE_KEYS[i]).append(KEY_VALUE_SEPARATOR).append(0);
        }
        return zeros.toString();
    }

    /**
     * Reads the integer counter stored under {@code key} in {@code concat}.
     *
     * <p>A {@code null} lookup result is counted as zero rather than being passed
     * to {@link Integer#parseInt(String)}; this mirrors the null check that
     * {@link #addAccumulator(String, String)} performs on the same helper.
     */
    private static int readCount(String concat, String key) {
        String value = StringUtils.getFieldFromConcatString(concat, FIELD_DELIMITER_REGEX, key);
        return value == null ? 0 : Integer.parseInt(value);
    }
}