
Topic: Use UDFs to count the number of views of each website in the flow.dat log file

Part 1: Write a MapReduce program to clean the data

We want to count how many times each website appears in the log file. To keep things simple we extract only the website column: the extraction is done in the map, and the reduce does not need to transform the data any further.

Source code:

MyMapper class

package com.WebsiteCount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;




public class MyMapper extends Mapper<LongWritable,Text,LongWritable,Text> {

	@Override
	protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
		String data = value.toString();
        String[] splitedData = data.split("\t");
        Text outValue=new Text(splitedData[12]);
        context.write(key,outValue);
        System.out.println("Mapper output <" + key.toString() + "," + outValue.toString() + ">");
	}

}
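Note that splitedData[12] assumes the website sits in the 13th tab-separated field of each flow.dat record; if your copy of the log lays the fields out differently, adjust the index accordingly.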

MyReduce class

package com.WebsiteCount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<LongWritable, Text, Text, NullWritable> {
	
	@Override
	 protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
			for(Text value : values) {
				System.out.println("Reduce input <" + key.toString() + "," + value.toString() + ">");
				context.write(value, NullWritable.get());
				System.out.println("Reduce output <" + value.toString() + ">");
			}
	    }

	
}
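Because the map output key is simply the byte offset of each input line, every reduce call receives the website extracted from one line and writes it out with a NullWritable value. The job's output file therefore contains one website per line, which is exactly the single-column layout the Hive table created later expects.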

Driver class: Website.java

package com.WebsiteCount;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Website{

	static final String INPUT_PATH = "hdfs://192.xxx.xx.xxx:9000/webcount/in";
	static final String OUT_PATH = "hdfs://192.xxx.xx.xxx:9000/webcount/out";
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		final Job job = new Job(conf, Website.class.getSimpleName());
//        job.setJarByClass(Website.class);
		//1.1 Specify where the input files are read from
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		// Specify how the input file is parsed: each line becomes a key/value pair
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 Specify the custom Mapper class
		job.setMapperClass(MyMapper.class);
		// The <k,v> types of the map output
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
	
		//2.2 Specify the custom Reducer class
		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
//		LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
		//2.3 Specify where the output is written
		FileOutputFormat.setOutputPath(job, outPath);

		// Submit the job and wait for it to finish
		job.waitForCompletion(true);

   }
}
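One side note on the driver: the Job constructor used above is deprecated on Hadoop 2.x. If you want to avoid the deprecation warning, the equivalent call is:

final Job job = Job.getInstance(conf, Website.class.getSimpleName());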

Next, go into the /usr/local directory on Linux.

Create the input directory in HDFS:

hdfs dfs -mkdir -p /webcount/in

Upload the original flow.dat log file to the /webcount/in directory in HDFS:
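For example (assuming flow.dat has already been copied to /usr/local on the Linux machine):

hdfs dfs -put /usr/local/flow.dat /webcount/in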

Run the program above in Eclipse; part of the run output is as follows:

Check on HDFS whether the output was created successfully:

hdfs dfs -ls  /webcount/out

View the result on Linux to confirm it succeeded:

hdfs dfs -cat  /webcount/out/part-r-00000

Part of the result is as follows:

Part 2: Use Hive UDFs to count the number of views of each website in the log file

Start Hive and create a table:

create table webcount(web string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load the data into the newly created table:

load data inpath '/webcount/out/part-r-00000' into table webcount;
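Note that LOAD DATA INPATH moves (rather than copies) the file into the table's directory in the Hive warehouse, so part-r-00000 will no longer sit under /webcount/out afterwards.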

View the table:
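For example, a quick check that the load worked (the LIMIT value is arbitrary):

select * from webcount limit 10;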

Writing the UDF functions:

Approach:

First define a UDAF. A UDAF aggregates many input rows into one output, so the result is concatenated into a single string: public class Top4GroupBy extends UDAF

Start by defining an object that holds the intermediate data: public static class State

Note: when accumulating, check whether the string already exists as a key in the map; if it does, add to its count, otherwise put it into the map (this is the key point).

We also need a custom UDTF, which turns one input into multiple outputs: it splits the string on the delimiters and emits the pieces as multiple output rows: public class ExplodeMap extends GenericUDTF

Finally:

The two functions are added to the Hive function library under the names top_group and explode_map. An example usage from the web (getting the top 100 URLs for each of the top 100 landingrefer values) is:

hive -e "select t.landingrefer, mytable.col1, mytable.col2,mytable.col3 from (select landingrefer, top_group(url,100) pro, count(sid) s from pvlog  where dt=20120719 and depth=1 group by landingrefer order by s desc limit 100) t lateral view explode_map(t.pro) mytable as col1, col2, col3;"> test

Writing the UDAF: GroupBy.java

package com.hive;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class GroupBy extends UDAF{

    // An object used to hold the intermediate counts
    public static class State {
        private Map<Text, IntWritable> counts;
    }

    /**
     * Accumulate a value: if the string already exists as a key in the map,
     * add to its count; otherwise put it into the map.
     * @param s the aggregation state
     * @param o the string being counted
     * @param i the amount to add
     */
    private static void increment(State s, Text o, int i) {
        if (s.counts == null) {
            s.counts = new HashMap<Text, IntWritable>();
        }
        IntWritable count = s.counts.get(o);
        if (count == null) {
            Text key = new Text();
            key.set(o);
            s.counts.put(key, new IntWritable(i));
        } else {
            count.set(count.get() + i);
        }
    }
    public static class GroupByEvaluator implements UDAFEvaluator {
        private final State state;

        public GroupByEvaluator() {
            state = new State();
        }

        public void init() {
            if (state.counts != null) {
                state.counts.clear();
            }
        }

        public boolean iterate(Text value) {
            if (value == null) {
                return false;
            } else {
                increment(state, value, 1);
            }
            return true;
        }

        public State terminatePartial() {
            return state;
        }

        public boolean merge(State other) {
            if (other == null || other.counts == null) {
                return false;
            }
            for (Map.Entry<Text, IntWritable> e : other.counts.entrySet()) {
                increment(state, e.getKey(), e.getValue().get());
            }
            return true;
        }

        public Text terminate() {
            if (state.counts == null || state.counts.size() == 0) {
                return null;
            }
            // Sort the sites by count (descending) and join them into one string
            // of the form site$@count$*site$@count$*...
            Map<Text, IntWritable> it = sortByValue(state.counts, true);
            StringBuffer str = new StringBuffer();
            for (Map.Entry<Text, IntWritable> e : it.entrySet()) {
                str.append(e.getKey().toString()).append("$@").append(e.getValue().get()).append("$*");
            }
            return new Text(str.toString());
        }

        /*
         * Sort a map by its values (descending when reverse is true).
         */
        @SuppressWarnings("unchecked")
        public static Map sortByValue(Map map, final boolean reverse) {
            List list = new LinkedList(map.entrySet());
            Collections.sort(list, new Comparator() {
                public int compare(Object o1, Object o2) {
                    if (reverse) {
                        return -((Comparable) ((Map.Entry) o1).getValue()).compareTo(((Map.Entry) o2).getValue());
                    }
                    return ((Comparable) ((Map.Entry) o1).getValue()).compareTo(((Map.Entry) o2).getValue());
                }
            });

            Map result = new LinkedHashMap();
            for (Iterator it = list.iterator(); it.hasNext();) {
                Map.Entry entry = (Map.Entry) it.next();
                result.put(entry.getKey(), entry.getValue());  
            }
            return result;
        }
    }
}
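Before wiring the UDAF into Hive, the evaluator can be exercised locally as a quick sanity check. The class below is only a sketch with made-up site names (it is not part of the Hive job, and the class name is arbitrary):

package com.hive;

import org.apache.hadoop.io.Text;

// Minimal local check of GroupByEvaluator, run as a plain Java program rather than inside Hive.
public class GroupByLocalTest {
    public static void main(String[] args) {
        GroupBy.GroupByEvaluator eval = new GroupBy.GroupByEvaluator();
        eval.init();
        // Hypothetical sites standing in for the web column of the webcount table.
        for (String site : new String[] { "www.a.com", "www.b.com", "www.a.com" }) {
            eval.iterate(new Text(site));
        }
        // Prints www.a.com$@2$*www.b.com$@1$* (each site followed by its count).
        System.out.println(eval.terminate());
    }
}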

Writing the UDTF: SplitResult.java

package com.hive;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SplitResult extends GenericUDTF {

    @Override
    public void close() throws HiveException {
    }
    // This method declares the UDTF's input and output: it validates the input
    // ObjectInspectors and returns a StructObjectInspector describing the output
    // rows (number of columns and their types).
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("SplitResult takes only one argument");
        }
        // getCategory() returns the category of the argument's ObjectInspector;
        // only a primitive (string) argument is accepted here.
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("SplitResult takes a string as its parameter");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // This method processes each input record and emits the output rows via forward().
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] test = input.split("\\$\\*");
        for (int i = 0; i < test.length; i++) {
            try {
                String[] result  = new String[2];
                String[] sp= test[i].split("\\$\\@");
                result[0] =sp[0];
                result[1] =sp[1];
                // forward() (inherited from GenericUDTF) emits one output row
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }
}
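Note that initialize() names the two output columns col1 and col2; when the UDTF is used with LATERAL VIEW they can be renamed through column aliases, which is what the final query below does with web and times.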

Adding the UDFs to Hive:

Package the UDF classes into a jar.

Right-click the project name and choose Export.

Set the jar output path and the jar file name.

The exported jar is as follows:

Transfer the jar to Linux.

Upload the jar to the /usr/local directory on Linux with the WinSCP tool (the image below is a WinSCP screenshot):

Add the jar to Hive:

Add the following to the hive-site.xml file:
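A minimal sketch of that entry, assuming the jar was uploaded as /usr/local/webcount-udf.jar (substitute the name you actually exported):

<property>
  <name>hive.aux.jars.path</name>
  <value>file:///usr/local/webcount-udf.jar</value>
</property>

Alternatively, the jar can be registered for the current session only by running add jar /usr/local/webcount-udf.jar; at the Hive prompt.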

Register the functions in Hive:

create function group_by as 'com.hive.GroupBy';

create function splitrs as 'com.hive.SplitResult';

Query with the registered functions:

select webtimes.web, webtimes.times from (select group_by(web) pro from webcount) t lateral view splitrs(t.pro) webtimes as web, times;
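The result has two columns, web and times, with one row per distinct website, sorted by view count in descending order (the order produced by sortByValue in the UDAF).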

Part of the result is as follows:

This concludes the experiment.