1. 程式人生 > >shell加python實現程式自動化控制

shell加python實現程式自動化控制

為做到資料的實時傳輸(實時:當前時間傳輸上一個小時的資料),shell用於控制整個流程,python用於處理資料。

shell程式碼如下:
#bin/bash
#########
##   篤篤學車4G執行指令碼
##   編寫者:zhangqm
##   日期:2018-04-04
##   呼叫方式:nohup sh duduxueche.sh day/hour > ../log/duduxueche.log 2>&1 &
##   type資料有兩種:1、day:按天跑 2、hour:按小時跑  
########

# 程式的日誌目錄
log='/data1/u_lx_data/zhangqm/yanjie/dudu/log'

# 4G表的HDFS目錄
sourceDataHdfs='/user/db_lte/public/sada_lte_xdr03_103'

# MR清單資料,HDFS目錄
mrDataListHdfs='/user/u_lx_data/private/zhangqm/dudu'

# MR清單資料,本地目錄
dataList='/data1/u_lx_data/zhangqm/yanjie/dudu/data'

# 結果資料
resultList='/data1/u_lx_data/zhangqm/yanjie/dudu/result'


# 稽核語句
# 呼叫方式:Check_num 目錄 時間 小時
function Check_num()
{
  if [ $3 == 'day' ];then
     num=$(hadoop fs -du -h -s $1/$2 |awk -F" " '{print $1}'|sed 's/ //g')
	   echo $num
  else
	   num=$(hadoop fs -du -h -s $1/$2/$3|awk -F" " '{print $1}'|sed 's/ //g')
	   echo $num
  fi
}	

