1. 程式人生 > >用shell控制hql執行,如何控制多個階段之間序列,階段內部並行

用shell控制hql執行,如何控制多個階段之間序列,階段內部並行

上週組裡同學給了一個數據任務:

1.在hive上傳汽車詞包

2.根據汽車詞包圈出指定時間段內的cookie

3.根據cookie找出這些使用者的所有搜尋記錄

4.從所有搜尋資料中找到含有明星的搜尋記錄

5.根據每個明星group by,計數

我的解決辦法如下:

1.第一第二第三階段我寫了一個sql語句



2.第四階段我用了python處理,因為我無法寫成

select query

from sousuo

where query like

(

select "%star%"

from star

)

所以我就用了python逐行處理

python程式碼如下


3.將python的結果上傳hive,然後上傳明星詞表,然後group by,sql程式碼如下


這個程式剛寫完的時候還挺開心,因為我用了hivevar來封裝了引數,可以完全自動化執行。

但是週一同學使用的時候反映半年的詞包提不上去,只能提15天的。然後我就意識到sql對映的mapper個數太多,導致任務提不上去。

首先sql語句不適合將多個步驟融合在一起,會導致sql對映的mapper檔案太多,導致任務提不上去。

所以每步都分開。

第一階段:

上傳詞包.sql

第二階段:

圈pc端cookie.sql

圈wise端cookie.sql

第三階段:

圈pc端的搜尋資料

圈wise端的搜尋資料

第四階段:

用python對搜尋資料做明星詞的過濾。

第五階段:

將過濾出來的結果上傳hive,用group by計數

main函式:

main函式為了保證序列執行,所有的sql語句都不能後臺啟動,如果後臺啟動,下一階段將不會等上一階段的結果。

但是第二、第三階段中的pc端和wise端應該並行做。

解決方法是

對第二階段的兩個sql都用後臺啟動:

nohup sh qe.sh stage2_pc &

nohup sh qe.sh stage2_wise &

但是要在原來qe.sh指令碼的後面,寫上echo "" > stage2_pc.done,這樣sql執行成功後,就會生成相應的done檔案。

啟動成功後,不能立即啟動第三階段,也不能通過if [[ $? == 0 ]]來檢測第二階段是否執行成功。

而要寫一個while迴圈不停檢測stage2_pc.done和stage2_wise.done是否生成。

如果生成才能繼續執行第三階段,如果沒有生成就一直在此檢測。

第三階段的並行啟動同樣這樣處理。

當然上面的這種處理辦法,只是將頻道分開,並未將一個大的時間段對映成多個小的時間段。

部門的產品線mcdt系統的後臺正是這樣處理的,如果sql包含的時間範圍太大,頻道太多,都會造成sql生成的mapper個數太多,所以對頻道分開,對時間段也會分開,拆成一個一個的,然後並行啟動。

因為涉及到多個階段,然後每個階段中又有多個sql,所以會為每個階段建立一個資料夾,檢測這個資料夾下所有sql是否成功執行的方法就是,就是檢測done檔案個數和sql檔案個數是否相等,如果相等就執行下一個stage,如果不相等,就一直檢測,這樣就能嚴格保證每個階段內的sql並行執行,多個階段之間序列執行,下面給出了mcdt後臺控制流程的shell中的一段程式碼:


借鑑於這種解決方案,這個同學的任務也應該將時間段分開,頻道分開,對映成多個sql檔案,並且將這些sql檔案放在一個資料夾下,一起後臺執行,因為sql檔案太多,所以在最後的while迴圈中檢測目錄下done檔案個數和sql檔案個數是否相等比較方便。

之前看後臺的這個程式碼感覺沒有什麼稀奇,現在真的感覺原作者的良苦用心。

遺留問題:

1.起止時間是在配置檔案中寫的,把時間片切開,比如30天一個sql檔案,這麼多的sql檔案都是自動生成,考慮用python動態組裝sql語句。

2.成功生成done檔案,error檔案如何生成的。

重啟三次指令碼都不能成功,就生成error檔案,另外注意done檔案在sql執行成功之後才能生成,所以呼叫sql不能後臺啟動。


3.如果資料量太大,用python逐行處理可能比較耗時,這裡完全可以考慮用原生的mapred來寫,因為源資料不能通過hive建表,所以通過hadoop寫或者spark寫都可以,對容器過濾,可以用到spark中的filter()函式。

為啥我會意識到這裡可以用spark?

我們可以簡單考慮搜尋資料可以人為分成多個部分,然後在每個部分同時執行這個pyhton程式碼,多個部分之間可以完全不影響的並行執行,這就是mapred最合適的應用場景。