如何使用 AWS 建立易於擴充套件的 Data PipeLine
本文是之前的補充升級版本.
本文中將以我們實際線上的一個任務來說明應該如何使用 Batch
我們的場景類似上面的金融服務交易後分析:
1. 一個隨時更新的 xml 檔案。
- 需要每天定時檢查更新,發現 xml 更新後需要更新對應的檔案到相關的 S3 中。
- 更新完相關的檔案之後,需要啟動另一分析程式,分析更新後的 xml 檔案,將相關的內容上傳到另外的雲服務中。
所以目前的結構是這樣的:
+-----------++------++-----++---++------++-----+ |cloud watch+--> |lambda+-->+batch| +->+SNS| +> |lambda| +->+batch| +-----------++------++-----++---++------++-----+
cloud watch 用來做定時任務,定時通過 Lambda 觸發相關的 Batch, batch 發現 xml 更新之後會發送通知到 SNS, 由 SNS 來觸發 lambda, 然後由 lambda 啟用 Batch 。
設定作業定義
在 Batch 任務執行之前, 需要將任務的執行條件進行預定義, 等到任務具體啟動時 batch 會讀取預定義的作業任務, 然後執行.
建立
- 作業定義名稱 名字可以任意起名
- 作業角色 指定的可以執行 batch 任務的 IAM 角色
- 容器映像 這裡填寫 docker image 的地址
- 在環境中根據需求填寫 vcpu 和 記憶體
- 在環境變數中新增相關的執行時的環境變數即可
修訂
- 修訂指的是在已有的任務定義中進行修改和補充,修改之後,版本會 +1,同時,之前的版本還會保留
建立作業佇列
TODO//
通過 Lambda 實現自動化定時啟動
定時啟動原理
CloudWatch 中可以設定定時啟動, 在 Lambda 中, 直接以 CloudWatch 作為觸發, 即可實現定時啟動任務.
相關程式碼
Lambda 相關 Python 程式碼示例如下:
from __future__ import print_function import json import boto3 print('Loading function') batch = boto3.client('batch') def lambda_handler(event, context): # Log the received event print("Received event:" + json.dumps(event, indent=2)) # Get parameters for the SubmitJob call # http://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html jobName = event['jobName'] jobQueue = event['jobQueue'] jobDefinition = event['jobDefinition'] # containerOverrides and parameters are optional if event.get('containerOverrides'): containerOverrides = event['containerOverrides'] else: containerOverrides = {} if event.get('parameters'): parameters = event['parameters'] else: parameters = {} try: # Submit a Batch Job response = batch.submit_job(jobQueue=jobQueue, jobName=jobName, jobDefinition=jobDefinition, containerOverrides=containerOverrides, parameters=parameters) # Log response from AWS Batch print("Response:" + json.dumps(response, indent=2)) # Return the jobId jobId = response['jobId'] return { 'jobId': jobId } except Exception as e: print(e) message = 'Error submitting Batch Job' print(message) raise Exception(message)
使用 SNS 和 Lambda 將 Batch 串聯起來
目前, 我們的相關結構是這樣的:
+-----------++------++-----++---++------++-----+ |cloud watch+--> |lambda+-->+batch| +->+SNS| +> |lambda| +->+batch| +-----------++------++-----++---++------++-----+
在 Batch 執行完畢之後, 會發送一個 Notification 到 SNS 中, 然後觸發 lambda 之後啟動後續的 Batch 任務 .
SNS 的訊息格式
{ "Records": [ { "EventVersion": "1.0", "EventSubscriptionArn": "arn:awe-4689-b71c-43657a0ce152", "EventSource": "aws:sns", "Sns": { "SignatureVersion": "1", "Timestamp": "2018-01-20T04:06:35.010Z", "Signature": "i/FC0Z3XT5M4jhru0wP65dD4vaCDYwczszj8V+aCCJX7SCbY1A+X7FjdrSWDeEFMaCqQhg4Wq/ch204kMHg47k4NCQ00cLJmBNc9XnrLiQMuAv1pcSdYu3uWikTlJ8E95K7h6Y/kq2/Tq1f+ELu6r5jEMV3/pKxSaRrdXmTZOZzjQDJKTT1fGNWIgRFOA/Ey+gcaZ8Fg==", "SigningCertUrl": "https://sn1.pem", "MessageId": "28dcb39b-12da-551856c", "Message": "{\"type\":\"lovelive\",\"jobXmlTag\":\"job\",\"gzipped\":true,\"emptyCDATA\":false}}", "MessageAttributes": {}, "Type": "Notification", "UnsubscribeUrl": "https://sns.us-eace152", "TopicArn": "arn:aws:sns:us-east-1:162141517946:start_feed_consumer", "Subject": null } } ] }
Lambda 的相關處理程式碼
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import json import boto3 print('Loading function') batch = boto3.client('batch') def lambda_handler(event, context): # Log the received event. print("Received event:" + json.dumps(event, indent=2)) download_message = event['Records'][0]['Sns']['Message'] # Get parameters for the SubmitJob call # http://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html jobName = "lambda-feed-consumer" jobQueue = "feeds" jobDefinition = "FeedConsumer" containerOverrides = { 'environment': [ { 'name': 'DOWNLOAD_MESSAGE', 'value': download_message }, { 'name': 'PARALLELISM', 'value': 15 } ], } try: # Submit a Batch Job response = batch.submit_job(jobQueue=jobQueue, jobName=jobName, jobDefinition=jobDefinition, containerOverrides=containerOverrides) # Log response from AWS Batch print("Response:" + json.dumps(response, indent=2)) # Return the jobId jobId = response['jobId'] return { 'jobId': jobId } except Exception as e: print(e) message = 'Error submitting Batch Job' print(message) raise Exception(message)
這裡的程式碼看註釋很容易理解.
Batch 的一些優勢和缺點
優勢
Batch 結合 Lambda 可以實現按需啟動伺服器, 極大地節省了伺服器的使用費用.