1. 程式人生 > >大資料探勘更多時間都在於清洗資料

大資料探勘更多時間都在於清洗資料

一、資料清洗的那些事

構建業務模型,在確定特徵向量以後,都需要準備特徵資料線上下進行訓練、驗證和測試。同樣,部署釋出離線場景模型,也需要每天定時跑P加工模型特徵表。

而這一切要做的事,都離不開資料清洗,業內話來說,也就是ETL處理(抽取Extract、轉換Transform、載入Load),三大法寶。

很多初學者,對大資料的概念都是模糊不清的,大資料是什麼,能做什麼,學的時候,該按照什麼線路去學習,學完往哪方面發展,想深入瞭解,想學習的同學歡迎加入大資料學習企鵝群:458345782,有大量乾貨(零基礎以及進階的經典實戰)分享給大家,並且有清華大學畢業的資深大資料講師給大家免費授課,給大家分享目前國內最完整的大資料高階實戰實用學習流程體系。

來自於百度百科

大資料圈裡和圈外,很多朋友都整理過資料,我們這裡稱為清洗資料。

不管你是叱吒風雲的Excel大牛,還是玩轉SQL的資料庫的能人,甚至是專注HQL開發ETL工程師,以及用MapReduce\Scala語言處理複雜資料的程式猿。(也許你就是小白一個)

我想說的是,解決問題的技術有高低,但是解決問題的初衷只有一個——把雜亂的資料清洗乾淨,讓業務模型能夠輸入高質量的資料來源。

不過,既然做的是大資料探勘,面對的至少是G級別的資料量(包括使用者基本資料、行為資料、交易資料、資金流資料以及第三方資料等等)。那麼選擇正確的方式來清洗特徵資料就極為重要,除了讓你事半功倍,還至少能夠保證你在方案上是可行的。

二、大資料的必殺技

在大資料生態圈裡,有著很多開源的資料ETL工具,每一種都私下嚐嚐鮮也可以。但是對於一個公司內部來說,穩定性、安全性和成本都是必須考慮的。

就拿Spark Hive和Hive來說,同樣是在Yarn上來跑P,而且替換任務的執行引擎也很方便。

修改任務執行引擎

的確,Spark的大多數任務都會比MapReduce執行效率要快差不多1/3時間。但是,Spark對記憶體的消耗是很大的,在程式執行期間,每個節點的負載都很高,佇列資源消耗很多。因此,我每次提交Spark離線模型跑任務時,都必須設定下面的引數,防止佔用完叢集所有資源。

spark-submit --master yarn-cluster --driver-memory 5g --executor-memory 2g --num-executors 20

其中:

  • driver-memory是用於設定Driver程序的記憶體,一般不設定,或者1G。我這裡調整到5G是因為RDD的資料全部拉取到Driver上進行處理,那要確保Driver的記憶體足夠大,否則會出現OOM記憶體溢位。
  • executor-memory是用於設定每個Executor程序的記憶體。Executor記憶體的大小決定了Spark作業的效能。
  • num-executors是用於設定Spark作業總共要用多少個Executor程序來執行。這個引數如果不設定,預設啟動少量的Executor程序,會很大程度影響任務執行效率。

單獨的提交Spark任務,優化引數還可以解決大部分執行問題。但是完全替換每天跑P加工報表的執行引擎,從MapReduce到Spark,總會遇到不少意想不到的問題。對於一個大資料部門而言,另可效率有所延遲,但是資料穩定性是重中之重。

Spark執行Stage

所以,大部分資料處理,甚至是業務場景模型每天的資料清洗加工,都會優先考慮Hive基於MapRedcue的執行引擎,少部分會單獨使用編寫MapReduce、Spark程式來進行復雜處理。

三、實踐中的資料清洗

這節要介紹的內容其實很多,單獨對於Hive這方面,就包括執行計劃、常用寫法、內建函式、一些自定義函式,以及優化策略等等。

幸運的是,這方面資源在網上很全,這是一個值得欣慰的點,基本遇到的大多數問題都能夠搜到滿意答案。

因此,文章這個版塊主要順著這條主線來——(我在大資料探勘實踐中所做的模型特徵清洗),這樣對於大資料探勘的朋友們來說,更具有針對性。

3.1 知曉資料來源

這裡不擴充套件資料來源的抽取和行為資料的埋點

大資料平臺的資料來源集中來源於三個方面,按比重大小來排序:

60%來源於關係資料庫的同步遷移: 大多數公司都是採用MySQL和Oracle,就拿網際網路金融平臺來說,這些資料大部分是使用者基本資訊,交易資料以及資金資料。

