Spark自定義累加器的實現需要注意的細節(java版)
阿新 • • 發佈:2018-12-21
可以參考下面博文
!!!!!!
需要注意的是,原始碼中給出了 addAccumulator 與 addInPlace 兩個方法各自的說明:addAccumulator 是把一個新的值加到累加器上,addInPlace 則是把兩個累加器的值合併在一起。
也就是說兩個方法的實現是不一樣的。
下面是我的實現
import constant.Constants; import org.apache.spark.AccumulatorParam; import util.StringUtils; public class TimeAccumulator implements AccumulatorParam<String> { @Override public String addAccumulator(String s1, String s2) { //校驗:v2位空的話,直接返回v1 if(StringUtils.isEmpty(s2)) { return s1; } // 使用StringUtils工具類,從v1中,提取v2對應的值,並累加 String oldValue = StringUtils.getFieldFromConcatString(s1, "\\|", s2); if(oldValue != null) { int newValue = Integer.valueOf(oldValue) + 1; return StringUtils.setFieldInConcatString(s1, "\\|", s2, String.valueOf(newValue)); } return s1; } @Override public String addInPlace(String r1, String r2) { //校驗:v1位空的話,直接返回v2 if(StringUtils.isEmpty(r1)) { return r2; } // 使用StringUtils工具類,從v1 v2對應的值,並累加 int oldValue1 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_1s_3s)); int oldValue2 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_4s_6s)); int oldValue3 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_7s_9s)); int oldValue4 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_10s_30s)); int oldValue5 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_30s_60s)); int oldValue6 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_1m_3m)); int oldValue7 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_3m_10m)); int oldValue8 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_10m_30m)); int oldValue9 = Integer.parseInt(StringUtils.getFieldFromConcatString(r1,"\\|",Constants.S_30m)); int newValue1 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_1s_3s)); int newValue2 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_4s_6s)); int newValue3 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_7s_9s)); int newValue4 = 
Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_10s_30s)); int newValue5 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_30s_60s)); int newValue6 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_1m_3m)); int newValue7 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_3m_10m)); int newValue8 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_10m_30m)); int newValue9 = Integer.parseInt(StringUtils.getFieldFromConcatString(r2,"\\|",Constants.S_30m)); return Constants.S_1s_3s+(oldValue1+newValue1)+Constants.S_4s_6s+(oldValue2+newValue2)+Constants.S_7s_9s+(oldValue3+newValue3)+ Constants.S_10s_30s+(oldValue4+newValue4)+Constants.S_30s_60s+(oldValue5+newValue5)+Constants.S_1m_3m+(oldValue6+newValue6)+ Constants.S_3m_10m+(oldValue7+newValue7)+Constants.S_10m_30m+(oldValue8+newValue8)+Constants.S_30m+(oldValue9+newValue9); } @Override public String zero(String initialValue) { return Constants.S_1s_3s+"=0|"+Constants.S_4s_6s+"=0|"+Constants.S_7s_9s+"=0|"+ Constants.S_10s_30s+"=0|"+Constants.S_30s_60s+"=0|"+Constants.S_1m_3m+"=0|"+ Constants.S_3m_10m+"=0|"+Constants.S_10m_30m+"=0|"+Constants.S_30m+"=0"; } }