
Basic architecture of an offline analysis system built with Flume, Hadoop and Hive

Note that the Flume source used in this system is the Spooling Directory Source rather than the Exec Source. If the Flume service goes down, the Spooling Directory Source remembers the position it last read up to, whereas the Exec Source does not; with the Exec Source the user has to handle recovery manually, and if that is not done carefully, restarting the Flume agent can produce duplicate data. The Spooling Directory Source has drawbacks of its own: it renames every file it has finished reading, which is one more reason to put an extra FTP server in front of it so that Flume does not "pollute" the production environment. Another significant limitation is that it cannot flexibly monitor new content appended to files inside all subdirectories of a directory. There are several ways around these problems, for example switching to another log-collection tool such as Logstash.
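If the renaming side effect is a concern, the Spooling Directory Source itself exposes a few knobs. A minimal sketch, with property names taken from the Flume Spooling Directory Source documentation and values that are only illustrative:

# Suffix appended to fully ingested files (this rename is what "pollutes" the directory); .COMPLETED is the default
agent.sources.origin.fileSuffix = .COMPLETED
# Set to immediate to delete ingested files instead of renaming them
agent.sources.origin.deletePolicy = never
# Skip files that are still being uploaded into the FTP directory
agent.sources.origin.ignorePattern = ^.*\.tmp$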

The Flume configuration file on the FTP server is as follows:
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = spooldir
agent.sources.origin.spoolDir = /export/data/trivial/weblogs
agent.sources.origin.channels = memorychannel
agent.sources.origin.deserializer.maxLineLength = 2048

agent.sources.origin.interceptors = i2
agent.sources.origin.interceptors.i2.type = host
agent.sources.origin.interceptors.i2.hostHeader = hostname

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545
A few parameters here deserve explanation. On the Flume agent's source, the deserializer.maxLineLength property controls the maximum size of each event; the default is 2048 bytes per event. The capacity of the Flume agent's memory channel defaults to roughly 80% of the memory available to the local JVM, and it can be tuned through the byteCapacity and byteCapacityBufferPercentage parameters. One thing to watch out for: the log files dropped into the directory that Flume monitors on the FTP server must not share the same file name, otherwise Flume reports an error and stops working; the simplest fix is to append a timestamp to every log file name.
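As a hedged illustration of the channel tuning mentioned above (the numbers are placeholders, not recommendations):

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000
# Cap the channel at roughly 100 MB of event data instead of the default ~80% of the JVM heap
agent.channels.memorychannel.byteCapacity = 104857600
# Percentage of byteCapacity reserved for event headers (20 is the default)
agent.channels.memorychannel.byteCapacityBufferPercentage = 20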

The Flume configuration file on the Hadoop server is as follows:
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

#agent.sources.origin.interceptors = i1 i2
#agent.sources.origin.interceptors.i1.type = timestamp
#agent.sources.origin.interceptors.i2.type = host
#agent.sources.origin.interceptors.i2.hostHeader = hostname

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = hdfs
agent.sinks.target.channel = memorychannel
agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S
agent.sinks.target.hdfs.filePrefix = data-%{hostname}
agent.sinks.target.hdfs.rollInterval = 60
agent.sinks.target.hdfs.rollSize = 1073741824
agent.sinks.target.hdfs.rollCount = 1000000
agent.sinks.target.hdfs.round = true
agent.sinks.target.hdfs.roundValue = 10
agent.sinks.target.hdfs.roundUnit = minute
agent.sinks.target.hdfs.useLocalTimeStamp = true
agent.sinks.target.hdfs.minBlockReplicas=1
agent.sinks.target.hdfs.writeFormat=Text
agent.sinks.target.hdfs.fileType=DataStream

The round, roundValue and roundUnit parameters make the HDFS sink create a new directory in HDFS every 10 minutes for the data pulled from the FTP server.
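For example, with the path escapes above and 10-minute rounding, events received at 19:23 and 19:38 on 2016-05-30 would land in directories along the following lines (illustrative paths only; "ftpserver" stands in for the hostname header set by the interceptor, and the numeric suffix is the counter the HDFS sink appends to file names):

/flume/events/16-05-30/192000/data-ftpserver.1464608580000
/flume/events/16-05-30/193000/data-ftpserver.1464609480000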

Troubleshooting

Flume splits the files it pulls into HDFS into many 1 KB-5 KB files

If Flume breaks each file it pulls into a large number of 1 KB-5 KB pieces when writing to HDFS, the HDFS sink is most likely misconfigured and running on its defaults. A source of type spooldir wraps every line of each file in the monitored directory into an event and puts it on the channel, reading at most 1024 characters per line by default. On the HDFS sink side, three properties determine the size of the files written to HDFS: rollInterval (default 30 seconds), rollSize (default 1 KB) and rollCount (default 10 events). rollInterval is the number of seconds after which the current .tmp file (which collects the events arriving from the channel) is rolled into HDFS; rollSize rolls the .tmp file once it reaches the given size; rollCount rolls it once it has received the given number of events.

The files Flume pulls into HDFS end up in a garbled format

This happens because hdfs.writeFormat in the HDFS sink configuration defaults to "Writable", which serializes the original file content into HDFS's own format; it should be set explicitly to hdfs.writeFormat = Text. Likewise, hdfs.fileType defaults to "SequenceFile", which packs all events together into SequenceFile records; it should be set to hdfs.fileType = DataStream so that each event remains a plain text line and the output matches the original file format.
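A hedged example of HDFS sink settings that address both issues (the thresholds are illustrative; the Hadoop-side configuration above already follows this pattern):

# Roll a new HDFS file every 60 seconds or at 128 MB, and disable the 10-event default (0 = never roll by event count)
agent.sinks.target.hdfs.rollInterval = 60
agent.sinks.target.hdfs.rollSize = 134217728
agent.sinks.target.hdfs.rollCount = 0
# Keep the original text lines instead of writing serialized SequenceFiles
agent.sinks.target.hdfs.writeFormat = Text
agent.sinks.target.hdfs.fileType = DataStream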

Cleaning the log files with MapReduce

Once the log data has been pulled into the HDFS file system, MapReduce jobs are used to clean it. Step one: filter out the invalid records with MapReduce.
package com.guludada.clickstream;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.dataparser.WebLogParser;


public class logClean  {
	
	public static class cleanMap extends Mapper<Object,Text,Text,NullWritable> {
		
		private NullWritable v = NullWritable.get();
		private Text word = new Text();
		WebLogParser webLogParser = new WebLogParser();
		
		public void map(Object key,Text value,Context context) {
					
			//convert the line to a String
			 String line = value.toString();
			
			 String cleanContent = webLogParser.parser(line);
			
			 if(!cleanContent.isEmpty()) {
				word.set(cleanContent);
				try {
					context.write(word,v);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}			
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
	
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
		
		Job job = Job.getInstance(conf);
				
		job.setJarByClass(logClean.class);
		
		//specify the Mapper class this job uses
		job.setMapperClass(cleanMap.class);
				
		//key/value types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
			
		//input directory of the job's source files
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*"));

		//output directory for the job's results
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/"+dateStr+"/"));
		
		//submit the job configuration and the jar containing the job's classes to YARN for execution
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	 }

}

package com.guludada.dataparser;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.javabean.WebLogBean;
/**
 * Uses regular expressions to extract the fields of a valid log record.
 */
public class WebLogParser {
	
   public String parser(String weblog_origin) {
		
	 WebLogBean weblogbean = new WebLogBean();
		
        // extract the IP address
	Pattern IPPattern = Pattern.compile("\\d+\\.\\d+\\.\\d+\\.\\d+");
        Matcher IPMatcher = IPPattern.matcher(weblog_origin);
	if(IPMatcher.find()) {
	   String IPAddr = IPMatcher.group(0);
	   weblogbean.setIP_addr(IPAddr);
        } else {
	   return "";
        }
	 // extract the timestamp
	 Pattern TimePattern = Pattern.compile("\\[(.+)\\]");
	 Matcher TimeMatcher = TimePattern.matcher(weblog_origin);
	 if(TimeMatcher.find()) {
	   String time = TimeMatcher.group(1);
	   String[] cleanTime = time.split(" ");
	   weblogbean.setTime(cleanTime[0]);
	 } else {
	   return "";
	 }
		
        //extract the remaining request information
	 Pattern InfoPattern = Pattern.compile(
		 "(\\\"[POST|GET].+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")");

	 Matcher InfoMatcher = InfoPattern.matcher(weblog_origin);
	 if(InfoMatcher.find()) {
			
	   String requestInfo = InfoMatcher.group(1).replace('\"',' ').trim();
	   String[] requestInfoArry = requestInfo.split(" ");
	   weblogbean.setMethod(requestInfoArry[0]);
	   weblogbean.setRequest_URL(requestInfoArry[1]);
	   weblogbean.setRequest_protocol(requestInfoArry[2]);
	   String status_code = InfoMatcher.group(2);
	   weblogbean.setRespond_code(status_code);
			
	   String respond_data = InfoMatcher.group(3);
	   weblogbean.setRespond_data(respond_data);
			
	   String request_come_from = InfoMatcher.group(4).replace('\"',' ').trim();
	   weblogbean.setRequst_come_from(request_come_from);
			
	   String browserInfo = InfoMatcher.group(5).replace('\"',' ').trim();
	   weblogbean.setBrowser(browserInfo);
	} else {
	   return "";
	}
		
        return weblogbean.toString();
   }
		
}
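To make the regular expressions above easier to follow, here is a minimal local sketch of the parser in action. WebLogParserDemo is a hypothetical helper class that is not part of the original project, and the log line is a fabricated example in the common Apache combined-log layout:

package com.guludada.dataparser;

public class WebLogParserDemo {
	public static void main(String[] args) {
		// a made-up access-log line in the combined format the regexes above expect
		String line = "192.168.12.130 - - [30/May/2016:19:38:00 +0800] "
				+ "\"GET /blog/me HTTP/1.1\" 200 1024 \"http://www.baidu.com\" "
				+ "\"Mozilla/5.0 (Windows NT 6.1)\"";
		// expected to print something like:
		// 192.168.12.130 30/May/2016:19:38:00 GET /blog/me HTTP/1.1 200 1024 http://www.baidu.com Mozilla/5.0 (Windows NT 6.1)
		System.out.println(new WebLogParser().parser(line));
	}
}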
package com.guludada.javabean;

public class WebLogBean {
	
	String IP_addr;
	String time;
	String method;
	String request_URL;
	String request_protocol;
	String respond_code;
	String respond_data;
	String requst_come_from;
	String browser;
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getMethod() {
		return method;
	}
	public void setMethod(String method) {
		this.method = method;
	}
	public String getRequest_URL() {
		return request_URL;
	}
	public void setRequest_URL(String request_URL) {
		this.request_URL = request_URL;
	}
	public String getRequest_protocol() {
		return request_protocol;
	}
	public void setRequest_protocol(String request_protocol) {
		this.request_protocol = request_protocol;
	}
	public String getRespond_code() {
		return respond_code;
	}
	public void setRespond_code(String respond_code) {
		this.respond_code = respond_code;
	}
	public String getRespond_data() {
		return respond_data;
	}
	public void setRespond_data(String respond_data) {
		this.respond_data = respond_data;
	}
	public String getRequst_come_from() {
		return requst_come_from;
	}
	public void setRequst_come_from(String requst_come_from) {
		this.requst_come_from = requst_come_from;
	}
	public String getBrowser() {
		return browser;
	}
	public void setBrowser(String browser) {
		this.browser = browser;
	}
	@Override
	public String toString() {
		return IP_addr + " " + time + " " + method + " "
				+ request_URL + " " + request_protocol + " " + respond_code
				+ " " + respond_data + " " + requst_come_from + " " + browser;
	}
	
	
}

The records after the first round of log cleaning are shown in the figure below:

Step two: generate the corresponding session records from the access records, assuming a session timeout of 30 minutes.

package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.WebLogSessionBean;

public class logSession {
	
	public static class sessionMapper extends Mapper<Object,Text,Text,Text> {
		
		private Text IPAddr = new Text();
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		WebLogParser webLogParser = new WebLogParser();
		
		public void map(Object key,Text value,Context context) {
						
			//convert the line to a String
			String line = value.toString();
			
			String[] weblogArry = line.split(" ");
				
			IPAddr.set(weblogArry[0]);
			content.set(line);
			try {
				context.write(IPAddr,content);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}			
		}
	}

	static class sessionReducer extends Reducer<Text, Text, Text, NullWritable>{
		
		private Text IPAddr = new Text();
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		WebLogParser webLogParser = new WebLogParser();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		SessionParser sessionParser = new SessionParser();
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			
			Date sessionStartTime = null;
			String sessionID = UUID.randomUUID().toString();
			
						
			//sort all browsing records of the user behind this IP address by time
			ArrayList<WebLogSessionBean> sessionBeanGroup  = new ArrayList<WebLogSessionBean>();
			for(Text browseHistory : values) {
				WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString());
				sessionBeanGroup.add(sessionBean);
			}
			Collections.sort(sessionBeanGroup,new Comparator<WebLogSessionBean>() {

				public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) {					
					Date date1 = sessionBean1.getTimeWithDateFormat();
					Date date2 = sessionBean2.getTimeWithDateFormat();
					if(date1 == null || date2 == null) return 0;
					return date1.compareTo(date2);
				}
			});
			
			for(WebLogSessionBean sessionBean : sessionBeanGroup) {
				
				if(sessionStartTime == null) {
					//time of this user's first visit to the site in today's log
					sessionStartTime = timeTransform(sessionBean.getTime());
					content.set(sessionParser.parser(sessionBean, sessionID));
					try {
						context.write(content,v);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				} else {
					
					Date sessionEndTime = timeTransform(sessionBean.getTime());
					long sessionStayTime = timeDiffer(sessionStartTime,sessionEndTime);
					if(sessionStayTime > 30 * 60 * 1000) {
						//more than 30 minutes have passed since this session began:
						//treat the current record as the start of a new session with a fresh session ID
						sessionStartTime = timeTransform(sessionBean.getTime());
						sessionID = UUID.randomUUID().toString();
					} 
					content.set(sessionParser.parser(sessionBean, sessionID));						
					try {
						context.write(content,v);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}				
			}
		}
		
		private Date timeTransform(String time) {
			
			Date standard_time = null;
			try {
				standard_time = sdf.parse(time);
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			return standard_time;
		}
		
		private long timeDiffer(Date start_time,Date end_time) {
			
			long diffTime = 0;
			diffTime = end_time.getTime() - start_time.getTime();
			
			return diffTime;
		}
		
	}
	
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
	
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
						
		Job job = Job.getInstance(conf);
				
		job.setJarByClass(logSession.class);
		
		//specify the Mapper and Reducer classes this job uses
		job.setMapperClass(sessionMapper.class);
		job.setReducerClass(sessionReducer.class);
				
		//key/value types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		
		//input directory of the job's source files
		FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/"+dateStr+"/*"));
		//output directory for the job's results
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/"+dateStr+"/"));
		
		//submit the job configuration and the jar containing the job's classes to YARN for execution
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	}
}
package com.guludada.dataparser;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import com.guludada.javabean.WebLogSessionBean;

public class SessionParser {
	
	SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
	SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public String parser(WebLogSessionBean sessionBean,String sessionID) {
				
		sessionBean.setSession(sessionID);		
		return sessionBean.toString();
	}
	
	public WebLogSessionBean loadBean(String sessionContent) {
		
		WebLogSessionBean weblogSession = new WebLogSessionBean();
			
		String[] contents = sessionContent.split(" ");
		weblogSession.setTime(timeTransform(contents[1]));
		weblogSession.setIP_addr(contents[0]);
		weblogSession.setRequest_URL(contents[3]);
		weblogSession.setReferal(contents[7]);
			
		return weblogSession;
	}
	
	private String timeTransform(String time) {
				
		Date standard_time = null;
		try {
			standard_time = sdf_origin.parse(time);
		} catch (ParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return sdf_final.format(standard_time);
	}
}
package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class WebLogSessionBean {
	
	String time;
	String IP_addr;
	String session;
	String request_URL;
	String referal;
	
	
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getRequest_URL() {
		return request_URL;
	}
	public void setRequest_URL(String request_URL) {
		this.request_URL = request_URL;
	}
	public String getReferal() {
		return referal;
	}
	public void setReferal(String referal) {
		this.referal = referal;
	}
	
	public Date getTimeWithDateFormat() {
		
		SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		if(this.time != null && !this.time.isEmpty()) {
			try {
				return sdf_final.parse(this.time);
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return null;
	}
	
	@Override
	public String toString() {
		return time + " " + IP_addr + " " + session + " "
				+ request_URL + " " + referal;
	}
	
	
	
}

The session records produced by the second cleaning pass have the following structure:
Time  IP  SessionID  Requested page URL  Referral URL
2015-05-30 19:38:00 192.168.12.130 Session1 /blog/me www.baidu.com
2015-05-30 19:39:00 192.168.12.130 Session1 /blog/me/details www.mysite.com/blog/me
2015-05-30 19:38:00 192.168.12.40 Session2 /blog/me www.baidu.com



Step three: process the session records generated in step two to build the PageViews table.
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.clickstream.logSession.sessionMapper;
import com.guludada.clickstream.logSession.sessionReducer;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;

public class PageViews {
	
	public static class pageMapper extends Mapper<Object,Text,Text,Text> {
			
			private Text word = new Text();
			
			public void map(Object key,Text value,Context context) {
								
				String line = value.toString();
				String[] webLogContents = line.split(" ");
				
				//group by session ID (token layout of a session record: date time IP session URL referral)
				word.set(webLogContents[3]);
					try {
						context.write(word,value);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}						
			}
		}

	public static class pageReducer extends Reducer<Text, Text, Text, NullWritable>{
	
		private Text session = new Text();
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		PageViewsParser pageViewsParser = new PageViewsParser();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		//the previous page-view record and visit time within the current session
		PageViewsBean lastStayPageBean = null;
		Date lastVisitTime = null;
	
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			
			//reset the per-session state so nothing carries over from the previous session
			lastStayPageBean = null;
			lastVisitTime = null;
			
			//sort all page views belonging to this session by visit time
			ArrayList<PageViewsBean> pageViewsBeanGroup  = new ArrayList<PageViewsBean>();
			for(Text pageView : values) {
				PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString());
				pageViewsBeanGroup.add(pageViewsBean);
			}
			Collections.sort(pageViewsBeanGroup,new Comparator<PageViewsBean>() {

				public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {					
					Date date1 = pageViewsBean1.getTimeWithDateFormat();
					Date date2 = pageViewsBean2.getTimeWithDateFormat();
					if(date1 == null || date2 == null) return 0;
					return date1.compareTo(date2);
				}
			});
			
			//compute each page's stay time as the gap (in seconds) to the next page view;
			//a record is emitted only once its stay time is known, and the last record of
			//the session keeps the default stay time set by PageViewsParser
			int step = 0;
			for(PageViewsBean pageViewsBean : pageViewsBeanGroup) {
				
				Date curVisitTime = pageViewsBean.getTimeWithDateFormat();
				
				//update the step number of the current record
				step++;
				pageViewsBean.setStep(step+"");
				
				if(lastStayPageBean != null) {
					//time difference between the two consecutive visits, in seconds
					Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime())/1000);
					lastStayPageBean.setStayTime(timeDiff.toString());
					//the previous record is now complete, so emit it
					content.set(pageViewsParser.parser(lastStayPageBean));
					context.write(content,v);
				}
				
				//the current record becomes the "previous" one for the next iteration
				lastStayPageBean = pageViewsBean;
				lastVisitTime = curVisitTime;
			}
			
			//emit the last page view of the session
			if(lastStayPageBean != null) {
				content.set(pageViewsParser.parser(lastStayPageBean));
				context.write(content,v);
			}
		}
}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
						
		Job job = Job.getInstance(conf);
				
		job.setJarByClass(PageViews.class);
		
		//specify the Mapper and Reducer classes this job uses
		job.setMapperClass(pageMapper.class);
		job.setReducerClass(pageReducer.class);
				
		//key/value types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		
		//input directory of the job's source files
		FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));
		//output directory for the job's results
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/"));
		
		//submit the job configuration and the jar containing the job's classes to YARN for execution
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	}
}
package com.guludada.dataparser;

import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;

public class PageViewsParser {
	/**
	 * Loads a PageViewsBean from one line of logSession's output.
	 */
	public PageViewsBean loadBean(String sessionContent) {
		
		PageViewsBean pageViewsBean = new PageViewsBean();
			
		String[] contents = sessionContent.split(" ");
		pageViewsBean.setTime(contents[0] + " " + contents[1]);
		pageViewsBean.setIP_addr(contents[2]);
		pageViewsBean.setSession(contents[3]);
		pageViewsBean.setVisit_URL(contents[4]);
		pageViewsBean.setStayTime("0");
		pageViewsBean.setStep("0");
					
		return pageViewsBean;
	}
	
	public String parser(PageViewsBean pageBean) {
		
		return pageBean.toString();
	}

}
package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class PageViewsBean {
	
	String session;
	String IP_addr;
	String time;
	String visit_URL;
	String stayTime;
	String step;
	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
	public String getVisit_URL() {
		return visit_URL;
	}
	public void setVisit_URL(String visit_URL) {
		this.visit_URL = visit_URL;
	}
	public String getStayTime() {
		return stayTime;
	}
	public void setStayTime(String stayTime) {
		this.stayTime = stayTime;
	}
	public String getStep() {
		return step;
	}
	public void setStep(String step) {
		this.step = step;
	}
	
	public Date getTimeWithDateFormat() {
		
		SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		if(this.time != null && !this.time.isEmpty()) {
			try {
				return sdf_final.parse(this.time);
			} catch (ParseException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return null;
	}
	
	@Override
	public String toString() {
		return session + " " + IP_addr + " " + time + " "
				+ visit_URL + " " + stayTime + " " + step;
	}
		
}
The PageViews records produced by the third cleaning pass have the following structure:
SessionID  IP  Visit time  Visited page  Stay time  Step
Session1 192.168.12.130 2016-05-30 15:17:30 /blog/me 30000 1
Session1 192.168.12.130 2016-05-30 15:18:00 /blog/me/admin 30000 2
Session1 192.168.12.130 2016-05-30 15:18:30 /home 30000 3
Session2 192.168.12.150 2016-05-30 15:16:30 /products 30000 1
Session2 192.168.12.150 2016-05-30 15:17:00 /products/details 30000 2



Step four: process the session records once more to build the Visits table.
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.PageViews.pageMapper;
import com.guludada.clickstream.PageViews.pageReducer;
import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.VisitsInfoParser;
import com.guludada.javabean.PageViewsBean;

public class VisitsInfo {
	
	public static class visitMapper extends Mapper<Object,Text,Text,Text> {
		
		private Text word = new Text();
		
		public void map(Object key,Text value,Context context) {
							
			String line = value.toString();
			String[] webLogContents = line.split(" ");
			
			//group by session ID (token layout of a session record: date time IP session URL referral)
			word.set(webLogContents[3]);
				try {
					context.write(word,value);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}						
		}
	}

	public static class visitReducer extends Reducer<Text, Text, Text, NullWritable>{
	
		private Text content = new Text();
		private NullWritable v = NullWritable.get();
		VisitsInfoParser visitsParser = new VisitsInfoParser();
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		PageViewsParser pageViewsParser = new PageViewsParser();
		Map<String,Integer> viewedPagesMap = new HashMap<String,Integer>();
		
		String entry_URL = "";
		String leave_URL = "";
		int total_visit_pages = 0;
	
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			
			//sort all browsing records of this session by time
			ArrayList<String> browseInfoGroup  = new ArrayList<String>();
			for(Text browseInfo : values) {
				browseInfoGroup.add(browseInfo.toString());
			}
			Collections.sort(browseInfoGroup,new Comparator<String>() {
				
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
				public int compare(String browseInfo1, String browseInfo2) {
					String dateStr1 = browseInfo1.split(" ")[0] + " " + browseInfo1.split(" ")[1];
					String dateStr2 = browseInfo2.split(" ")[0] + " " + browseInfo2.split(" ")[1];
					Date date1;
					Date date2;
					try {
						date1 = sdf.parse(dateStr1);
						date2 = sdf.parse(dateStr2);					 
						if(date1 == null && date2 == null) return 0;
						return date1.compareTo(date2);
					} catch (ParseException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						return 0;
					}
				}
			});
			
			//count the distinct pages visited in this session; the entry and exit pages
			//are derived later in VisitsInfoParser. Clear the map first so counts from
			//the previous session do not leak in.
			viewedPagesMap.clear();
			for(String browseInfo : browseInfoGroup) {
				
				String[] browseInfoStrArr = browseInfo.split(" ");				
				String curVisitURL = browseInfoStrArr[4];
				Integer curVisitURLInteger = viewedPagesMap.get(curVisitURL);
				if(curVisitURLInteger == null) {
					viewedPagesMap.put(curVisitURL, 1);
				}	
			}
			total_visit_pages = viewedPagesMap.size();
			String visitsInfo = visitsParser.parser(browseInfoGroup, total_visit_pages+"");	
			content.set(visitsInfo);
			try {
				context.write(content,v);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}	
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		
		conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
						
		Job job = Job.getInstance(conf);
						
		job.setJarByClass(VisitsInfo.class);
		
		//specify the Mapper and Reducer classes this job uses
		job.setMapperClass(visitMapper.class);
		job.setReducerClass(visitReducer.class);
				
		//key/value types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Date curDate = new Date();
		SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
		String dateStr = sdf.format(curDate);
		
		//input directory of the job's source files
		FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));
		//output directory for the job's results
		FileOutputFormat.setOutputPath(job, new Path("/clickstream/visitsinfo"+dateStr+"/"));
		
		//submit the job configuration and the jar containing the job's classes to YARN for execution
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
		
	}
}

package com.guludada.dataparser;

import java.util.ArrayList;

import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.VisitsInfoBean;
import com.guludada.javabean.WebLogSessionBean;

public class VisitsInfoParser {
	
	public String parser(ArrayList<String> pageViewsGroup,String totalVisitNum) {
		
		VisitsInfoBean visitsBean = new VisitsInfoBean();
		String entryPage = pageViewsGroup.get(0).split(" ")[4];
		String leavePage = pageViewsGroup.get(pageViewsGroup.size()-1).split(" ")[4];
		String startTime = pageViewsGroup.get(0).split(" ")[0] + " " + pageViewsGroup.get(0).split(" ")[1];
		String endTime = pageViewsGroup.get(pageViewsGroup.size()-1).split(" ")[0] + 
					" " +pageViewsGroup.get(pageViewsGroup.size()-1).split(" ")[1];
		String session = pageViewsGroup.get(0).split(" ")[3];
		String IP = pageViewsGroup.get(0).split(" ")[2];
		String referal = pageViewsGroup.get(0).split(" ")[5];
		
		visitsBean.setSession(session);
		visitsBean.setStart_time(startTime);
		visitsBean.setEnd_time(endTime);
		visitsBean.setEntry_page(entryPage);
		visitsBean.setLeave_page(leavePage);
		visitsBean.setVisit_page_num(totalVisitNum);
		visitsBean.setIP_addr(IP);
		visitsBean.setReferal(referal);
		
		return visitsBean.toString();
	}
}

package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class VisitsInfoBean {
	
	String session;
	String start_time;
	String end_time;
	String entry_page;
	String leave_page;
	String visit_page_num;
	String IP_addr;
	String referal;
	
	public String getSession() {
		return session;
	}
	public void setSession(String session) {
		this.session = session;
	}
	public String getStart_time() {
		return start_time;
	}
	public void setStart_time(String start_time) {
		this.start_time = start_time;
	}
	public String getEnd_time() {
		return end_time;
	}
	public void setEnd_time(String end_time) {
		this.end_time = end_time;
	}
	public String getEntry_page() {
		return entry_page;
	}
	public void setEntry_page(String entry_page) {
		this.entry_page = entry_page;
	}
	public String getLeave_page() {
		return leave_page;
	}
	public void setLeave_page(String leave_page) {
		this.leave_page = leave_page;
	}
	public String getVisit_page_num() {
		return visit_page_num;
	}
	public void setVisit_page_num(String visit_page_num) {
		this.visit_page_num = visit_page_num;
	}
	public String getIP_addr() {
		return IP_addr;
	}
	public void setIP_addr(String iP_addr) {
		IP_addr = iP_addr;
	}
	public String getReferal() {
		return referal;
	}
	public void setReferal(String referal) {
		this.referal = referal;
	}
	
	@Override
	public String toString() {
		return session + " " + start_time + " " + end_time
				+ " " + entry_page + " " + leave_page + " " + visit_page_num
				+ " " + IP_addr + " " + referal;
	}
	
	
		
}

The visit records produced by the fourth cleaning pass have the following structure:
SessionID  Start time  End time  Entry page  Exit page  Total pages visited  IP  Referral
Session1 2016-05-30 15:17:00 2016-05-30 15:19:00 /blog/me /blog/others 5 192.168.12.130 www.baidu.com
Session2 2016-05-30 14:17:00 2016-05-30 15:19:38 /home /profile 10 192.168.12.140 www.178.com
Session3 2016-05-30 12:17:00 2016-05-30 15:40:00 /products /detail 6 192.168.12.150 www.78dm.net



      These are all the MapReduce jobs used for the log-cleaning stage. Since this is only a simple demonstration, the code has not been abstracted particularly well.
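For completeness, a hedged sketch of how the four jobs might be packaged and submitted; the jar name clickstream.jar is an assumption, and each stage simply reads the HDFS output of the previous one:

# run the four cleaning stages in order
hadoop jar clickstream.jar com.guludada.clickstream.logClean
hadoop jar clickstream.jar com.guludada.clickstream.logSession
hadoop jar clickstream.jar com.guludada.clickstream.PageViews
hadoop jar clickstream.jar com.guludada.clickstream.VisitsInfo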