30%來源於平臺埋點資料的採集:渠道有PC、Wap、安卓和IOS,通過客戶端產生請求,經過Netty伺服器處理,再進Kafka接受資料並解碼,最後到Spark Streaming劃分為離線和實時清洗。

10%來源於第三方資料:做網際網路金融都會整合第三方資料來源,大體有工商、快消、車房、電商交易、銀行、運營商等等,有些是通過正規渠道來購買(已脫敏),大部分資料來源於黑市(未脫敏)。這個市場魚龍混雜、臭氣熏天,很多真實資料被注入了汙水,在這基礎上建立的模型可信度往往很差。

得資料,得天下?

3.2 業務場景模型的背景

看過我以前文章集的朋友都知道一點,我致力於做大資料產品。

在之前開發資料產品的過程中,有一次規劃了一個頁面——使用者關係網路,底層是引用了一個組合模型。

簡單來說是對使用者群體細分,判斷使用者屬於那一類別的羊毛黨群體,再結合業務運營中的彈性因子去綜合評估使用者的風險。

截圖的原型Demo

大家看到這幅圖會有什麼想法?

簡單來說,原型展示的是分析兩個使用者之間在很多維度方面的關聯度

當時這個功能在後端開發過程中對於特徵資料的處理花了很多時間,有一部分是資料倉庫工具HQL所不能解決的,而且還需要考慮完整頁面(截圖只是其中一部分)查詢的響應時間,這就得預先標準化業務模型的輸出結果。

我可以簡單描述下需求場景:

  • 拿IP地址來說,在最近30天範圍內,使用者使用網際網路金融平臺,不管是PC端,還是無線端,每個使用者每個月都會產生很多IP資料集。
  • 對於擁有千萬級別使用者量的平臺,肯定會出現這樣的場景——很多使用者在最近一個月內都使用過相同的IP地址,而且數量有多有少。
  • 對某個使用者來說,他就好像是一個雪花中的焦點,他使用過的IP地址就像雪花一樣圍繞著他。而每個IP地址都曾被很多使用者使用過。

簡單來說,IP地址只是一個媒介,連線著不同使用者。——你中有我,我中有你。

雪花狀

有了上面的背景描述,那麼就需要每個讀者都去思考下這三個問題:

問題一、如何先通過某個使用者最近30天的IP列表去找到使用相同IP頻數最多的那一批使用者列表呢?

問題二、如何結合關係網路的每個維度(IP、裝置指紋、身份證、銀行卡和加密隱私等等),去挖掘與該使用者關聯度最高的那一批使用者列表?

問題三、如何對接產品標準化模型輸出,讓頁面查詢的效應時間變得更快些?

思考就像吃大理核桃般,總是那麼耐人尋味。

3.3 學會用Hive解決70%的資料清洗

對於70%的資料清洗都可以使用Hive來完美解決,而且網路參考資料也很全,所以大多數場景我都推薦用Hive來清洗。——高效、穩定

不過在使用過程中,我有兩點建議送給大家:

第一點建議:要學會顧全大局,不要急於求成,學會把複雜的查詢拆開寫,多考慮叢集整個資源總量和併發任務數。

第二點建議:心要細,線上下做好充足的測試,確保安全性、邏輯正確和執行效率才能上線。

禮物也送了,繼續介紹

對於上述的使用者關係網路場景,這裡舉IP維度來實踐下,如何利用Hive進行資料清洗。

下面是使用者行為日誌表的使用者、IP地址和時間資料結構。

使用者、IP和時間

回到上面的第一個思考,如何先通過某個使用者最近30天的IP列表去找到使用相同IP頻數最多的那一批使用者列表呢?

我當時採取了兩個步驟。

步驟一:清洗最近30天所有IP對應的使用者列表,並去重使用者

select ip,concat_ws('_',collect_set(cast(mid as string)))from tmp.fraud_sheep_behavdetail_unionwhere ip is not null and systime='2016-12-06'group by ip

這裡解釋三個內建函式concat_ws、collect_set和cast,先更瞭解必須去親自實踐:

  • concat_ws,它是用來分隔符字串連線函式。
  • collect_set,它是用來將一列多行轉換成一行多列,並去重使用者。
  • cast,它是用來轉換欄位資料型別。

果然很方便吧,下面是第一個步驟的執行結果。

IP馬賽克

步驟二:清洗使用者在IP媒介下,所有關聯的使用者集列表

select s1.mid,concat_ws('_',collect_set(s2.midset)) as ip_midsetfrom (select ip,mid from tmp.fraud_sheep_behavdetail_union where systime>='2016-11-06' group by ip,mid) s1join (      select ip,concat_ws('_',collect_set(cast(mid as string))) as midset      from tmp.fraud_sheep_behavdetail_union      where ip is not null and systime>='2016-11-06'      group by ip) s2 on (s1.ip=s2.ip)group by s1.mid

