1. 程式人生 > >大資料之電話日誌分析callLog案例(二)

大資料之電話日誌分析callLog案例(二)

一、前端實現 -- 按照時間段查詢通話記錄
-----------------------------------------
    1.完善calllog的domain類Calllog.java
    ----------------------------------------------
       
 package com.it18zhang.ssm.domain;

        /**
         * Domain object describing one call record (plain JavaBean).
         *
         * <p>Every attribute is held as a string; {@code flag} tells whether the record is the
         * caller-side (outgoing) entry.
         */
        public class Calllog {

            // Number that placed the call.
            private String caller;
            // Number that received the call.
            private String callee;
            // Time of the call, kept as a raw string.
            private String callTime;
            // Duration of the call, kept as a raw string.
            private String callDuration;
            // Whether this record is the caller-side entry.
            private boolean flag;

            /** @return the calling number */
            public String getCaller() {
                return this.caller;
            }

            /** @param caller the calling number */
            public void setCaller(String caller) {
                this.caller = caller;
            }

            /** @return the called number */
            public String getCallee() {
                return this.callee;
            }

            /** @param callee the called number */
            public void setCallee(String callee) {
                this.callee = callee;
            }

            /** @return the call time string */
            public String getCallTime() {
                return this.callTime;
            }

            /** @param callTime the call time string */
            public void setCallTime(String callTime) {
                this.callTime = callTime;
            }

            /** @return the call duration string */
            public String getCallDuration() {
                return this.callDuration;
            }

            /** @param callDuration the call duration string */
            public void setCallDuration(String callDuration) {
                this.callDuration = callDuration;
            }

            /** @return {@code true} when this is the caller-side record */
            public boolean isFlag() {
                return this.flag;
            }

            /** @param flag {@code true} to mark this as the caller-side record */
            public void setFlag(boolean flag) {
                this.flag = flag;
            }
        }

    2.通話日誌時間範圍domain類CalllogRange.java
    -----------------------------------------------
       
 package com.it18zhang.ssm.domain;

        /**
         * A time interval used to query call logs: a start point and an end point, both strings.
         */
        public class CalllogRange {

            // Start of the interval.
            private String startPoint;
            // End of the interval.
            private String endPoint;

            /** @return the start point */
            public String getStartPoint() {
                return this.startPoint;
            }

            /** @param startPoint the start point */
            public void setStartPoint(String startPoint) {
                this.startPoint = startPoint;
            }

            /** @return the end point */
            public String getEndPoint() {
                return this.endPoint;
            }

            /** @param endPoint the end point */
            public void setEndPoint(String endPoint) {
                this.endPoint = endPoint;
            }

            /** Renders the range as {@code "<start> - <end>"} (null points print as "null"). */
            @Override
            public String toString() {
                return new StringBuilder()
                        .append(this.startPoint)
                        .append(" - ")
                        .append(this.endPoint)
                        .toString();
            }
        }

    3.完善Service介面 CalllogService.java
    ------------------------------------------------------
         
 package com.it18zhang.ssm.service;
          import com.it18zhang.ssm.domain.Calllog;
          import com.it18zhang.ssm.domain.CalllogRange;

          import java.util.List;

          /**
           * Service contract for reading call-log records.
           */
          public interface CalllogService {

              /**
               * Returns all call-log records.
               *
               * @return every stored record
               */
              List<Calllog> findAll();

              /**
               * Returns the call-log records of one phone number within the given time ranges.
               *
               * @param call the phone number to look up
               * @param list the time ranges to search
               * @return the matching records
               */
              List<Calllog> findCallogs(String call, List<CalllogRange> list);
          }

    4.完善CalllogServiceImpl類
    -----------------------------------------------
     
   package com.it18zhang.ssm.service.impl;

        import com.it18zhang.ssm.domain.Calllog;
        import com.it18zhang.ssm.domain.CalllogRange;
        import com.it18zhang.ssm.service.CalllogService;
        import com.it18zhang.ssm.util.CalllogUtil;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.*;
        import org.apache.hadoop.hbase.util.Bytes;
        import org.springframework.stereotype.Service;

        import java.io.IOException;
        import java.util.*;

        /**
         * CalllogService的實現類
         */
        @Service("calllogService")
        public class CalllogServiceImpl implements CalllogService {

            private Table table;
            public CalllogServiceImpl()
            {
                try {
                    //獲取配置檔案
                    Configuration conf = HBaseConfiguration.create();
                    //工廠類建立連線
                    Connection conn = ConnectionFactory.createConnection(conf);
                    //get table
                    TableName tbName = TableName.valueOf("call:calllogs");
                    table = conn.getTable(tbName);

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            /**
             * 查詢所有的calllog
             * 全表掃描
             * @return
             */
            public List<Calllog> findAll() {
                List<Calllog> list = new ArrayList<Calllog>();
                try {
                    //掃描
                    Scan scan = new Scan();
                    ResultScanner rs = table.getScanner(scan);
                    Iterator<Result> it = rs.iterator();
                    byte[] famliy = Bytes.toBytes("f1");
                    byte[] callerf = Bytes.toBytes("caller");
                    byte[] calleef = Bytes.toBytes("callee");
                    byte[] callTimef = Bytes.toBytes("callTime");
                    byte[] callDurationf = Bytes.toBytes("callDuration");
                    Calllog calllog = null;

                    while (it.hasNext()) {

                        Result next = it.next();
                        String caller = Bytes.toString(next.getValue(famliy, callerf));
                        String callee = Bytes.toString(next.getValue(famliy, calleef));
                        String callTime = Bytes.toString(next.getValue(famliy, callTimef));
                        String callDuration = Bytes.toString(next.getValue(famliy, callDurationf));

                        calllog = new Calllog();
                        calllog.setCaller(caller);
                        calllog.setCallee(callee);
                        calllog.setCallTime(callTime);
                        calllog.setCallDuration(callDuration);

                        list.add(calllog);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return list;
            }


            /**
             * 按照範圍查詢通話記錄
             */
            public List<Calllog> findCallogs(String call , List<CalllogRange> ranges){
                List<Calllog> logs = new ArrayList<Calllog>();
                try {
                    for(CalllogRange range : ranges){
                        Scan scan = new Scan();
                        //設定掃描起始行
                        scan.setStartRow(Bytes.toBytes(CalllogUtil.getStartRowkey(call, range.getStartPoint(),100)));
                        //設定掃描結束行
                        scan.setStopRow(Bytes.toBytes(CalllogUtil.getStopRowkey(call, range.getStartPoint(), range.getEndPoint(),100)));

                        ResultScanner rs = table.getScanner(scan);
                        Iterator<Result> it = rs.iterator();
                        byte[] f = Bytes.toBytes("f1");

                        byte[] caller = Bytes.toBytes("caller");
                        byte[] callee = Bytes.toBytes("callee");
                        byte[] callTime = Bytes.toBytes("callTime");
                        byte[] callDuration = Bytes.toBytes("callDuration");
                        Calllog log = null;
                        while (it.hasNext()) {
                            log = new Calllog();
                            Result r = it.next();
                            //rowkey
                            String rowkey = Bytes.toString(r.getRow());
                            String flag = rowkey.split(",")[3] ;
                            log.setFlag(flag.equals("0")?true:false);
                            //caller
                            log.setCaller(Bytes.toString(r.getValue(f, caller)));
                            //callee
                            log.setCallee(Bytes.toString(r.getValue(f, callee)));
                            //callTime
                            log.setCallTime(Bytes.toString(r.getValue(f, callTime)));
                            //callDuration
                            log.setCallDuration(Bytes.toString(r.getValue(f, callDuration)));
                            logs.add(log);
                        }
                    }
                    return logs;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        }


    5.編寫Calllog工具類CalllogUtil
    ------------------------------------------
       
 package com.it18zhang.ssm.util;

        import com.it18zhang.ssm.domain.CalllogRange;

        import java.text.DecimalFormat;
        import java.text.SimpleDateFormat;
        import java.util.ArrayList;
        import java.util.Calendar;
        import java.util.List;

        /**
         * Helpers for call-log rowkeys and query time ranges.
         *
         * <p>Rowkeys start with {@code hash,caller,callTime}, where {@code hash} is the salt computed
         * by {@link #getHashcode}. {@link SimpleDateFormat} and {@link DecimalFormat} are not
         * thread-safe, and these helpers may be called from concurrent web requests. Formatters are
         * therefore created per call instead of being shared static instances.
         */
        public class CalllogUtil {

            // Layout of stored call times.
            private static final String RAW_TIME_PATTERN = "yyyyMMddHHmmss";
            // Human-readable layout produced by formatDate.
            private static final String FRIENDLY_TIME_PATTERN = "yyyy/MM/dd HH:mm:ss";

            /**
             * Computes the rowkey salt: (yyyyMM of the call time XOR last four digits of the number)
             * modulo {@code partitions}.
             *
             * @param caller     phone number; at least four trailing digits are required
             * @param callTime   time string whose first six characters are yyyyMM
             * @param partitions number of salt partitions (100 by convention)
             * @return the salt, formatted with a default {@link DecimalFormat}
             * @throws NumberFormatException if the extracted parts are not numeric
             * @throws StringIndexOutOfBoundsException if either string is too short
             */
            public static String getHashcode(String caller, String callTime, int partitions) {
                // last four digits of the phone number
                String last4Code = caller.substring(caller.length() - 4);
                // year and month of the call (yyyyMM)
                String mon = callTime.substring(0, 6);
                int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions;
                // A fresh instance gives the same output as a shared one, without the data race.
                return new DecimalFormat().format(hashcode);
            }

            /**
             * Builds the (inclusive) scan start rowkey {@code hash,caller,startTime}.
             */
            public static String getStartRowkey(String caller, String startTime, int partitions) {
                String hashcode = getHashcode(caller, startTime, partitions);
                return hashcode + "," + caller + "," + startTime;
            }

            /**
             * Builds the (exclusive) scan stop rowkey {@code hash,caller,endTime}.
             *
             * <p>The salt is computed from {@code startTime}, not {@code endTime}, so that the start
             * and stop keys fall into the same salt bucket. {@link #getCallLogRanges} splits queries
             * per month so that one bucket covers each range.
             */
            public static String getStopRowkey(String caller, String startTime, String endTime, int partitions) {
                String hashcode = getHashcode(caller, startTime, partitions);
                return hashcode + "," + caller + "," + endTime;
            }

            /**
             * Splits the query interval into one range per calendar month.
             *
             * <p>Inputs are expected as yyyyMMdd strings; the end day is inclusive. The first range
             * starts at {@code startStr}, and the last one ends at the day after the end day, which is
             * an exclusive string bound. Months in between are bounded by yyyyMM strings.
             *
             * @param startStr interval start, e.g. {@code 20180815}
             * @param endStr   interval end (inclusive), e.g. {@code 20181005}
             * @return the ranges in ascending order; empty when the interval is empty or reversed, or
             *     when the input cannot be parsed (previously null)
             */
            public static List<CalllogRange> getCallLogRanges(String startStr, String endStr) {
                List<CalllogRange> list = new ArrayList<CalllogRange>();
                try {
                    SimpleDateFormat sdfYMD = new SimpleDateFormat("yyyyMMdd");
                    SimpleDateFormat sdfYM = new SimpleDateFormat("yyyyMM");
                    DecimalFormat df00 = new DecimalFormat("00");

                    String startPrefix = startStr.substring(0, 6);
                    String endPrefix = endStr.substring(0, 6);
                    int endDay = Integer.parseInt(endStr.substring(6, 8));
                    // Exclusive upper bound: the day after the end day. Rowkeys compare as strings,
                    // so "32" still sorts after any day 31 entry.
                    String endPoint = endPrefix + df00.format(endDay + 1);

                    // Empty or reversed interval: nothing to scan. Without this check, a start month
                    // after the end month made the month loop below run forever.
                    if (startStr.compareTo(endPoint) >= 0) {
                        return list;
                    }

                    // same year and month: a single range
                    if (startPrefix.equals(endPrefix)) {
                        list.add(newRange(startStr, endPoint));
                        return list;
                    }

                    Calendar c = Calendar.getInstance();
                    c.setTime(sdfYMD.parse(startStr));
                    c.add(Calendar.MONTH, 1);
                    String month = sdfYM.format(c.getTime());
                    // partial first month: from the start date up to the next month
                    list.add(newRange(startStr, month));
                    // whole months strictly before the end month (bounded, cannot loop forever)
                    while (month.compareTo(endPrefix) < 0) {
                        c.add(Calendar.MONTH, 1);
                        String next = sdfYM.format(c.getTime());
                        list.add(newRange(month, next));
                        month = next;
                    }
                    // partial last month: up to the exclusive end point
                    list.add(newRange(month, endPoint));
                    return list;
                } catch (Exception e) {
                    e.printStackTrace();
                    return new ArrayList<CalllogRange>();
                }
            }

            /** Creates a range with the given start and end points. */
            private static CalllogRange newRange(String startPoint, String endPoint) {
                CalllogRange range = new CalllogRange();
                range.setStartPoint(startPoint);
                range.setEndPoint(endPoint);
                return range;
            }

            /**
             * Reformats a yyyyMMddHHmmss time string as yyyy/MM/dd HH:mm:ss.
             *
             * @param timeStr raw time string
             * @return the reformatted time, or null if {@code timeStr} cannot be parsed
             */
            public static String formatDate(String timeStr) {
                try {
                    SimpleDateFormat raw = new SimpleDateFormat(RAW_TIME_PATTERN);
                    SimpleDateFormat friendly = new SimpleDateFormat(FRIENDLY_TIME_PATTERN);
                    return friendly.format(raw.parse(timeStr));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        }

    6.編寫控制器CalllogController.java
    -----------------------------------------
     
   package com.it18zhang.ssm.web.controller;

        import com.it18zhang.ssm.domain.Calllog;
        import com.it18zhang.ssm.domain.CalllogRange;
        import com.it18zhang.ssm.service.CalllogService;
        import com.it18zhang.ssm.service.impl.CalllogServiceImpl;
        import com.it18zhang.ssm.util.CalllogUtil;
        import org.springframework.stereotype.Controller;
        import org.springframework.ui.Model;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RequestMethod;
        import org.springframework.web.bind.annotation.RequestParam;

        import javax.annotation.Resource;
        import java.util.Calendar;
        import java.util.List;

        /**
         * Spring MVC controller for the call-log pages.
         */
        @Controller
        public class CalllogController {

            // View that renders a list of call logs; shared by both query handlers.
            private static final String CALLLOG_LIST_VIEW = "calllog/calllogList";

            @Resource(name = "calllogService")
            private CalllogService cs;

            /**
             * Lists every call-log record.
             *
             * @param model receives the records under {@code "calllogs"}
             * @return the call-log list view
             */
            @RequestMapping("calllog/findAll")
            public String findAll(Model model) {
                model.addAttribute("calllogs", cs.findAll());
                return CALLLOG_LIST_VIEW;
            }

            /**
             * Shows the query form.
             *
             * @return the query-page view
             */
            @RequestMapping("calllog/toFindCalllogPage")
            public String toFindCalllog() {
                return "calllog/findCalllog";
            }

            /**
             * Queries the records of one number within a time interval.
             *
             * @param m         receives the records under {@code "calllogs"}
             * @param caller    phone number to look up
             * @param startTime interval start (yyyyMMdd)
             * @param endTime   interval end, inclusive (yyyyMMdd)
             * @return the call-log list view
             */
            @RequestMapping(value = "calllog/findCalllog", method = RequestMethod.POST)
            public String findCalllog(Model m, @RequestParam("caller") String caller, @RequestParam("startTime") String startTime, @RequestParam("endTime") String endTime) {
                List<CalllogRange> ranges = CalllogUtil.getCallLogRanges(startTime, endTime);
                List<Calllog> logs = cs.findCallogs(caller, ranges);
                m.addAttribute("calllogs", logs);
                // Same view as findAll. The former "callLog/..." (capital L) points to a different
                // template path on case-sensitive filesystems.
                return CALLLOG_LIST_VIEW;
            }
        }


二、編寫協處理器模組CalllogCoprossorModel,實現主叫發生的同時即插入被叫記錄
--------------------------------------------------------
    1.說明
        因為HbaseCustomer消費者,得到的資料都是主叫資料,沒有被叫資料
        所以,需要做一個協處理器,來處理當主叫發生的時候,同時寫入被叫的記錄

    2.建立協處理器模組CalllogCoprossorModel,並新增maven依賴
        
<?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <!-- Coprocessor module. Its artifactId must differ from the Kafka consumer module
                 (CalllogCustomerModel); otherwise both builds produce and install the same
                 artifact and overwrite each other. -->
            <groupId>calllog.kafka</groupId>
            <artifactId>CalllogCoprossorModel</artifactId>
            <version>1.0-SNAPSHOT</version>

            <dependencies>
                <!-- The coprocessor only uses HBase APIs (no Kafka). HBase's own classes are on the
                     region server classpath when the coprocessor jar is loaded, hence "provided". -->
                <dependency>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-client</artifactId>
                    <version>1.2.4</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-server</artifactId>
                    <version>1.2.4</version>
                    <scope>provided</scope>
                </dependency>

            </dependencies>

        </project>

    3.新建協處理器區域觀察者類com.calllog.coprossor.CalllogRegion
    -------------------------------------
       
 package com.calllog.coprossor;
        import org.apache.hadoop.hbase.Cell;
        import org.apache.hadoop.hbase.CellUtil;
        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.*;
        import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
        import org.apache.hadoop.hbase.coprocessor.ObserverContext;
        import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
        import org.apache.hadoop.hbase.regionserver.InternalScanner;
        import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
        import org.apache.hadoop.hbase.util.Bytes;

        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;

        /**
         * Region observer that maintains callee-side rows of {@code call:calllogs}.
         *
         * <p>Rowkey layout: {@code hash,caller,callTime,flag,callee,duration}, for example
         * {@code 66,15733218888,20181002071335,0,18332561111,063}. Flag {@code 0} marks a
         * caller-side row. Flag {@code 1} marks a callee-side row whose only column written here is
         * {@code f2:refrowid}, holding the rowkey of the matching caller-side row.
         *
         * <ul>
         *   <li>{@link #postPut}: for each caller-side row written, writes the matching callee-side
         *       row.
         *   <li>{@link #postGetOp} and {@link #postScannerNext}: a callee-side row that is read is
         *       replaced by the caller-side row it references.
         * </ul>
         *
         * <p>Rows whose key does not follow this layout are passed through untouched. An unchecked
         * exception thrown from an observer can abort the region server (depending on
         * {@code hbase.coprocessor.abortonerror}), so malformed keys are skipped, not indexed blindly.
         */
        public class CalllogRegion extends BaseRegionObserver {

            // Qualifier (family f2) of the back-reference from a callee-side row to its caller-side row.
            private static final String REF_ROW_ID = "refrowid" ;
            // Name of the call-log table.
            private static final String CALL_LOG_TABLE_NAME = "call:calllogs" ;

            private static final TableName CALL_LOG_TABLE = TableName.valueOf(CALL_LOG_TABLE_NAME);
            private static final byte[] REF_FAMILY = Bytes.toBytes("f2");
            private static final byte[] REF_QUALIFIER = Bytes.toBytes(REF_ROW_ID);
            // Salt partition count passed to CalllogUtil.getHashcode for the callee-side rowkey.
            private static final int PARTITIONS = 100;
            // Field count of a complete rowkey: hash,caller,callTime,flag,callee,duration.
            private static final int ROWKEY_FIELDS = 6;
            // Position of the flag field inside the rowkey.
            private static final int FLAG_INDEX = 3;
            private static final String CALLER_FLAG = "0";
            private static final String CALLEE_FLAG = "1";

            /**
             * After a caller-side row is written, writes the mirrored callee-side row.
             *
             * <p>The mirrored row has flag 1. When its own put comes back through this hook, it is
             * ignored, which prevents endless recursion.
             */
            @Override
            public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
                super.postPut(e, put, edit, durability);
                if (!isCallLogTable(e)) {
                    return;
                }
                String rowKey = Bytes.toString(put.getRow());
                String[] strs = rowKey.split(",");
                // only complete caller-side rows are mirrored
                if (strs.length < ROWKEY_FIELDS || !CALLER_FLAG.equals(strs[FLAG_INDEX])) {
                    return;
                }
                String caller = strs[1];
                String time = strs[2];
                String callee = strs[4];
                String duration = strs[5];

                String hash;
                try {
                    hash = CalllogUtil.getHashcode(callee, time, PARTITIONS);
                } catch (NumberFormatException | IndexOutOfBoundsException ex) {
                    // malformed callee/time: skip mirroring rather than fail inside the region server
                    return;
                }
                String newRowKey = hash + "," + callee + "," + time + "," + CALLEE_FLAG + "," + caller + "," + duration;

                Put p = new Put(Bytes.toBytes(newRowKey));
                p.addColumn(REF_FAMILY, REF_QUALIFIER, Bytes.toBytes(rowKey));
                // The table handle must be closed; otherwise every put leaks one.
                try (Table tb = e.getEnvironment().getTable(CALL_LOG_TABLE)) {
                    tb.put(p);
                }
            }

            /**
             * After a get, replaces the cells of a callee-side row with those of the caller-side row
             * it references. Other rows and tables are left unchanged.
             */
            @Override
            public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
                super.postGetOp(e, get, results);
                if (!isCallLogTable(e) || !isCalleeRow(rowFields(get.getRow()))) {
                    return;
                }
                byte[] refRow = findRefRow(results);
                if (refRow == null) {
                    // row missing or f2:refrowid not returned: nothing to substitute
                    return;
                }
                try (Table tt = e.getEnvironment().getTable(CALL_LOG_TABLE)) {
                    List<Cell> cells = tt.get(new Get(refRow)).listCells();
                    // keep the callee-side cells if the referenced row no longer exists
                    if (cells != null && !cells.isEmpty()) {
                        results.clear();
                        results.addAll(cells);
                    }
                }
            }

            /**
             * After each scanner batch, replaces every callee-side row with the caller-side row it
             * references, keeping the batch order.
             */
            @Override
            public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
                boolean b = super.postScannerNext(e, s, results, limit, hasMore);
                if (!isCallLogTable(e) || results.isEmpty()) {
                    return b;
                }
                List<Result> newList = new ArrayList<Result>(results.size());
                try (Table tt = e.getEnvironment().getTable(CALL_LOG_TABLE)) {
                    for (Result r : results) {
                        newList.add(resolve(tt, r));
                    }
                }
                results.clear();
                results.addAll(newList);
                return b;
            }

            /** Returns the referenced caller-side row for a callee-side row, otherwise {@code r} itself. */
            private static Result resolve(Table tt, Result r) throws IOException {
                if (!isCalleeRow(rowFields(r.getRow()))) {
                    return r;
                }
                byte[] refRow = r.getValue(REF_FAMILY, REF_QUALIFIER);
                if (refRow == null) {
                    return r;
                }
                Result callerRow = tt.get(new Get(refRow));
                // keep the callee-side row if the referenced row no longer exists
                return callerRow.isEmpty() ? r : callerRow;
            }

            /** Returns the value of {@code f2:refrowid} among the cells, or null if absent. */
            private static byte[] findRefRow(List<Cell> cells) {
                for (Cell c : cells) {
                    if (CellUtil.matchingColumn(c, REF_FAMILY, REF_QUALIFIER)) {
                        return CellUtil.cloneValue(c);
                    }
                }
                return null;
            }

            /** Returns whether the observed region belongs to {@code call:calllogs}. */
            private static boolean isCallLogTable(ObserverContext<RegionCoprocessorEnvironment> e) {
                return CALL_LOG_TABLE.equals(e.getEnvironment().getRegion().getRegionInfo().getTable());
            }

            /** Splits a rowkey into its comma-separated fields; a null row yields no fields. */
            private static String[] rowFields(byte[] row) {
                return row == null ? new String[0] : Bytes.toString(row).split(",");
            }

            /** Returns whether the rowkey fields describe a callee-side row (flag 1). */
            private static boolean isCalleeRow(String[] fields) {
                return fields.length > FLAG_INDEX && CALLEE_FLAG.equals(fields[FLAG_INDEX]);
            }
        }



    4.新建工具類CalllogUtil(協處理器模組中使用的副本,位於com.calllog.coprossor包)
    ----------------------------------------------
      
  package com.calllog.coprossor;

        import java.text.DecimalFormat;
        import java.text.SimpleDateFormat;
        import java.util.ArrayList;
        import java.util.Calendar;
        import java.util.List;

        /**
         * Rowkey helpers used by the coprocessor (copy of the web module's utility).
         *
         * <p>{@link #getHashcode} runs inside {@code postPut} on region-server handler threads, which
         * may run concurrently. {@link DecimalFormat} and {@link SimpleDateFormat} are not
         * thread-safe, so a fresh instance is created per call instead of sharing static ones;
         * sharing them can corrupt the computed salt.
         */
        public class CalllogUtil {

            // Layout of stored call times.
            private static final String RAW_TIME_PATTERN = "yyyyMMddHHmmss";
            // Human-readable layout produced by formatDate.
            private static final String FRIENDLY_TIME_PATTERN = "yyyy/MM/dd HH:mm:ss";

            /**
             * Computes the rowkey salt: (yyyyMM of the call time XOR last four digits of the number)
             * modulo {@code partitions}.
             *
             * @param caller     phone number; at least four trailing digits are required
             * @param callTime   time string whose first six characters are yyyyMM
             * @param partitions number of salt partitions (100 by convention)
             * @return the salt, formatted with a default {@link DecimalFormat}
             * @throws NumberFormatException if the extracted parts are not numeric
             * @throws StringIndexOutOfBoundsException if either string is too short
             */
            public static String getHashcode(String caller, String callTime, int partitions) {
                // last four digits of the phone number
                String last4Code = caller.substring(caller.length() - 4);
                // year and month of the call (yyyyMM)
                String mon = callTime.substring(0, 6);
                int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions;
                // Same output as the former shared instance, without the data race.
                return new DecimalFormat().format(hashcode);
            }

            /**
             * Builds the (inclusive) scan start rowkey {@code hash,caller,startTime}.
             */
            public static String getStartRowkey(String caller, String startTime, int partitions) {
                String hashcode = getHashcode(caller, startTime, partitions);
                return hashcode + "," + caller + "," + startTime;
            }

            /**
             * Builds the (exclusive) scan stop rowkey {@code hash,caller,endTime}. The salt is taken
             * from {@code startTime}, so start and stop keys share one salt bucket.
             */
            public static String getStopRowkey(String caller, String startTime, String endTime, int partitions) {
                String hashcode = getHashcode(caller, startTime, partitions);
                return hashcode + "," + caller + "," + endTime;
            }

            /**
             * Reformats a yyyyMMddHHmmss time string as yyyy/MM/dd HH:mm:ss.
             *
             * @param timeStr raw time string
             * @return the reformatted time, or null if {@code timeStr} cannot be parsed
             */
            public static String formatDate(String timeStr) {
                try {
                    SimpleDateFormat raw = new SimpleDateFormat(RAW_TIME_PATTERN);
                    SimpleDateFormat friendly = new SimpleDateFormat(FRIENDLY_TIME_PATTERN);
                    return friendly.format(raw.parse(timeStr));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        }


    5.打包部署
      a.註冊協處理器,並分發到所有hbase節點
          [hbase-site.xml]
            <property>
              <name>hbase.coprocessor.region.classes</name>
              <value>com.calllog.coprossor.CalllogRegion</value>
            </property>

      b.將打好的jar包分發到所有節點的/hbase/lib目錄下

      c.重啟hbase叢集

      d.進入hbase shell,重建表"call:calllogs"(列族"f1"、"f2";若表已存在,需先disable再drop)
        $hbase> disable 'call:calllogs'
        $hbase> drop 'call:calllogs'
        $hbase> create 'call:calllogs','f1','f2'

三、生成日誌,收集日誌

    1.開啟kafka叢集
        [s200 s300 s400]
        $> /soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties

    2.在s100和s200上啟動flume,開始收集日誌
        $s100> flume-ng agent -f /soft/flume/conf/calllog.conf -n a1 &
        $s200> flume-ng agent -f /soft/flume/conf/calllog.conf -n a1 &

    3.開啟kafka的消費者--hbase[進入share/calllog目錄下,找到CalllogCustomerModel.jar包]
        $> java -cp CalllogCustomerModel.jar calllog.kafka.hbase.customer.HbaseCustomer

    4.開啟日誌生成工具
        $calllog> ./calllog.sh

    5.執行ssm模組,檢視webapp介面顯示