1. 程式人生 > >針對微信的一篇推送附有的數據鏈接進行MapReduce統計

針對微信的一篇推送附有的數據鏈接進行MapReduce統計

全球 tco 大數據 cer 推送 xtend .get ati 適用於

原推送引用:https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg

版權歸原作者所有,如有侵權請及時聯系本人,見諒!

原文采用Excel進行統計數據,這裏采用剛學習的工具進行練習。

  1 import java.io.IOException;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.conf.Configured;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.IntWritable;
7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.util.Tool; 15 import org.apache.hadoop.util.ToolRunner; 16 17 /** 18 * https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg 19 * 針對[新興生態系統:Python和R語言,誰更適用於大數據Spark/Hadoop和深度學習?] 20 * 的全球數據進行一系列統計 21 */ 22 public class wechat extends Configured implements Tool { 23 24
/** 25 * Map方法 26 */ 27 private static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 28 private static final IntWritable mapOutputValue = new IntWritable(1) ; 29 private Text mapOutputKey = new Text() ; 30 @Override 31 public void map(LongWritable key, Text value, Context context) 32 throws IOException, InterruptedException { 33 34 String input = value.toString(); 35 if(input.split(",").length<16) { 36 return; 37 } 38 String[] arrStr = input.split(","); 39 //Python-大數據計數器輸出 40 if("1".equals(arrStr[2])&&"1".equals(arrStr[14])) { 41 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_BigData").increment(1L); 42 } 43 //Python-Deep計數器輸出 44 if("1".equals(arrStr[2])&&"1".equals(arrStr[13])) { 45 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_Deep-Learning").increment(1L); 46 } 47 //R-大數據計數器輸出 48 if("1".equals(arrStr[3])&&"1".equals(arrStr[14])) { 49 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_BigData").increment(1L); 50 } 51 //R-深度計數器輸出 52 if("1".equals(arrStr[3])&&"1".equals(arrStr[13])) { 53 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_Deep-Learning").increment(1L); 54 } 55 56 arrStr = input.split(",")[16].split(";"); 57 //遍歷 58 for(String tool: arrStr){ 59 // 設置key 60 mapOutputKey.set(tool); 61 // 輸出 62 context.write(mapOutputKey, mapOutputValue) ; 63 } 64 } 65 } 66 67 /** 68 * Reduce聚合結果 69 */ 70 private static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 71 private IntWritable outputValue = new IntWritable() ; 72 @Override 73 protected void reduce(Text key, Iterable<IntWritable> values, Context context) 74 throws IOException, InterruptedException { 75 76 // 定義臨時變量,用於累加 77 int sum = 0 ; 78 79 // 遍歷 80 for(IntWritable value: values){ 81 sum += value.get() ; 82 } 83 84 if(sum < 500){ 85 // 定義500以上的篩選 86 return ; 87 } 88 // 設置 89 outputValue.set(sum) ; 90 // 輸出 91 context.write(key, outputValue) ; 92 93 } 94 } 95 96 /** 97 * 驅動創建Job並提交運行 返回狀態碼 98 */ 99 100 public int run(String[] args) throws Exception { 101 // 創建一個Job 102 Job job = Job.getInstance( 103 this.getConf() , wechat.class.getSimpleName() 104 ) ; 105 // 設置job運行的class 106 job.setJarByClass(wechat.class); 107 108 // 設置Job 109 // 1. 設置 input,從哪裏讀取數據 110 Path inputPath = new Path(args[0]) ; 111 FileInputFormat.addInputPath(job, inputPath); 112 113 // 2. 設置 mapper類 114 job.setMapperClass(ModuleMapper.class); 115 // 設置map 輸出的key和value的數據類型 116 job.setMapOutputKeyClass(Text.class); 117 job.setMapOutputValueClass(IntWritable.class); 118 119 // 3. 設置 reducer 類 120 job.setReducerClass(ModuleReducer.class); 121 // 設置 reducer 輸出的key和value的數據類型 122 job.setOutputKeyClass(Text.class); 123 job.setOutputValueClass(IntWritable.class); 124 // 設置ReduceTask個數 125 // job.setNumReduceTasks(2); 126 127 // 4. 設置 處理結果保存的路徑 128 Path outputPath = new Path(args[1]) ; 129 FileOutputFormat.setOutputPath(job, outputPath); 130 131 // 提交job運行 132 boolean isSuccess = job.waitForCompletion(true) ; 133 134 // 返回狀態 135 return isSuccess ? 0 : 1; 136 } 137 138 /** 139 * 140 * @param args 141 * @throws Exception 142 */ 143 public static void main(String[] args) throws Exception { 144 if(2 > args.length){ 145 System.out.println("Usage: " + wechat.class.getSimpleName() +" <in> <out>"); 146 return ; 147 } 148 149 // 讀取HADOOP中配置文件, core-*.xml hdfs-*.xml yarn-*.xml mapred-*.xml 150 Configuration conf = new Configuration() ; 151 152 // 運行Job 153 int status = ToolRunner.run(conf, new wechat(), args) ; 154 155 // exit program 156 System.exit(status); 157 } 158 159 }

針對微信的一篇推送附有的數據鏈接進行MapReduce統計