最終對於IP媒介清洗的資料效果如下所示:

1816945284629847    1816945284629847_3820150008135667_1850212776606754_3820150012550757_3820150006640108_1823227153612976_3820150001918669_18169452846298471816945284629848    1816945284629848_3820150002527117_100433_3820150009829678_100433_100433_3820150002811537_3820150008901840_3820150012766737_100433_3800000242066917_100433

同理對於其他維度的媒介方法一樣,到這一步,算是完成Hive階段的初步清洗,是不是很高效。

會員ID    性別   加密隱私   身份證號    銀行卡號    IP地址     裝置指紋18231292   男    18231293:男   18232394:男    382015495:男_18232272:男    38201500:女_38201509:女_382937:女    3820152901:男_38204902:男_3820486:男_38201326:女

但是對於分析使用者細分來說,還需要藉助MapReduce,或者Scala來深層次處理特徵資料。

3.4 使用Scala來清洗特殊的資料

對於使用Spark框架來清洗資料,我一般都是處於下面兩個原因:

  • 常規的HQL解決不了
  • 用簡潔的程式碼高效計算,也就是考慮開發成本和執行效率

對於部署本機的大資料探勘環境,可以檢視這兩篇文章來實踐動手下:

工欲善其事,必先利其器。有了這麼好的利器,處理複雜的特徵資料,那都是手到擒來。

藉助於Hive清洗處理後的源資料,我們繼續回到第二個思考——如何結合關係網路的每個維度,去初步挖掘與該使用者關聯度最高的那一批使用者列表?

看到這個問題,又產生了這幾個思考:

  • 目前有五個維度,以後可能還會更多,純手工顯然不可能,再使用Hive好像也比較困難。
  • 每個維度的關聯使用者量也不少,所以基本每個使用者每行資料的處理採用單機序列的程式去處理顯然很緩慢。不過每行的處理是獨立性的。
  • 同一個關聯使用者會在同一個維度,以及每一個維度出現多次,還需要進行累計。

如果才剛剛處理大資料探勘,遇到這樣的問題的確很費神,就連你們常用的Python和R估計也難拯救你們。但是如果實戰比較多,這樣的獨立任務,完全可以併發到每臺計算節點上去每行單獨處理,而我們只需要在處理每行時,單獨呼叫清洗方法即可。

這裡我優先推薦使用Spark來清洗處理(後面給一個MapReduce的邏輯),整個核心過程主要有三個板塊

預處理,對所有關聯使用者去重,並統計每個關聯使用者在每個維度的累計次數

//迴圈每個維度下的關聯使用者集for(j <- 0 until value.length){    //用列表存放所有關聯使用者集    if(value.apply(j).split(SEPARATOR4).size==2 && value.apply(j).split(SEPARATOR4).apply(0)!=mid){       midList.append(value.apply(j))     }     if(setMap.contains(value.apply(j))){      //對每個維度關聯使用者的重複次數彙總        val values = setMap.get(value.apply(j)).get        setMap=setMap.+((value.apply(j),1+values))         }else{        setMap=setMap.+((value.apply(j),1))      }}

評分,迴圈上述關聯使用者集,給關聯度打一個分

