1. 程式人生 > >茄子快傳數據分析之原理分析及數據清洗

茄子快傳數據分析之原理分析及數據清洗

lse == 信息丟失 參數 format 手機 打印 客戶 mapred

茄子快傳數據分析之原理分析及數據清洗

版權聲明:聞道有先後,術業有專攻。 https://blog.csdn.net/wlk_328909605/article/details/82227410

需求:聯想集團有一款app產品叫茄子快傳(有上億的活躍用戶,集中在第三世界國家)
現在需要開發一個數據分析系統,來對app的用戶行為數據做各類分析;

原理:
流程如下圖:
技術分享圖片
流程簡單介紹:
用戶通過茄子的客戶端產生數據,
將使用時間,手機號,ip地址,手機的序列號,app的版本,app的下載渠道等重要信息上傳到聯想的web日誌服務器上,服務器的後臺系統打印出日誌文件,通過flume(一種日誌采集工具)將生成的日誌上傳到hdfs上,先進行數據清洗,將版本,渠道,用戶等重要信息丟失的過濾掉,生成新的文件,數據加載到hive中,進行運算處理,處理後的結果通過sqoop(一種數據遷移工具)保存到關系型數據庫中,比如MySql,再通過web服務器,將分析出的結果顯示到瀏覽器上。

預處理需求(mapreduce):
1/ 請對app事件請求日誌進行預處理:
a) 過濾掉一些不合法數據(缺失device_id,app_ver_name,os_name,app_token,city,release_channel字段需要過濾)
b) 將原格式json,解析成csv(逗號分隔的文本)格式,並去掉”events”字段
c) 在原始數據中,追加一個字段user_id(如果是蘋果,就用device_id,如果是android,就用android_id)
數據預處理的時候,只需要map就可以完成,所以就不需要reduce了。
處理要求:device_id,app_ver_name,os_name,app_token,city,release_channel 缺失則過濾
代碼如下:

package com.cleanLog;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;


public class AppLogClean {


    public static class MapTask extends Mapper<LongWritable, Text, Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            ObjectMapper mapper = new ObjectMapper();
            JsonNode log = mapper.readTree(line);
            JsonNode header = log.get("header");

            if(StringUtils.isBlank(header.get("device_id").getTextValue())||
                    //也可以直接getString()
                    StringUtils.isBlank(header.get("app_ver_name").getTextValue())||
                    StringUtils.isBlank(header.get("os_name").getTextValue())||
                    StringUtils.isBlank(header.get("app_token").getTextValue())||
                    StringUtils.isBlank(header.get("city").getTextValue())||
                    StringUtils.isBlank(header.get("release_channel").getTextValue())) {
                return;
            }else {
                String user_id = "";
                if (header.get("device_id_type").getTextValue().equals("mac")) {
                    user_id = header.get("device_id").getTextValue();
                } else {
                    user_id = header.get("android_id").getTextValue();
                }

                StringBuilder sb = new StringBuilder();
                sb.append(header.get("cid_sn").getTextValue()).append(",");
                sb.append(header.get("mobile_data_type").getTextValue()).append(",");
                sb.append(header.get("os_ver").getTextValue()).append(",");
                sb.append(header.get("mac").getTextValue()).append(",");
                sb.append(header.get("resolution").getTextValue()).append(",");
                sb.append(header.get("commit_time").getTextValue()).append(",");
                sb.append(header.get("sdk_ver").getTextValue()).append(",");
                sb.append(header.get("device_id_type").getTextValue()).append(",");
                sb.append(header.get("city").getTextValue()).append(",");
                sb.append(header.get("android_id").getTextValue()).append(",");
                sb.append(header.get("device_model").getTextValue()).append(",");
                sb.append(header.get("carrier").getTextValue()).append(",");
                sb.append(header.get("promotion_channel").getTextValue()).append(",");
                sb.append(header.get("app_ver_name").getTextValue()).append(",");
                sb.append(header.get("imei").getTextValue()).append(",");
                sb.append(header.get("app_ver_code").getTextValue()).append(",");
                sb.append(header.get("pid").getTextValue()).append(",");
                sb.append(header.get("net_type").getTextValue()).append(",");
                sb.append(header.get("device_id").getTextValue()).append(",");
                sb.append(header.get("app_device_id").getTextValue()).append(",");
                sb.append(header.get("release_channel").getTextValue()).append(",");
                sb.append(header.get("country").getTextValue()).append(",");
                sb.append(header.get("time_zone").getTextValue()).append(",");
                sb.append(header.get("os_name").getTextValue()).append(",");
                sb.append(header.get("manufacture").getTextValue()).append(",");
                sb.append(header.get("commit_id").getTextValue()).append(",");
                sb.append(header.get("app_token").getTextValue()).append(",");
                sb.append(header.get("account").getTextValue()).append(",");
                sb.append(header.get("app_id").getTextValue()).append(",");
                sb.append(header.get("build_num").getTextValue()).append(",");
                sb.append(header.get("language").getTextValue()).append(",");
                sb.append(user_id);

                context.write(new Text(sb.toString()), NullWritable.get());
            }

        }


        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(AppLogClean.class);
            job.setMapperClass(MapTask.class);
            job.setOutputKeyClass(Text.class);

            //設置reduce的數量為0
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, new Path("D:\\data\\appuserdata\\input\\20170102"));//這裏可以設置成參數args[0]
            FileOutputFormat.setOutputPath(job, new Path("D:\\data\\appuserdata\\output\\20170102"));

            boolean completion = job.waitForCompletion(true);//提交的時候也可以是submit,只不過這個是能看到過程。

            System.out.println(completion?"成功":"失敗");

        }
    }
}

這樣,數據就簡單的清理了,只需要將生成的文件再放到集群上就可以用hive進行處理了。

茄子快傳數據分析之原理分析及數據清洗