
[Original] MapReduce in Action (Part 1)


Scenario:

Every day, users generate all kinds of behavior on a website, such as viewing pages and placing orders. The site records these actions as user behavior logs and stores them on HDFS, in the following format:

17:03:35.012?pageview?{"device_id":"4405c39e85274857bbef58e013a08859","user_id":"0921528165741295","ip":"61.53.69.195","session_id":"9d6dc377216249e4a8f33a44eef7576d","req_url":"http://www.bigdataclass.com/product/1527235438747427"}

This is a semi-structured, JSON-like record of a user's visit to the site. Each line has a prefix such as `17:03:35.012?pageview?` carrying the timestamp and the action name (the `?` here stands in for an unprintable separator character; the code below splits on `\u1111`), followed by a JSON payload with fields such as device_id, user_id, ip, session_id, and req_url. We want an MR program that turns these raw lines into a format the data warehouse can read, for example one JSON string per record with the timestamp converted to epoch milliseconds (time_log), like this:

{"time_log":1527584600586,"device_id":"4405c39e85274857bbef58e013a08859","user_id":"0921528165741295","active_name":"pageview","ip":"61.53.69.195","session_id":"9d6dc377216249e4a8f33a44eef7576d","req_url":"http://www.bigdataclass.com/my/0921528165741295"}

Tools: IntelliJ IDEA, Maven, JDK 1.8

Steps

  1. Configure pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>netease.bigdata.course</groupId>
    <artifactId>etl</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
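Two points worth noting in this pom: hadoop-client is marked `provided` because the cluster already supplies the Hadoop classes at runtime, so they should not be bundled into our jar; fastjson, on the other hand, is not on the cluster classpath, which is why the maven-assembly-plugin is configured to build a jar-with-dependencies that packages it in.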

2. Write the main class. To keep the amount of code down, the parsing method and the driver are both placed in a single class, ParseLogJob.java:

package com.bigdata.etl.job;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class ParseLogJob extends Configured implements Tool {
    // Log-parsing function: takes one raw line and returns it as a JSON Text record
    public static Text parseLog(String row) throws ParseException {
        // The three parts (timestamp, action name, JSON payload) are separated by '\u1111'
        String[] logPart = StringUtils.split(row, "\u1111");
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        long timeLog = dateFormat.parse(logPart[0]).getTime();
        String activeName = logPart[1];
        JSONObject bizData = JSONObject.parseObject(logPart[2]);
        JSONObject logData = new JSONObject();

        logData.put("active_name", activeName);
        logData.put("time_log", timeLog);
        logData.putAll(bizData);
        return new Text(logData.toJSONString());
    }

    // Generic parameters: input key type, input value type, output key type,
    // output value type (all Hadoop serialization types)
    public static class LogMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        // Arguments: the input key (byte offset), the input value (the line),
        // and the context the map task runs in
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                Text parseLog = parseLog(value.toString());
                // Use NullWritable.get() rather than null so the key matches the declared output key type
                context.write(NullWritable.get(), parseLog);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = getConf();
        Job job = Job.getInstance(config);
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parseLog");
        job.setMapperClass(LogMapper.class);
        // Declare the output types to match what LogMapper writes
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Map-only job: set the number of reducers to 0
        job.setNumReduceTasks(0);
        // First command-line argument is the input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Second argument is the output path
        Path outPutPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPutPath);
        // Delete the output path if it already exists, so the job does not fail on startup
        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outPutPath)) {
            fs.delete(outPutPath, true);
        }
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed");
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ParseLogJob(), args);
        System.exit(res);
    }
}
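Before packaging, it can help to sanity-check parseLog locally. Below is a minimal sketch (not part of the job itself): it reconstructs one sample line with the `\u1111` separator and a full date prefix, since the yyyy-MM-dd pattern requires one, which suggests the raw logs carry the date even though the excerpt above shows only the time.

package com.bigdata.etl.job;

// Quick local check for parseLog: build one sample line and print the result.
// The expected time_log of 1527584600586 assumes a UTC+8 JVM timezone.
public class ParseLogTest {
    public static void main(String[] args) throws Exception {
        String line = "2018-05-29 17:03:20.586" + "\u1111" + "pageview" + "\u1111"
                + "{\"device_id\":\"4405c39e85274857bbef58e013a08859\","
                + "\"user_id\":\"0921528165741295\"}";
        System.out.println(ParseLogJob.parseLog(line));
        // prints something like:
        // {"time_log":1527584600586,"active_name":"pageview","device_id":"...","user_id":"..."}
    }
}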

3. Package the jar and upload it to the server
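For reference, this step boils down to something like the following (the target host and path here are placeholders, not from the original setup):

# build the fat jar; the assembly plugin produces target/etl-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
# copy it to the machine that has the Hadoop client installed (host/path are placeholders)
scp target/etl-1.0-SNAPSHOT-jar-with-dependencies.jar user@hadoop-client:~/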


4. Run the program

We created input and output directories in HDFS to serve as the input and output paths.
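The setup amounts to something like this (the log file name below is a placeholder):

# create the input directory and upload the raw log
hdfs dfs -mkdir -p /user/1141690160/input
hdfs dfs -put access.log /user/1141690160/input
# the output directory is created by the job itself (and deleted first if it already exists)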

hadoop jar ./etl-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.etl.job.ParseLogJob /user/1141690160/input /user/1141690160/output


The map phase has finished; since we did nothing in the reduce phase, the number of reduce tasks is 0.


Now check the mapped output in HDFS.
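A map-only job writes its results as part-m-* files, which can be listed and inspected like so:

hdfs dfs -ls /user/1141690160/output
# each line of the part file is one JSON record produced by the mapper
hdfs dfs -cat /user/1141690160/output/part-m-00000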


And with that, a simple MR program has run end to end.