for(ii <- 0 until distinctMidList.size){    var reationValue = 0.0    //分佈取每個關聯使用者    val relation = distinctMidList.apply(ii)    //關聯使用者的會員ID    val mid = relation.split(SEPARATOR4).apply(0)    //關聯使用者的性別    val relationSex = relation.split(SEPARATOR4).apply(1)    val featureStr = new StringBuilder()    //迴圈每個關聯維度去給關聯使用者打分    for(jj <- 1 to FeatureNum.toInt){       var featureValue = 0.0       //獲取該關聯使用者在每個維度下重複次數       val resultMap = midMap.get(jj).get.get(relation).getOrElse(0)       if(jj==1){          //加密隱私,確定權重為10          featureValue=resultMap*10       }else if(jj==2 || jj==3){

標準化清洗處理,使用者關聯用json串拼接

3820150000934593 | 1    | [{"f1":"0","f2":"0","f3":"0","f4":"15","f5":"60","s":"1","r":"75","m":"3820150000316460"},{"f1":"0","f2":"0","f3":"0","f4":"30","f5":"30","s":"1","r":"60","m":"1816945313571344"},{"f1":"0","f2":"0","f3":"0","f4":"45","f5":"90","s":"0","r":"135","m":"3820150000655195"}]

得到上面清洗結果,我們才能更好的作為模型的源資料輸出,感覺是不是很費神,所以才印證了這句話——做Data Mining,其實大部分時間都花在清洗資料

3.5 附加分:使用MapReduce來清洗特殊的資料

針對上述的資料清洗,同樣可以MapReduce來單獨處理。只是開發效率和執行效率有所影響。

當然也不排除適用於MapReduce處理的複雜資料場景。

對於在本地Windows環境寫MapRecue程式碼,可以借鑑上述文章中部署的資料探勘環境,修改下Maven工程的pom.xml檔案就可以了。

<dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-mapreduce-client-core</artifactId>    <version>2.7.2</version></dependency>        <dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-client</artifactId>    <version>2.6.0</version></dependency>

而我在以往做大資料探勘的過程裡,也有不少場景需要藉助MR來處理,比如很早的一篇文章《一種新思想去解決大矩陣相乘》,甚至是大家比較常見的資料傾斜——特別是處理平臺行為日誌資料,特別容易遇到資料傾斜。

這裡提供一個上述Spark清洗資料的MR程式碼邏輯,大家可以對比看看與Spark程式碼邏輯的差異性。

Map階段

public static class dealMap           extends Mapper<Object,Text, Text,Text>{   @Override   protected void setup(Context context)           throws IOException,InterruptedException{      /**       * 初始化Map階段的全域性變數,目前使用不上       */    }                    public void map(Object key,Text value,Context context)           throws IOException,InterruptedException{        //類似Spark,每一行讀取檔案,按分隔符劃分        String[] records = value.toString().split("\u0009");        StringBuffer k = new StringBuffer();        //這裡Key包含Mid和Sex        String keys = k.append(records[0]).append("\u0009")              .append(records[1]).toString();        //接下來對剩餘維度資料進行迴圈        for(int i=2;i<records.length;i++){             //解決兩個問題,和Spark類似             //確定與該使用者關聯的使用者列表             //確定關聯使用者在每一個維度的累計頻數         }         for(int j=2;j<records.length;j++){              //迴圈計算使用者關聯得分,和Spark類似          }          /**           * 設定使用者Mid和sex作為Map階段傳輸的Key,使用者關聯維度使用者集作為value傳輸到reduce階段            */     context.write(new Text(keys.toString()), new Text(value.toString()));        }}

Reduce階段(這裡用不上)

public static class dealReduce               extends Reducer<Text,Text,Text,Text> {   public void reduce(Text key, Iterable<Text> values,Context context)         throws IOException, InterruptedException{        /**         * 一般都會用Reduce階段,但是這裡用不上         */        for (Text val : values) {                             }    }}

Drive階段

public static Boolean run(String input,String ouput)  throws IOException, ClassNotFoundException, InterruptedException{    Configuration conf = new Configuration();    Job job = Job.getInstance(conf, "");    job.setJarByClass();    job.setMapperClass();    job.setReducerClass();    job.setNumReduceTasks(10);    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(Text.class);    Path output = new Path(ouput);    FileInputFormat.setInputPaths(job,input);    FileOutputFormat.setOutputPath(job, output);    output.getFileSystem(conf).delete(output,true);    Boolean result=job.waitForCompletion(true);    return result;}

上面這三個階段就是MR任務常規的流程,處理上述問題的思路其實和Spark邏輯差不多。只是這套框架性程式碼量太多,有很多重複性,每寫一個MR任務的工作量也會比較大,執行效率我並沒有去測試作比較。

如果Spark跑線上任務模型會出現不穩定的話,我想以後我還是會遷移到MapReduce上去跑離線模型。

總結

說到這裡,整篇文章概括起來有三點:

  • 講述了資料清洗在業務場景建模過程中的重要性和流程操作。
  • 介紹了兩款主流計算框架的適用場景和差異性。
  • 更列舉了不同資料處理工具在每個業務場景下的優勢和不同。

但是,還是那麼一句話——使用什麼技術不在乎,我更迷戀業務場景驅動下的技術挑戰。

與你溝通最關鍵的,也許會是直屬領導,也許會是業務運營人員,甚至是完全不懂技術的客戶。他們最關心的是你在業務層面上的技術方案能否解決業務痛點問題。

 

 很多初學者,對大資料的概念都是模糊不清的,大資料是什麼,能做什麼,學的時候,該按照什麼線路去學習,學完往哪方面發展,想深入瞭解,想學習的同學歡迎加入大資料學習企鵝群:458345782,有大量乾貨(零基礎以及進階的經典實戰)分享給大家,並且有清華大學畢業的資深大資料講師給大家免費授課,給大家分享目前國內最完整的大資料高階實戰實用學習流程體系。