大資料之電話日誌分析callLog案例(二)
阿新 • • 發佈:2018-11-09
一、前端實現 -- 按照時間段查詢通話記錄
-----------------------------------------
1.完善calllog的domain類Calllog.class
----------------------------------------------
package com.it18zhang.ssm.domain; /** * calllog的domain類 -- 標準javabean */ public class Calllog { private String caller; private String callee; private String callTime; private String callDuration; //是否是主叫 private boolean flag; public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } public String getCaller() { return caller; } public void setCaller(String caller) { this.caller = caller; } public String getCallee() { return callee; } public void setCallee(String callee) { this.callee = callee; } public String getCallTime() { return callTime; } public void setCallTime(String callTime) { this.callTime = callTime; } public String getCallDuration() { return callDuration; } public void setCallDuration(String callDuration) { this.callDuration = callDuration; } }
2.通話日誌時間範圍查詢domain類CalllogRange.class
-----------------------------------------------
package com.it18zhang.ssm.domain; /** */ public class CalllogRange { private String startPoint ; private String endPoint ; public String getStartPoint() { return startPoint; } public void setStartPoint(String startPoint) { this.startPoint = startPoint; } public String getEndPoint() { return endPoint; } public void setEndPoint(String endPoint) { this.endPoint = endPoint; } public String toString() { return startPoint + " - " + endPoint ; } }
3.完善Service類 CalllogService.class
------------------------------------------------------
package com.it18zhang.ssm.service;

import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.domain.CalllogRange;

import java.util.List;

/**
 * Service contract for call logs -- defines the rules used to interact with
 * the server.
 */
public interface CalllogService {

    /**
     * Queries all call-log entries.
     *
     * @return the call-log entries found
     */
    List<Calllog> findAll();

    /**
     * Queries call records by range.
     *
     * @param call the number whose records are requested
     * @param list the ranges to query
     * @return the call-log entries found
     */
    List<Calllog> findCallogs(String call, List<CalllogRange> list);
}
4.完善CalllogServiceImpl類
-----------------------------------------------
package com.it18zhang.ssm.service.impl;
import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.domain.CalllogRange;
import com.it18zhang.ssm.service.CalllogService;
import com.it18zhang.ssm.util.CalllogUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.*;
/**
 * {@link CalllogService} implementation that reads call records from the
 * HBase table {@code call:calllogs}, column family {@code f1}.
 *
 * <p>Both query methods are best-effort: failures are printed and the rows
 * collected so far (possibly none) are returned. Neither method returns
 * {@code null}.
 */
@Service("calllogService")
public class CalllogServiceImpl implements CalllogService {

    private static final byte[] FAMILY = Bytes.toBytes("f1");
    private static final byte[] CALLER = Bytes.toBytes("caller");
    private static final byte[] CALLEE = Bytes.toBytes("callee");
    private static final byte[] CALL_TIME = Bytes.toBytes("callTime");
    private static final byte[] CALL_DURATION = Bytes.toBytes("callDuration");

    /** Partition count passed to the row-key helpers. */
    private static final int PARTITIONS = 100;

    /** Handle on the call-log table; stays {@code null} if the connection could not be opened. */
    private Table table;

    /**
     * Opens an HBase connection from the default configuration and obtains the
     * {@code call:calllogs} table. A connection failure is printed and the bean
     * is still created, in which case every query returns an empty list.
     */
    public CalllogServiceImpl() {
        try {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf("call:calllogs"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Queries all call logs with a full table scan.
     *
     * @return the rows read; empty (never {@code null}) when nothing could be read
     */
    public List<Calllog> findAll() {
        List<Calllog> list = new ArrayList<Calllog>();
        // try-with-resources releases the scanner even when mapping a row fails
        try (ResultScanner rs = table.getScanner(new Scan())) {
            for (Result row : rs) {
                list.add(toCalllog(row));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }

    /**
     * Queries the call records of one number, one scan per range.
     *
     * @param call   the number to query
     * @param ranges the ranges to scan; {@code null} is treated as no ranges
     * @return the rows read across all ranges; empty (never {@code null}) when
     *         nothing could be read
     */
    public List<Calllog> findCallogs(String call, List<CalllogRange> ranges) {
        List<Calllog> logs = new ArrayList<Calllog>();
        if (ranges == null) {
            return logs;
        }
        try {
            for (CalllogRange range : ranges) {
                Scan scan = new Scan();
                // start row of the scan
                scan.setStartRow(Bytes.toBytes(
                        CalllogUtil.getStartRowkey(call, range.getStartPoint(), PARTITIONS)));
                // stop row of the scan; the helper derives the key prefix from the start point
                scan.setStopRow(Bytes.toBytes(
                        CalllogUtil.getStopRowkey(call, range.getStartPoint(), range.getEndPoint(), PARTITIONS)));
                try (ResultScanner rs = table.getScanner(scan)) {
                    for (Result r : rs) {
                        Calllog log = toCalllog(r);
                        log.setFlag(isCallerRow(r));
                        logs.add(log);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return logs;
    }

    /** Copies the four {@code f1} columns of a row into a new {@link Calllog}. */
    private static Calllog toCalllog(Result row) {
        Calllog log = new Calllog();
        log.setCaller(Bytes.toString(row.getValue(FAMILY, CALLER)));
        log.setCallee(Bytes.toString(row.getValue(FAMILY, CALLEE)));
        log.setCallTime(Bytes.toString(row.getValue(FAMILY, CALL_TIME)));
        log.setCallDuration(Bytes.toString(row.getValue(FAMILY, CALL_DURATION)));
        return log;
    }

    /**
     * Returns whether the fourth comma-separated field of the row key is
     * {@code "0"}. A row key with fewer than four fields yields {@code false}
     * instead of aborting the whole query.
     */
    private static boolean isCallerRow(Result row) {
        String rowkey = Bytes.toString(row.getRow());
        if (rowkey == null) {
            return false;
        }
        String[] parts = rowkey.split(",");
        return parts.length > 3 && "0".equals(parts[3]);
    }
}
5.編寫Calllog工具類CalllogUtil.class
------------------------------------------
package com.it18zhang.ssm.util;
import com.it18zhang.ssm.domain.CalllogRange;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
/**
 * Call-log helpers: row-key construction, query-range computation and date
 * formatting.
 *
 * <p>Formatter objects are created per call. {@link SimpleDateFormat} and
 * {@link DecimalFormat} are not safe for concurrent use, and these helpers are
 * reached from web request threads.
 */
public class CalllogUtil {

    private static final String COMPACT_PATTERN = "yyyyMMddHHmmss";
    private static final String FRIENDLY_PATTERN = "yyyy/MM/dd HH:mm:ss";

    /**
     * Computes the row-key prefix for a number and a time.
     *
     * <p>The last four characters of {@code caller} and the first six characters
     * of {@code callTime} are parsed as integers, XOR-ed, and reduced modulo
     * {@code partitions}.
     *
     * <p>NOTE(review): the value is rendered with a default {@link DecimalFormat},
     * so results below 10 are a single digit ("8", not "08"). Left unchanged
     * because it must match keys already written to the table -- confirm against
     * the writer.
     *
     * @param caller     the number; must have at least four trailing digits
     * @param callTime   the time string; must start with six digits
     * @param partitions the modulus
     * @return the formatted prefix
     */
    public static String getHashcode(String caller, String callTime, int partitions) {
        // last four characters of the number
        String last4Code = caller.substring(caller.length() - 4);
        // leading six characters of the time
        String mon = callTime.substring(0, 6);
        int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions;
        return new DecimalFormat().format(hashcode);
    }

    /**
     * Builds the start row key {@code "<hash>,<caller>,<startTime>"}.
     */
    public static String getStartRowkey(String caller, String startTime, int partitions) {
        String hashcode = getHashcode(caller, startTime, partitions);
        return hashcode + "," + caller + "," + startTime;
    }

    /**
     * Builds the stop row key {@code "<hash>,<caller>,<endTime>"}. The hash is
     * computed from {@code startTime}, so start and stop keys share one prefix.
     */
    public static String getStopRowkey(String caller, String startTime, String endTime, int partitions) {
        String hashcode = getHashcode(caller, startTime, partitions);
        return hashcode + "," + caller + "," + endTime;
    }

    /**
     * Splits a query period into ranges that each stay inside one month.
     *
     * <p>The end point of the last range is the end month followed by
     * {@code endDay + 1} as two digits. For a period inside one month a single
     * range {@code [startStr, endPoint)} is returned. Otherwise the first range
     * runs from {@code startStr} to the following month, each intermediate month
     * gets its own range, and the last range runs from the end month to the end
     * point.
     *
     * <p>When the start month is after the end month an empty list is returned.
     * The month walk is bounded by the end month, so it can no longer run forever.
     *
     * @param startStr start date, {@code yyyyMMdd}
     * @param endStr   end date, {@code yyyyMMdd}
     * @return the ranges, or {@code null} when the inputs cannot be parsed
     */
    public static List<CalllogRange> getCallLogRanges(String startStr, String endStr) {
        try {
            SimpleDateFormat sdfYMD = new SimpleDateFormat("yyyyMMdd");
            SimpleDateFormat sdfYM = new SimpleDateFormat("yyyyMM");
            DecimalFormat df00 = new DecimalFormat("00");

            List<CalllogRange> list = new ArrayList<CalllogRange>();

            String startPrefix = startStr.substring(0, 6);
            String endPrefix = endStr.substring(0, 6);
            int endDay = Integer.parseInt(endStr.substring(6, 8));
            // end point of the final range
            String endPoint = endPrefix + df00.format(endDay + 1);

            // same year and month: one range covers everything
            if (startPrefix.equals(endPrefix)) {
                list.add(newRange(startStr, endPoint));
                return list;
            }
            // inverted period: nothing to query
            if (startPrefix.compareTo(endPrefix) > 0) {
                return list;
            }

            // 1. first month: from the start date to the next month
            Calendar c = Calendar.getInstance();
            c.setTime(sdfYMD.parse(startStr));
            c.add(Calendar.MONTH, 1);
            String month = sdfYM.format(c.getTime());
            list.add(newRange(startStr, month));

            // 2. whole months strictly before the end month
            while (month.compareTo(endPrefix) < 0) {
                String from = month;
                c.add(Calendar.MONTH, 1);
                month = sdfYM.format(c.getTime());
                list.add(newRange(from, month));
            }

            // 3. end month; skipped if the walk overshot, which only happens when
            //    endStr carries a month value no calendar month formats to
            if (month.equals(endPrefix)) {
                list.add(newRange(month, endPoint));
            }
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /** Creates a range with the given start and end points. */
    private static CalllogRange newRange(String startPoint, String endPoint) {
        CalllogRange range = new CalllogRange();
        range.setStartPoint(startPoint);
        range.setEndPoint(endPoint);
        return range;
    }

    /**
     * Reformats a {@code yyyyMMddHHmmss} time as {@code yyyy/MM/dd HH:mm:ss}.
     *
     * @param timeStr the compact time string
     * @return the reformatted time, or {@code null} when {@code timeStr} cannot be parsed
     */
    public static String formatDate(String timeStr) {
        try {
            return new SimpleDateFormat(FRIENDLY_PATTERN)
                    .format(new SimpleDateFormat(COMPACT_PATTERN).parse(timeStr));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
6.編寫控制器CalllogController.class
-----------------------------------------
package com.it18zhang.ssm.web.controller;
import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.domain.CalllogRange;
import com.it18zhang.ssm.service.CalllogService;
import com.it18zhang.ssm.service.impl.CalllogServiceImpl;
import com.it18zhang.ssm.util.CalllogUtil;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.Resource;
import java.util.Calendar;
import java.util.List;
/**
 * Web controller for the call-log pages: full listing, the query form, and
 * the query by number and period.
 */
@Controller
public class CalllogController {

    /** View that renders a list of call logs; shared by both listing handlers. */
    private static final String CALLLOG_LIST_VIEW = "calllog/calllogList";

    @Resource(name="calllogService")
    private CalllogService cs;

    /**
     * Lists all call logs.
     *
     * @param model receives the result under the attribute {@code calllogs}
     * @return the list view name
     */
    @RequestMapping("calllog/findAll")
    public String findAll(Model model)
    {
        List<Calllog> list = cs.findAll();
        model.addAttribute("calllogs", list);
        return CALLLOG_LIST_VIEW;
    }

    /**
     * Forwards to the query page.
     *
     * @return the query form view name
     */
    @RequestMapping("calllog/toFindCalllogPage")
    public String toFindCalllog()
    {
        return "calllog/findCalllog";
    }

    /**
     * Handles the query form: turns the period into ranges, queries the service
     * and shows the result in the list view.
     *
     * @param m         receives the result under the attribute {@code calllogs}
     * @param caller    request parameter {@code caller}
     * @param startTime request parameter {@code startTime}
     * @param endTime   request parameter {@code endTime}
     * @return the list view name
     */
    @RequestMapping(value = "calllog/findCalllog", method = RequestMethod.POST)
    public String findCalllog(Model m, @RequestParam("caller") String caller, @RequestParam("startTime") String startTime, @RequestParam("endTime") String endTime)
    {
        List<CalllogRange> list = CalllogUtil.getCallLogRanges(startTime, endTime);
        List<Calllog> logs = cs.findCallogs(caller, list);
        m.addAttribute("calllogs", logs);
        // Was "callLog/calllogList": the capital L differs from the view used by
        // findAll and does not resolve on a case-sensitive file system.
        return CALLLOG_LIST_VIEW;
    }
}
二、編寫協處理器模組CalllogCoprossorModel,實現主叫發生的同時即插入被叫記錄
--------------------------------------------------------
1.說明
因為HbaseCustomer消費者,得到的資料都是主叫資料,沒有被叫資料
所以,需要做一個協處理器,來處理當主叫發生的時候,同時寫入被叫的記錄
2.建立協處理器模組CalllogCoprossorModel,並新增maven依賴
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor shown for the coprocessor module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>calllog.kafka</groupId>
<!-- NOTE(review): the surrounding text names this module CalllogCoprossorModel, but the
     artifactId below is CalllogCustomerModel, the jar name used later for the Kafka
     consumer. Presumably copied from the consumer module; confirm the intended id. -->
<artifactId>CalllogCustomerModel</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- NOTE(review): the coprocessor classes shown in this article import only HBase and JDK
     types; this Kafka dependency may be a leftover. Verify before removing. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
<!-- HBase client API (Table, Put, Get, Result, ...) -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
</dependency>
<!-- HBase server classes; the observer imports the coprocessor and regionserver packages -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
</dependencies>
</project>
3.新建協處理器區域觀察者類com.calllog.coprossor.CalllogRegion
-------------------------------------
package com.calllog.coprossor;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Region observer for the {@code call:calllogs} table.
 *
 * <p>Row keys are comma-separated:
 * {@code hash,number,time,flag,otherNumber,duration}, for example
 * {@code 66,15733218888,20181002071335,0,18332561111,063}. Flag {@code 0} marks
 * a caller row and flag {@code 1} a callee row. A callee row stores only the
 * caller row's key in {@code f2:refrowid}.
 *
 * <ul>
 *   <li>{@link #postPut}: after a caller row is written, writes the matching callee row.</li>
 *   <li>{@link #postGetOp} and {@link #postScannerNext}: when a callee row is read,
 *       return the referenced caller row instead.</li>
 * </ul>
 *
 * <p>Operations on other tables, and row keys that do not have the expected
 * fields, pass through unchanged.
 */
public class CalllogRegion extends BaseRegionObserver {
    // qualifier of the column that holds the referenced caller row key
    private static final String REF_ROW_ID = "refrowid" ;
    // name of the call-log table
    private static final String CALL_LOG_TABLE_NAME = "call:calllogs" ;

    private static final TableName CALL_LOG_TABLE = TableName.valueOf(CALL_LOG_TABLE_NAME);
    private static final byte[] REF_FAMILY = Bytes.toBytes("f2");
    private static final byte[] REF_QUALIFIER = Bytes.toBytes(REF_ROW_ID);
    private static final String CALLER_FLAG = "0";
    private static final String CALLEE_FLAG = "1";
    /** Partition count passed to {@code CalllogUtil.getHashcode}. */
    private static final int PARTITIONS = 100;

    /**
     * Called after every put. For a caller row on the call-log table, writes a
     * callee row whose key swaps the two numbers and whose only cell,
     * {@code f2:refrowid}, holds the caller row key.
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        super.postPut(e, put, edit, durability);
        if (!isCallLogTable(e)) {
            return;
        }
        String rowKey = Bytes.toString(put.getRow());
        String[] strs = splitRowKey(put.getRow());
        // Callee rows are produced by this hook itself; returning here stops the
        // put below from triggering another write. Keys without all six fields
        // cannot be mirrored and are left alone.
        if (strs.length < 6 || CALLEE_FLAG.equals(strs[3])) {
            return;
        }
        String caller = strs[1];
        String time = strs[2];
        String callee = strs[4];
        String duration = strs[5];
        // key prefix of the callee row
        String hash = CalllogUtil.getHashcode(callee, time, PARTITIONS);
        String newRowKey = hash + "," + callee + "," + time + "," + CALLEE_FLAG + "," + caller + "," + duration;

        Put p = new Put(Bytes.toBytes(newRowKey));
        p.addColumn(REF_FAMILY, REF_QUALIFIER, Bytes.toBytes(rowKey));
        // close the table handle obtained from the environment once the put is done
        try (Table tb = e.getEnvironment().getTable(CALL_LOG_TABLE)) {
            tb.put(p);
        }
    }

    /**
     * Called after a get. When the requested row is a callee row, replaces the
     * result cells with the cells of the referenced caller row. If the reference
     * cell is not among the results, or the caller row has no cells, the results
     * are left as they are.
     */
    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
        super.postGetOp(e, get, results);
        if (!isCallLogTable(e)) {
            return;
        }
        String[] arr = splitRowKey(get.getRow());
        // caller rows and unrecognised keys are returned as read
        if (arr.length < 4 || CALLER_FLAG.equals(arr[3])) {
            return;
        }
        // locate the reference to the caller row among the returned cells
        byte[] refRowKey = null;
        for (Cell cell : results) {
            if (CellUtil.matchingColumn(cell, REF_FAMILY, REF_QUALIFIER)) {
                refRowKey = CellUtil.cloneValue(cell);
                break;
            }
        }
        if (refRowKey == null) {
            return;
        }
        try (Table tt = e.getEnvironment().getTable(CALL_LOG_TABLE)) {
            List<Cell> callerCells = tt.get(new Get(refRowKey)).listCells();
            // listCells() is null for an empty result
            if (callerCells != null) {
                results.clear();
                results.addAll(callerCells);
            }
        }
    }

    /**
     * Called after a scanner batch is fetched. Every callee row in the batch is
     * replaced by the caller row it references. Rows whose reference is missing
     * or does not resolve stay in the batch unchanged.
     */
    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
        boolean b = super.postScannerNext(e, s, results, limit, hasMore);
        if (!isCallLogTable(e)) {
            return b;
        }
        List<Result> newList = new ArrayList<Result>(results.size());
        // opened on the first callee row only, so batches without callee rows cost nothing
        Table tt = null;
        try {
            for (Result r : results) {
                String[] parts = splitRowKey(r.getRow());
                if (parts.length < 4 || CALLER_FLAG.equals(parts[3])) {
                    newList.add(r);
                    continue;
                }
                byte[] refrowkey = r.getValue(REF_FAMILY, REF_QUALIFIER);
                if (refrowkey == null) {
                    newList.add(r);
                    continue;
                }
                if (tt == null) {
                    tt = e.getEnvironment().getTable(CALL_LOG_TABLE);
                }
                Result callerRow = tt.get(new Get(refrowkey));
                newList.add(callerRow.isEmpty() ? r : callerRow);
            }
        } finally {
            if (tt != null) {
                tt.close();
            }
        }
        results.clear();
        results.addAll(newList);
        return b ;
    }

    /** Returns whether the region handling the current operation belongs to the call-log table. */
    private static boolean isCallLogTable(ObserverContext<RegionCoprocessorEnvironment> e) {
        return CALL_LOG_TABLE.equals(e.getEnvironment().getRegion().getRegionInfo().getTable());
    }

    /** Splits a row key on commas; a {@code null} key yields an empty array. */
    private static String[] splitRowKey(byte[] row) {
        if (row == null) {
            return new String[0];
        }
        return Bytes.toString(row).split(",");
    }
}
4.新建工具類CalllogUtil(與協處理器同包com.calllog.coprossor)
----------------------------------------------
package com.calllog.coprossor;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
/**
 * Row-key and date helpers used by the call-log coprocessor.
 *
 * <p>Formatter objects are created per call. {@link SimpleDateFormat} and
 * {@link DecimalFormat} are not safe for concurrent use, so the shared static
 * instances this class used to hold could corrupt output when several threads
 * called these helpers at once.
 */
public class CalllogUtil {

    private static final String COMPACT_PATTERN = "yyyyMMddHHmmss";
    private static final String FRIENDLY_PATTERN = "yyyy/MM/dd HH:mm:ss";

    /**
     * Computes the row-key prefix for a number and a time.
     *
     * <p>The last four characters of {@code caller} and the first six characters
     * of {@code callTime} are parsed as integers, XOR-ed, and reduced modulo
     * {@code partitions}.
     *
     * <p>NOTE(review): the value is rendered with a default {@link DecimalFormat},
     * so results below 10 are a single digit ("8", not "08"). Left unchanged
     * because it must match keys already written to the table -- confirm against
     * the writer.
     *
     * @param caller     the number; must have at least four trailing digits
     * @param callTime   the time string; must start with six digits
     * @param partitions the modulus
     * @return the formatted prefix
     */
    public static String getHashcode(String caller, String callTime, int partitions) {
        // last four characters of the number
        String last4Code = caller.substring(caller.length() - 4);
        // leading six characters of the time
        String mon = callTime.substring(0, 6);
        int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions;
        return new DecimalFormat().format(hashcode);
    }

    /**
     * Builds the start row key {@code "<hash>,<caller>,<startTime>"}.
     */
    public static String getStartRowkey(String caller, String startTime, int partitions) {
        String hashcode = getHashcode(caller, startTime, partitions);
        return hashcode + "," + caller + "," + startTime;
    }

    /**
     * Builds the stop row key {@code "<hash>,<caller>,<endTime>"}. The hash is
     * computed from {@code startTime}, so start and stop keys share one prefix.
     */
    public static String getStopRowkey(String caller, String startTime, String endTime, int partitions) {
        String hashcode = getHashcode(caller, startTime, partitions);
        return hashcode + "," + caller + "," + endTime;
    }

    /**
     * Reformats a {@code yyyyMMddHHmmss} time as {@code yyyy/MM/dd HH:mm:ss}.
     *
     * @param timeStr the compact time string
     * @return the reformatted time, or {@code null} when {@code timeStr} cannot be parsed
     */
    public static String formatDate(String timeStr) {
        try {
            return new SimpleDateFormat(FRIENDLY_PATTERN)
                    .format(new SimpleDateFormat(COMPACT_PATTERN).parse(timeStr));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
5.打包部署
a.註冊協處理器,並分發到所有hbase節點
[hbase-site.xml]
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.calllog.coprossor.CalllogRegion</value>
</property>
b.將打好的jar包分發到所有節點的/hbase/lib目錄下
c.重啟hbase叢集
d.進入hbase shell,重建表"call:calllogs" "f1" "f2"
$hbase> create 'call:calllogs','f1','f2';
三、生成日誌,收集日誌
1.開啟kafka叢集
[s200 s300 s400]
$> /soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties
2.在s100和s200上啟動flume,開始收集日誌
$s100> flume-ng agent -f /soft/flume/conf/calllog.conf -n a1 &
$s200> flume-ng agent -f /soft/flume/conf/calllog.conf -n a1 &
3.開啟kafka的消費者--hbase[進入share/calllog目錄下,找到CalllogCustomerModel.jar包]
$> java -cp CalllogCustomerModel.jar calllog.kafka.hbase.customer.HbaseCustomer
4.開啟日誌生成工具
$calllog> ./calllog.sh
5.執行ssm模組,檢視webapp介面顯示