1. 程式人生 > >大資料開發實戰系列之電信客服(1)

大資料開發實戰系列之電信客服(1)

大資料實戰開發系列,以實戰為主,輔以一些基礎知識,關於電信客服,在網上也有很多的資料,這裡我自然會去參考網上的資料,程式的整體設計是在今天開始的,老夫儘量在本週末錢結束這個電信客服的程式編寫。因為我也是一個學習者,所以在程式編寫過程中難免會存在問題,有問題還請大家指出,有則改之,無則加勉。大家共同進步。本教程適合接觸大資料開發不久或者還沒接觸大資料開發,或者小萌新。老鳥就多提意見吧,我改。

部落格原文地址:大資料開發實戰系列之電信客服(1)

專案背景

關於專案背景,我就照搬網上的了。通訊運營商每時每刻會產生大量的通訊資料,例如通話記錄,簡訊記錄,彩信記錄,第三方服務資費等等繁多資訊。資料量如此巨大,除了要滿足使用者的實時查詢和展示之外,還需要定時定期的對已有資料進行離線的分析處理。例如,當日話單,月度話單,季度話單,年度話單,通話詳情,通話記錄等等+。我們以此為背景,尋找一個切入點,學習其中的方法論。當前我們的需求是:統計每天、每月以及每年的每個人的通話次數及時長。

專案架構

關於這個專案架構,網上的是用的 MapReduce , 老夫最近在看 Flink , 因此,我們把這個計算引擎換成 Flink 引擎。先看一下原來的系統架構:

再來看看我這邊修改後的:

那我們現在就根據這個流程一步一步來走。

專案實現

專案平臺搭建

關於大資料的一個平臺搭建,我在這裡就不做介紹,目前老夫的所有的部落格涉及到大資料平臺搭建的還是比較少的,我這裡貼幾個我寫過的涉及到平臺搭建的幾篇部落格,是老夫之前寫過的:

  • 大資料框架開發基礎之Hadoop(2) 從零開始搭建叢集
  • 大資料框架開發基礎之Kafka入門
  • 大資料框架開發基礎之Zookeeper入門
  • 大資料儲存框架之HBase(1) 概述

忘記說了,這裡老夫使用的是 Flume 裡面的 TailDirSource , 但是在我們設計的時候,使用的是 MemoryChannel , 這個對我們專案本身的測試而言,影響不大。關於 Flume裡面的幾個 sink , source ,後期我會繼續向大家做一個具體的介紹。這裡就先留一個疑問,就是TailDirSource和 MemoryChannel 怎麼做選擇?

資料生產

這裡我們的資料格式,需要確定下:
| 列名 | 解釋 | 舉例 |
| ------------ | ---------------------------- | -------------- |
| call1 | 第一個手機號碼 | 15369468720 |
| call1_name | 第一個手機號碼人姓名(非必須) | 李雁 |
| call2 | 第二個手機號碼 | 19920860202 |
| call2_name | 第二個手機號碼人姓名(非必須) | 衛藝 |
| date_time | 建立通話的時間 | 20171017081520 |
| date_time_ts | 建立通話的時間(時間戳形式) | |
| duration | 通話持續時間(秒) | 0600 |

這是我們的資料格式。
怎麼去設計我們的這個資料生產?下面是一個思路,

a).建立Java集合類存放模擬的電話號碼和聯絡人

b).隨機選取兩個手機號碼當作“主叫”與“被叫”(注意判斷兩個手機號不能重複),產出call1與call2欄位資料

c).建立隨機生成通話建立時間的方法,可指定隨機範圍,最後生成通話建立時間,產出date_time欄位資料;

d).隨機一個通話時長,單位:秒,產出duration欄位資料;

e).將產出的一條資料拼接封裝到一個字串中;

f).使用IO操作將產出的一條通話資料寫入到本地檔案中.

這裡,我就不貼程式了,具體的程式可以參考電信客服資料生產

資料採集

資料採集模組,我們這裡採用的是 linux -> flume -> kafka -> flink
資料在我們的本地 Linux系統中生產,由Flume將資料送到 Kafka , Flink再從我們的Kafka中拿取資料。
這裡,安裝配置就不多說了,我們啟動Kafka前,先啟動zookeeper,kafka正常啟動後就不用管它了,他只是一個消費通道。現在資料採集的核心是怎麼去採集,我們採用的是 Flume , Flume這裡也只是需要配置一下,就可以完成我們採集任務:

a1.sources=r1
a1.channels=c1

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/datas/CI123/cidata/flink-telecom-customer-service-posiotion.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/datas/CI123/cidata/flink-telecom-customer-service.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = flink-telecom-customer-service
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

我不知道這裡的配置要不要講,簡單的講一下。首先,我們這裡只有一個 source和一個channel , 如果有多個就寫多個。在我們的source中,他的型別是 TAILDIR ,這個source源有個特點就是支援斷點續傳,這個斷電續傳的實現是因為它在做傳輸的時候,會有一個記錄檔案,叫做 *.json的位置檔案,這個檔案是你自己配置的,建議你自己配置好,否則預設檔案不知道在哪個旮旯裡面。這個 json檔案裡面記錄的就是 flume採集失敗的時候最近的那次採集的位置,下次恢復後,再從那個地方採集。接下來就是配置檔案位置了,如果我們吧配置了多個源,這裡就會有多個檔案,最後配置當前的 source 使用的是那個 channel 進行傳輸。
在 channel 裡面,我們使用的是 KafkaChannel ,這個 channel 可以直接把資料送入 kafka 的 topic 裡面。所以呢,我們需要配置好 kafka地址,topic 名以及我們的消費者組。這些概念可以參考我上面提供的 Kafka 的入門博文。

資料清洗(ELK)

關於資料的清洗,目前程式還在編寫中,暫時留在下一集我們在聊吧。有喜歡的朋友可以關注我,以後也會不定期的去更新一些內容,也包括自己的一些感受,踩得坑啥的