大數據模塊開發之數據預處理
阿新 • • 發佈:2018-09-26
exce ews map 詳細 clas cas stream type repr 1. 主要目的
過濾“不合規”數據,清洗無意義的數據
格式轉換和規整
根據後續的統計需求,過濾分離出各種不同主題(不同欄目path)的基礎數據。
2. 實現方式
開發一個mr程序WeblogPreProcess(內容太長,見工程代碼) 直接從原始數據中用hql語法得出每個人的“次”訪問信息比較困難,可先用mapreduce程序分析原始數據得出“次”信息數據,然後再用hql進行更多維度統計
用MR程序從pageviews數據中,梳理出每一次visit的起止時間、頁面信息
詳細代碼見工程:ClickStreamVisit.java
過濾“不合規”數據,清洗無意義的數據
格式轉換和規整
根據後續的統計需求,過濾分離出各種不同主題(不同欄目path)的基礎數據。
2. 實現方式
開發一個mr程序WeblogPreProcess(內容太長,見工程代碼)
public class WeblogPreProcess { static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> { Text k = new Text(); NullWritable v = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); WebLogBean webLogBean = WebLogParser.parser(line); // WebLogBean productWebLog = WebLogParser.parser2(line); // WebLogBean bbsWebLog = WebLogParser.parser3(line); // WebLogBean cuxiaoBean = WebLogParser.parser4(line); if (!webLogBean.isValid()) return; k.set(webLogBean.toString()); context.write(k, v); // k.set(productWebLog); // context.write(k, v); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WeblogPreProcess.class); job.setMapperClass(WeblogPreProcessMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
l 運行mr對數據進行預處理
hadoop jar weblog.jar cn.itcast.bigdata.hive.mr.WeblogPreProcess /weblog/input /weblog/preout
3. 點擊流模型數據梳理
由於大量的指標統計從點擊流模型中更容易得出,所以在預處理階段,可以使用mr程序來生成點擊流模型的數據。
3.1. 點擊流模型pageviews表
Pageviews表模型數據生成, 詳細見:ClickStreamPageView.java
此時程序的輸入數據源就是上一步驟我們預處理完的數據。經過此不處理完成之後的數據格式為:
3.2. 點擊流模型visit信息表
註:“一次訪問”=“N次連續請求”
用MR程序從pageviews數據中,梳理出每一次visit的起止時間、頁面信息
詳細代碼見工程:ClickStreamVisit.java
大數據模塊開發之數據預處理