在CDH5.14.4 中安裝StreamSets與案例執行
阿新 • • 發佈:2018-11-24
在CDH5.14.4 中安裝StreamSets與案例執行
標籤(空格分隔): 大資料平臺構建
- 一: Streamset 簡介與系統環境介紹
- 二: 安裝軟體準備
- 三: 在CDH5.14.4 整合使用StreamSets
- 四: streamsets 基本使用案例執行
一: Streamset 簡介與系統環境介紹
1.1: StreamSet 簡介
StreamSets由Informatica前首席產品官Girish Pancha和Cloudera前開發團隊負責人Arvind Prabhakar於2014年創立。他們成立該公司主要是應對來自動態資料(data in motion)的挑戰 - 包括資料來源,資料處理和資料本身,這是一個稱為“資料漂移“(https://streamsets.com/reports/data-drift/)的問題。StreamSets設想從頭開始管理資料流,避免已有產品和工具的缺陷,並啟用一種管理動態資料(data in motion)的新方法。 最新的產品StreamSets Dataflow Performance Manager,也叫DPM,主要用於構建端到端的資料流。DPM是一個執行控制中心,可以讓你對映(資料流),內建的測量和監測確保持續的資料傳輸和控制動態資料(data in motion)的效能。首先,它將你不同的資料流對映到支援你的每個關鍵業務流程的拓撲中。然後監測這些拓撲的日常執行情況,根據掌握的效能情況,以滿足應用的SLA為目標,確保你始終提供及時和可信的資料。 StreamSet的架構處理資料流程
二:安裝軟體準備
2.1:下載StreamSets
下載:StreamSet
https://archives.streamsets.com/index.html
下載檔案:
STREAMSETS-3.0.0.0.jar
STREAMSETS_DATACOLLECTOR-3.0.0.0-el7.parcel
manifest.json
2.2: 配置httpd-server
yum install -y httpd* service httpd start chkconfig httpd on mkdir -p /var/×××w/html/streamsets cp -p STREAMSETS-3.0.0.jar /opt/cloudera/csd/ chown cloudera-scm:cloudrea-scm -R /opt/cloudera/csd/ mv manifest.json /var/×××w/html/streamset/ mv STREAMSETS_DATACOLLECTOR-3.3.0-el7.parcel /var/×××w/html/streamset/ 從啟cdh的CM 伺服器 service cloudera-scm-server restart
三:在CDH5.14.4 整合使用StreamSets
3.1 分發,配置,啟用StreamSets
3.2 安裝 StreamSets
重啟cloudera-scm-server
cd /etc/init.d/
./cloudera-scm-server restart
四: streamsets 基本使用
4.1 登入到streamsets
預設使用者名稱:admin
密碼:admin
4.2 下載官網測試資料
準備工作: 從官網下載測試資料 https://×××w.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
建立測試目錄並賦予許可權:
mkdir -p /flyfish/test_stream
mkdir /flyfish/test_stream/data
mkdir /flyfish/test_stream/error
mkdir /flyfish/test_stream/out
chmod -R 777 /flyfish/test_stream
將測試資料拷貝到 /flyfish/test_stream/data 目錄下
cp -p nyc_taxi_data.csv /flyfish/test_stream/data
4.3.建立第一個Pipelines
點選dataFormat 標籤,修改選擇如下選擇
預覽檔案
新增流選擇器
${record:value('/payment_type') == 'CRD'}
指令碼放在 Jython >configuration>Jython>Script 中
try:
for record in records:
cc = record.value['credit_card']
if cc == '':
error.write(record, "Payment type was CRD, but credit card was null")
continue
cc_type = ''
if cc.startswith('4'):
cc_type = 'Visa'
elif cc.startswith(('51','52','53','54','55')):
cc_type = 'MasterCard'
elif cc.startswith(('34','37')):
cc_type = 'AMEX'
elif cc.startswith(('300','301','302','303','304','305','36','38')):
cc_type = 'Diners Club'
elif cc.startswith(('6011','65')):
cc_type = 'Discover'
elif cc.startswith(('2131','1800','35')):
cc_type = 'JCB'
else:
cc_type = 'Other'
record.value['credit_card_type'] = cc_type
output.write(record)
except Exception as e:
error.write(record, e.message)
使用Field Masker來遮蔽信用卡的資訊
配置寫入目的地
流程預覽測試
新增Expression Evaluator處理器
編輯流執行與輸出