# 得到清單資料
# 呼叫方式:Deal_data 時間 小時
function Deal_data()
{
  if [ $2 == 'day' ];then
     # 拉清單資料
     echo '開始處理'${dayTime_1}'天資料。。。'
     hadoop jar dudu4g_0514.jar ${sourceDataHdfs}/$1/ ${mrDataListHdfs}/$1
     # 檢查天資料是否產生
     size=$(Check_num ${mrDataListHdfs} $1 $2)
     if [ ${size} > 0 ] ; then 
        hadoop fs -cat ${mrDataListHdfs}/$1/* > ${dataList}/$1.txt
     else
        echo '天資料沒有產生,請核查。。。'
     fi
  else
	   # 拉清單資料
	   echo '開始處理'${dayTime}${hourTime_1}'小時資料。。。'
     hadoop jar dudu4g_0514.jar ${sourceDataHdfs}/$1/$2 ${mrDataListHdfs}/$1/$2
     # 檢查小時資料是否產生
     size=$(Check_num ${mrDataListHdfs} $1 $2)
     if [ ${size} > 0 ] ; then 
        if [ ! -d ${dataList}/$1 ];then
           mkdir ${dataList}/$1
        fi
        hadoop fs -cat ${mrDataListHdfs}/$1/$2/* > ${dataList}/$1/$2.txt
     else
        echo '當前小時資料沒有產生,請核查。。。'
     fi
  fi
}	

#按小時跑
function Run_hour()
{
    while true
    do
         # 獲取系統當前時間
         dayTime=$(date +"%Y%m%d" )
    
         # 獲取系統當前小時的前1小時的小時
         hourTime_1=$(date -d "-1 hour" +"%H")
         
         if [ ${hourTime_1} -eq 23 ];then
            dayTime=$(date -d "-1 day " +"%Y%m%d")
         fi
         
         # 測試小時資料是否已經產生
         hadoop fs -test -e ${mrDataListHdfs}/${dayTime}/${hourTime_1}
         
         #十五分鐘檢測一次,主要是解決跑程式所花費的時間不同問題
         if [ $? -eq 0 ] ;then 
            echo '******************************************'  
            echo ${dayTime}${hourTime_1}'小時已經跑過。。。。' 
            echo '等待十五分鐘。。。。。'
            sleep 900
            echo '當前時間為:'$(date +"%Y-%m-%d %H:%M")
            
         else 
            echo '******************************************' 
            echo ${dayTime}${hourTime_1}'小時沒有跑過。。。。'
            echo '當前時間為:'$(date +"%Y-%m-%d %H:%M")
            Deal_data ${dayTime} ${hourTime_1}
    
            if [ ! -d ${resultList}/${dayTime} ];then
               mkdir ${resultList}/${dayTime}
            fi
            
            echo '得到結果資料,當前時間為:'$(date +"%Y-%m-%d %H:%M")
            python duduxueche.py ${dataList} ${dayTime} ${hourTime_1} ${resultList}
            echo ${dayTime}${hourTime_1}'小時全部結束,進入下一個小時的處理。。。。'
            
            #把70的資料結果放到231的HDFS上
            ./duduxueche.exp ${resultList} ${dayTime} ${hourTime_1}
         fi  
        # exit 2
    done
}

#按天跑
function Run_day()
{
    while true
    do
         # 獲取系統當前時間
         dayTime=$(date +"%Y%m%d" )
    
         # 獲取系統時間的前一天的時間
         dayTime_1=$(date -d "-1 day $dayTime" +"%Y%m%d")
         
         # 測試天資料是否已經產生
         hadoop fs -test -e ${mrDataListHdfs}/${dayTime_1}
         
         #8小時檢測一次,主要是解決跑程式所花費的時間不同問題
         if [ $? -eq 0 ] ;then 
            echo '******************************************'  
            echo ${dayTime_1}'天已經跑過。。。。' 
            echo '等待8個小時。。。。。'
            sleep 28800
            echo '當前時間為:'$(date +"%Y-%m-%d %H:%M")
            
         else 
            echo '******************************************' 
            echo ${dayTime_1}'天沒有跑過。。。。'
            echo '當前時間為:'$(date +"%Y-%m-%d %H:%M")
            Deal_data ${dayTime_1} $1
    
            #if [ ! -d ${resultList}/${dayTime} ];then
             #  mkdir ${resultList}/${dayTime}
            #fi
            
            echo '得到結果資料,當前時間為:'$(date +"%Y-%m-%d %H:%M")
            python duduxueche.py ${dataList} ${dayTime} ${hourTime_1} ${resultList}
            #echo ${dayTime}${hourTime_1}'小時全部結束,進入下一個小時的處理。。。。'
            
            # 把70的資料結果放到231的HDFS上
            #./duduxueche.exp ${resultList} ${dayTime} ${hourTime_1}
         fi  
        # exit 2
    done
}

if [ $1 == 'hour' ] ;then
   Run_hour
else
   Run_day $1
fi
Python程式碼如下:
# -*- coding:utf-8 -*-
from datetime import datetime
import pandas as pd
import sys
#*****************
##按口徑出的phone結果
# 口徑:1)關鍵詞且官網 2)關鍵詞且app 3)關鍵詞訪問大於1 4)官網訪問大於1
#*****************
def Main():
    dataList = sys.argv[1]+"/"+ sys.argv[2]+"/"+sys.argv[3]+".txt"
    # 對照表清單資料(加密和不加密手機號對應關係)
    mapRuletxt = sys.argv[1]+"/"+"mapRuletxt.txt"
    targetTxt = sys.argv[4]+"/"+ sys.argv[2]+"/"+sys.argv[3]+".txt"
    # 儲存最終解果
    phoneSet = set()
    # 儲存有關鍵詞行為的phone
    kwPhoneSet = set()
    # 儲存有官網行為的phone
    webPhoneSet = set()
    # app行為
    appPhoneSet = set()
    #儲存對照表清單資料
    dict={}
    uname = ['phone', 'time', 'name']
    # 找官網的資料
    def web(str):
        if str.find('app') == -1 and str.find('kw') == -1:
            return str
    print("開始。。。。。")
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    with open(mapRuletxt,'r') as fr:
        for line in fr:
            line = line.strip().split("\t")
            dict[line[1]] = line[0]
    with open(dataList,'r') as fr:
        for line in fr:
            line = line.strip().split("\t")
            if line[2].find('app') != -1: # app的
                appPhoneSet.add(line[0])
            elif line[2].find('kw') != -1: #關鍵詞搜尋的
                kwPhoneSet.add(line[0])
            else: # 官網訪問行為的
                webPhoneSet.add(line[0])
    # 有過關鍵詞搜尋行為且有過官網訪問行為的phone
    for kwphone in kwPhoneSet:
        if kwphone in webPhoneSet:
            phoneSet.add(kwphone)
    # 有過關鍵詞搜尋行為且有過app行為的phone
    for kwphone in kwPhoneSet:
        if kwphone in appPhoneSet:
            phoneSet.add(kwphone)
    # 關鍵詞訪問大於1的phone
    df = pd.read_table(dataList, sep="\t", header=None, names=uname, index_col=False)[['phone', 'name']]
    kw = df[df.name.str.contains('kw')].groupby('phone')['name'].agg([('uv',pd.Series.nunique)]).reset_index()
    for phone in  kw[kw.uv > 1]['phone']:
        phoneSet.add(phone)
    # 官網訪問大於1的phone
    web = df.applymap(web).dropna().groupby('phone')['name'].agg([('uv',pd.Series.nunique)]).reset_index()
    for phone in  web[web.uv > 1]['phone']:
        phoneSet.add(phone)
    with open(targetTxt,'w+') as fw:
        for phone in phoneSet:
            if phone in dict:
                fw.write(dict[phone]+"\n")
    print("結束。。。。。")
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
if __name__ == "__main__":
    Main()