1. 程式人生 > >基於歷史資料的使用者訪問次數,每天新老使用者,日活,周活,月活的hive計算

基於歷史資料的使用者訪問次數,每天新老使用者,日活,周活,月活的hive計算

最近有一個需求,統計每天的新老使用者,日活,周活,月活。
我們每天的增量資料會加入到hive歷史資料表中,包含使用者訪問網站的一些資訊,欄位有很多,包括使用者唯一標識guid。
當然了日活,周活,月活就是一個count(distinct(guid))語句,非常常用的sql。

但是這裡的問題是:

A:每天的新老使用者應該怎麼統計呢?
B:這還不簡單,判斷使用者guid是否存在與歷史庫guid中嘛?
A:歷史資料幾十個T,大概一百億行,你要每天將當日資料(2~3億行)與歷史資料幾億行進行join判斷?
B:額,這個,這個,好像不行哦!

是的,歷史資料裡面是使用者網站訪問行為,同一個使用者在同一天,不同的天都有可能出現,guid在歷史表中會有多次。如果直接join,效能很差,實際上是做了很多不必要的工作。

解決方案:

維護一張使用者表,裡面有4列:guid, starttime, endtime, num,分別是使用者的guid,第一次訪問時間,最後一次訪問時間,訪問天數;
從某個狀態開始,歷史表中guid是唯一的;
當天資料去重後,與歷史庫join,如果guid在歷史庫出現過,則將endtime更新為當天時間,num加一;
否則,這是一個新使用者,插入歷史庫,starttime, endtime都為當天時間,num初始值為1。

維護了這麼一張使用者表後,接下來就可以寫hql統計業務了,計算當天新老使用者時,只需要與這個歷史庫進行join就行了(目前為止4千萬),當日guid去重後是1千多萬,這樣就是4千萬~1千萬的join了,與開始4千萬~100億的join,效能會有巨大提升。

hive歷史表的設計與hive相關配置
可以看到這裡hive歷史表history_helper需要頻繁修改,hive表支援資料修改需要在${HIVE_HOME}/conf/hive-site.xml中新增事務支援:

<property>
    <name>hive.support.concurrency</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.dynamic.partition.mode</name
>
<value>nonstrict</value> </property> <property> <name>hive.txn.manager</name> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value> </property> <property> <name>hive.compactor.initiator.on</name> <value>true</value> </property> <property> <name>hive.compactor.worker.threads</name> <value>1</value> </property>

為了提高查詢速度,hive歷史表與增量表這裡都分桶,hive-xite.xml配置:

<property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
</property>

為了提高reduce並行度,也設定一下:

set mapred.reduce.tasks = 50;

這個最好在hive命令列配置,表明只在當前程式使用該配置,就不要配置配置檔案了。
歷史庫建表語句:

create external table if not exists hm2.history_helper
(
  guid string,
  starttime string,
  endtime string,
  num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");

當天增量表,儲存去重後的guid,建表語句:

create table if not exists hm2.daily_helper
(
  guid string,
  dt string
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");

思路

由於這種需要寫成定時模式,所以這裡用python指令碼來實現,將hive查詢結果儲存到本地檔案result.txt,然後python讀取result.txt,連線資料庫,儲存當天的查詢結果。

程式碼

helper.py

#!/usr/bin/python
# -*- coding:utf-8 -*-

# hive更新歷史使用者表,日常查詢,儲存到MySQL

import sys
import datetime
import commands
import MySQLdb

# 獲取起始中間所有日期
def getDays(starttime,endtime,regx):
    datestart=datetime.datetime.strptime(starttime,regx)
    dateend=datetime.datetime.strptime(endtime,regx)
    days = []
    while datestart<=dateend:
        days.append(datestart.strftime(regx))
        datestart+=datetime.timedelta(days=1)
    return days

# 獲得指定時間的前 n 天的年、月、日,n取負數往前,否則往後
def getExacYes(day, regx, n):
    return (datetime.datetime.strptime(day,regx) + datetime.timedelta(days=n)).strftime(regx)

# 獲得距離現在天數的年、月、日,n 取值正負含義同上,昨天就是getYes(regx,-1)
def getYes(regx, n):
    now_time = datetime.datetime.now()
    yes_time = now_time + datetime.timedelta(days=n)
    yes_time_nyr = yes_time.strftime(regx)
    return yes_time_nyr

# 執行hive命令
def execHive(cmd):
    print cmd
    res = commands.getstatusoutput(cmd)
    return res

# 獲得當前是星期幾
def getWeek(regx):
    now_time = datetime.datetime.now()
    week = now_time.strftime(regx)
    return week

# 格式化日期,加上雙引號
def formatDate(day):
    return "\"" + day + "\""

# 資料儲存到mysql
def insertMysql(dt, path, tbName, regx):
    # new, dayAll, stay
    values = []
    with open(path) as file:
        line = file.readline()
        while line:
            values.append(line.strip())
            line = file.readline()
    dayAll = int(values[1])
    new = float(values[0])/dayAll
    old = 1 - new

    # 獲取資料庫連線
    conn = MySQLdb.connect("0.0.0.0", "statistic", "123456", "statistic")
    # 獲取遊標
    cursor = conn.cursor()

    # 查詢昨天的使用者人數
    yesDay = getExacYes(dt, regx, -1)
    sql = 'select dayAll from %s where dt = %s'%(tbName, formatDate(yesDay))
    try:
        cursor.execute(sql)
    except Exception as e:
        print e

    yesAll = int(cursor.fetchall()[0][0])
    stay = float(values[2]) / yesAll
    print stay
    # 獲取遊標
    cursor2 = conn.cursor()
    sql = 'insert into  %s\
    values("%s",%f,%f,%f,%d)'%(tbName, dt, new, old, stay, dayAll)
    print sql
    try:
        cursor2.execute(sql)
        conn.commit()
    except:
        conn.rollback()
    finally:
        conn.close()

# 初始化,刪除臨時表,並且建立
def init():
    # 設定分桶環境
    cmd = 'source /etc/profile;hive -e \'set hive.enforce.bucketing = true;set mapred.reduce.tasks = 50;\''
    (status,result) = execHive(cmd)
    # 清除當天的臨時表,結果儲存
    cmd = 'source /etc/profile;hive -e \'drop table hm2.daily_helper;\''
    (status,result) = execHive(cmd)
    if status == 0:
        print '%s昨天臨時表刪除完畢...'%(day)
    else:
        print result
        sys.exit(1)
    cmd = 'source /etc/profile;hive -e \'create table if not exists hm2.daily_helper\
    (\
    guid string,\
    dt string\
    )\
    clustered by(guid) into 50 buckets \
    stored as orc TBLPROPERTIES ("transactional"="true");\''
    (status,result) = execHive(cmd)
    if status == 0:
        print '%s臨時表建立完畢...'%(day)
    else:
        print result
        sys.exit(1)

# 主函式入口
if __name__ == '__main__':
    regx = '%Y-%m-%d'
    resultPath = '/home/hadoop/statistic/flash/helper/result.txt'
    days = getDays('2018-07-01','2018-07-20',regx)
    tbName = 'statistic_flash_dailyActive_helper'
    for day in days:
        init()
        # 當天資料去重後儲存到臨時表daily_helper
        cmd = 'source /etc/profile;hive -e \'insert into hm2.daily_helper select distinct(guid),dt from hm2.helper \
        where dt = "%s" and guid is not null;\''%(day)
        print '%s資料正在匯入臨時表...'%(day)
        (status,result) = execHive(cmd)
        if status == 0:
            print '%s資料匯入臨時表完畢...'%(day)
        else:
            print result
            sys.exit(1)
        # guid存在則更新 endtime 與 num
        cmd = 'source /etc/profile;hive -e \'update hm2.history_helper set endtime = "%s",num = num + 1 \
        where guid in (select guid from hm2.daily_helper);\''%(day)
        print '正在更新endtime 與 num...'
        (status,result) = execHive(cmd)
        if status == 0:
            print '%s history_helper資料更新完畢'%(day)
        else :
            print result
            sys.exit(1)
        # 當天新使用者
        cmd = 'source /etc/profile;hive -e \'select count(1) from hm2.daily_helper \
        where guid not in (select guid from hm2.history_helper);\' > %s'%(resultPath)
        (status,result) = execHive(cmd)
        if status != 0:
            print result
            sys.exit(1)
        # 不存在插入
        cmd = 'source /etc/profile;hive -e \'insert into hm2.history_helper\
        select daily.guid,dt,dt,1 from hm2.daily_helper daily\
        where daily.guid not in (select guid from hm2.history_helper where guid is not null);\''
        print '正在插入資料到history_helper表...'
        (status,result) = execHive(cmd)
        if status == 0:
            print '%s資料插入hm2.history_helper表完成'%(day)
        else:
            print result
            sys.exit(1)
        # 當天總人數
        cmd = 'source /etc/profile;hive -e \'select count(1) from hm2.daily_helper;\' >> %s'%(resultPath)
        (status,result) = execHive(cmd)
        if status != 0:
            print result
            sys.exit(1)
        # 次日活躍留存
        cmd = 'source /etc/profile;hive -e \'select count(1) from\
        (select guid from hm2.helper where dt = "%s" group by guid) yes\
        inner join\
        (select guid from hm2.helper where dt = "%s" group by guid) today\
        where yes.guid = today.guid;\' >> %s'%(getExacYes(day, regx, -1), day, resultPath)
        (status,result) = execHive(cmd)
        if status != 0:
            print result
            sys.exit(1)
        # 結果儲存到mysql
        insertMysql(day, resultPath, tbName, regx)
        print '=========================%s hive 查詢完畢,結果儲存資料到mysql完成=============================='%(day)

這是在處理歷史資料,然後就是每天定時處理了,在linux crontab里加個定時器任務就好了。