pipeline: a distributed task scheduler

Goal: a Docker-based distributed task scheduler, more powerful than Quartz or xxl-job.

Package the task to run as a Docker image (or pick an existing one), add a custom script if needed, and let the pipeline framework schedule it.

Source: https://github.com/jadepeng/docker-pipeline

Architecture

  • pipeline master: the central node; manages and schedules tasks
  • pipeline agent: a worker node; when it receives a task, it runs the pipeline task via Docker

Features && TODO List

  • [x] Distributed framework: high availability, service registration, and state maintenance
  • [x] Agents execute tasks
  • [x] Rolling log API
  • [x] Run legacy pipeline tasks
  • [x] Scheduled tasks (fixed rate and cron expressions)
  • [ ] Quick task creation: run python, node, and other scripts directly
    • [x] Base images for python, java, etc.
    • [x] Quick Docker-image task API
    • [ ] Quick script-task creation
  • [ ] Schedule tasks by resource quota (memory, CPU); a task must declare its quota
  • [ ] Agent labels, so tasks can be pinned to agents with a given label, e.g. gpu=true
  • [ ] Web UI for task management: submitting tasks, viewing run logs, etc.
    • [x] Reuse the Tencent bk-job web pages
    • [ ] Adapt the bk-job frontend to pipeline

Progress

2021.07.31

  • Scheduled tasks (fixed rate and cron expressions)
  • Distributed MongoDB lock: with multiple masters, only one master schedules tasks at a time

2021.07.28

  • Added the ability to run legacy pipeline tasks
  • Added the log API

2021.07.27

  • Imported the bk-job UI; adaptation pending

2021.07.21

  • Master dispatches tasks to agents
  • Agent starts Docker to run tasks

2021.07.19

  • Scaffolded the project with jhipster
  • Distributed implementation

Data structures

A pipeline job:

  • supports multiple PipelineTasks
  • each PipelineTask contains multiple Steps
@Data
public class Pipeline {
    @Id
    private String id;
    private String name;

    @JSONField(name = "pipeline")
    private List<PipelineTask> pipelineTasks = new ArrayList<>();
    private List<Network> networks = Lists.newArrayList(new Network());
    private List<Volume> volumes = Lists.newArrayList(new Volume());
    private String startNode;

    /**
     * Schedule type:
     * 1) CRON: set cronExpression
     * 2) FIX_RATE: set fixRateInSeconds
     */
    private ScheduleType scheduleType = ScheduleType.NONE;

    /**
     * Cron expression; effective when scheduleType = CRON
     */
    private String cronExpression;

    /**
     * Fixed-rate interval, e.g. run every N seconds; effective when scheduleType = FIX_RATE
     */
    private int fixRateInSeconds;

    /**
     * Whether to schedule this pipeline; it is only scheduled when true
     */
    @Indexed
    private boolean enableTrigger;

    private long lastTriggerTime;

    @Indexed
    private long nextTriggerTime;

    /**
     * Execution timeout
     */
    private int executorTimeout;

    /**
     * Retry count on failure
     */
    private int executorFailRetryCount;

    /**
     * Memory limit
     */
    private String memory;

    /**
     * CPU limit
     */
    private String cpu;

    @Data
    @Builder
    public static class PipelineTask {
        /**
         * Name
         */
        String name;

        /**
         * Alias
         */
        String alias;

        /**
         * Dependent pipelines; this PipelineTask runs only after all of them have finished
         */
        List<String> dependencies;

        /**
         * Task steps, executed in order
         */
        List<Step> steps;
    }

    @Data
    public static class Network {
        String name = "pipeline_default";
        String driver = "bridge";
    }

    @Data
    public static class Volume {
        String name = "pipeline_default";
        String driver = "local";
    }

    @Data
    public static class StepNetwork {
        private String name;
        private List<String> aliases = Lists.newArrayList("default");

        public StepNetwork(String name) {
            this.name = name;
        }
    }
}
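
The dependencies field makes the PipelineTasks of a job a DAG: a task runs only once everything it depends on has finished. The implied ordering can be sketched as follows (a hypothetical three-task pipeline; the real scheduling logic lives in pipeline-master):

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key is a PipelineTask name, each value
# its "dependencies" list, mirroring the data structure above.
tasks = {
    "extract": [],
    "convert": ["extract"],
    "report": ["extract", "convert"],
}

# static_order() yields one valid execution order for the DAG.
order = list(TopologicalSorter(tasks).static_order())
print(order)
```

A cyclic dependencies graph raises graphlib.CycleError, which is the failure mode a scheduler must reject at submission time.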

Example:

{
  "_id" : "29103d5e4a77409b9f6050eea8110bb3",
  "name" : "docker image pipeline",
  "pipelineTasks" : [
    {
      "name" : "docker image pipeline",
      "steps" : [
        {
          "name" : "defaultJob",
          "image" : "java-pipeline:1.0.1",
          "workingDir" : "/workspace",
          "environment" : {},
          "networks" : [
            {
              "name" : "pipeline_default",
              "aliases" : [
                "default"
              ]
            }
          ],
          "onSuccess" : false,
          "authConfig" : {}
        }
      ]
    }
  ],
  "networks" : [
    {
      "name" : "pipeline_default",
      "driver" : "bridge"
    }
  ],
  "volumes" : [
    {
      "name" : "pipeline_default",
      "driver" : "local"
    }
  ],
  "cronExpression" : "0 0 * * * ?",
  "fixRateInSeconds" : 0,
  "scheduleType" : "CRON",
  "enableTrigger" : true,
  "lastTriggerTime" : 1627744509047,
  "nextTriggerTime" : 1627747200000,
  "executorTimeout" : 0,
  "executorFailRetryCount" : 0,
  "isAvailable" : 1,
  "runningPipelines" : [],
  "finishedPipeliens" : [],
  "created_by" : "admin",
  "created_date" : "2021-07-20T04:33:16.477Z",
  "last_modified_by" : "system",
  "last_modified_date" : "2021-07-31T15:15:09.048Z"
}

Usage

Installation and deployment

Build

Build with Maven:

mvn package -DskipTests

Deploy the master

Edit the master's prod configuration file application-prod.yml as needed.

It covers the Kafka settings, the server port, the MongoDB address, and the JWT secret.

MongoDB collections are created and seeded automatically; no manual data import is needed.

kafka:
  producer:
    bootstrap-servers: 127.0.0.1:9092
    retries: 3
    batch-size: 2000
    buffer-memory: 33554432
  consumer:
    group-id: consumer-pipeline
    auto-offset-reset: earliest
    enable-auto-commit: true
    bootstrap-servers: 172.31.161.38:9092
server:
  port: 8080
spring:
  data:
    mongodb:
      uri: mongodb://127.0.0.1:28017
      database: pipeline
jhipster:
  security:
    authentication:
      jwt:
        base64-secret:

Note that the master's JWT secret must be identical to the agent's.
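
The base64-secret fields are left empty above. Any sufficiently long random key works; one way to generate one (a sketch using the Python standard library) is shown below, after which the same value is pasted into both the master and agent configs:

```python
import base64
import secrets

# 64 random bytes, base64-encoded, suitable for the
# jhipster.security.authentication.jwt.base64-secret setting.
secret = base64.b64encode(secrets.token_bytes(64)).decode("ascii")
print(secret)
```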

Once configured, start it:

nohup java -jar pipeline-master-$version.jar --spring.profiles.active=prod &

application-prod.yml can be placed in the same directory as the jar.

Deploy the agent

Edit the agent's prod configuration file application-prod.yml as needed.

It covers:

  • eureka defaultZone: the master's address
  • Docker settings
    • docker-tls-verify: whether to enable TLS verification
    • docker-cert-path: the CA certificate path when TLS verification is enabled
    • pipeline-log-path: where run logs are stored

eureka:
  instance:
    prefer-ip-address: true
  client:
    service-url:
      defaultZone: http://admin:${jhipster.registry.password}@127.0.0.1:8080/eureka/
server:
  port: 8081
application:
  docker-server:
    docker-tls-verify: true
    docker-cert-path: /mnt/parastor/pipeline/ca/
    pipeline-log-path: /mnt/parastor/pipeline/logs/
jhipster:
  security:
    authentication:
      jwt:
        base64-secret:

Running legacy tasks

POST /api/pipelines/exec-old

Body:
{
  "networks":[
    {
      "driver":"bridge",
      "name":"pipeline_network_3eac4b36209a41e58a5f22dd403fee50"
    }
  ],
  "pipeline":[
    {
      "alias":"Word",
      "dependencies":[],
      "name":"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1",
      "nextPipelines":[],
      "steps":[
        {
          "alias":"Word",
          "auth_config":{},
          "command":[
            "echo $CI_SCRIPT | base64 -d | /bin/bash -e"
          ],
          "entrypoint":[
            "/bin/bash",
            "-c"
          ],
          "environment":{
"CI_SCRIPT":"CmlmIFsgLW4gIiRDSV9ORVRSQ19NQUNISU5FIiBdOyB0aGVuCmNhdCA8PEVPRiA+ICRIT01FLy5uZXRyYwptYWNoaW5lICRDSV9ORVRSQ19NQUNISU5FCmxvZ2luICRDSV9ORVRSQ19VU0VSTkFNRQpwYXNzd29yZCAkQ0lfTkVUUkNfUEFTU1dPUkQKRU9GCmNobW9kIDA2MDAgJEhPTUUvLm5ldHJjCmZpCnVuc2V0IENJX05FVFJDX1VTRVJOQU1FCnVuc2V0IENJX05FVFJDX1BBU1NXT1JECnVuc2V0IENJX1NDUklQVAplY2hvICsgamF2YSAtY3AgL2RhdGF2b2x1bWUvcGRmX3RvX3dvcmQvcGRmYm94X3V0aWwtMS4wLVNOQVBTSE9ULmphciBjb20uaWZseXRlay5pbmRleGVyLlJ1bm5lciAtLWlucHV0UERGIC9kYXRhdm9sdW1lL2V4dHJhY3QvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkvcGRmL2VjNWMwYjk0M2QwYjRmNDI5MzcyMmE1ZGRjNjFlNjZkL0hTNy5wZGYgLS1vdXRwdXRXb3JkIC9kYXRhdm9sdW1lL2V4dHJhY3QvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkvcGRmVG9Xb3JkL2VjNWMwYjk0M2QwYjRmNDI5MzcyMmE1ZGRjNjFlNjZkLyAtLXNjaGVtYUlucHV0UGF0aCAvZGF0YXZvbHVtZS9leHRyYWN0L2ticWEvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkgLS1lbnRpdHlJbmRleFBhdGggL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGZUb1dvcmQvZW50aXR5IC0tZmllbGRJbmRleFBhdGggL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGZUb1dvcmQvZmllbGQgLS10eXBlIGx1Y2VuZSAtLW91dHB1dCAvZGF0YXZvbHVtZS9leHRyYWN0L2Y5MmMyYTc1YmFlODRiYjA4ODMyMDVkYjNmMmRhZTc5L3BkZlRvV29yZC9lYzVjMGI5NDNkMGI0ZjQyOTM3MjJhNWRkYzYxZTY2ZC9lbnRpdHlJbmZvLnR4dApqYXZhIC1jcCAvZGF0YXZvbHVtZS9wZGZfdG9fd29yZC9wZGZib3hfdXRpbC0xLjAtU05BUFNIT1QuamFyIGNvbS5pZmx5dGVrLmluZGV4ZXIuUnVubmVyIC0taW5wdXRQREYgL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGYvZWM1YzBiOTQzZDBiNGY0MjkzNzIyYTVkZGM2MWU2NmQvSFM3LnBkZiAtLW91dHB1dFdvcmQgL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGZUb1dvcmQvZWM1YzBiOTQzZDBiNGY0MjkzNzIyYTVkZGM2MWU2NmQvIC0tc2NoZW1hSW5wdXRQYXRoIC9kYXRhdm9sdW1lL2V4dHJhY3Qva2JxYS9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OSAtLWVudGl0eUluZGV4UGF0aCAvZGF0YXZvbHVtZS9leHRyYWN0L2Y5MmMyYTc1YmFlODRiYjA4ODMyMDVkYjNmMmRhZTc5L3BkZlRvV29yZC9lbnRpdHkgLS1maWVsZEluZGV4UGF0aCAvZGF0YXZvbHVtZS9leHRyYWN0L2Y5MmMyYTc1YmFlODRiYjA4ODMyMDVkYjNmMmRhZTc5L3BkZlRvV29yZC9maWV
sZCAtLXR5cGUgbHVjZW5lIC0tb3V0cHV0IC9kYXRhdm9sdW1lL2V4dHJhY3QvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkvcGRmVG9Xb3JkL2VjNWMwYjk0M2QwYjRmNDI5MzcyMmE1ZGRjNjFlNjZkL2VudGl0eUluZm8udHh0Cg=="
          },
          "image":"registry.iflyresearch.com/aimind/java:v1.0.0",
          "name":"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1",
          "networks":[
            {
              "aliases":[
                "default"
              ],
              "name":"pipeline_network_3eac4b36209a41e58a5f22dd403fee50"
            }
          ],
          "on_success":true,
          "volumes":[
            "pipeline_default:/aimind",
            "/mnt/parastor/aimind/shared/:/share",
            "/mnt/parastor/aimind/pipeline-jobs/2021/07/26/3eac4b36209a41e58a5f22dd403fee50:/workspace",
            "/mnt/parastor/aimind/datavolumes/carmaster:/datavolume"
          ],
          "working_dir":"/workspace"
        }
      ]
    }
  ],
  "volumes":[
    {
      "driver":"local",
      "name":"pipeline_default"
    }
  ]
}

On success, it returns:

{
  "retcode": "000000",
  "desc": "成功",
  "data": {
    "id": "8137f344-f52d-4595-bdbb-425363847b61"
  }
}

The returned id can then be used to fetch the logs.
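
Submission and log retrieval can be scripted against the two endpoints. A hypothetical Python sketch (the master address and the Bearer-token auth header are assumptions of this sketch, not documented API details):

```python
import json
import urllib.request

BASE = "http://127.0.0.1:8080"  # assumed master address


def submit_legacy(pipeline: dict, token: str) -> str:
    """POST a legacy pipeline body to /api/pipelines/exec-old and
    return the job id from the response's data.id field."""
    req = urllib.request.Request(
        f"{BASE}/api/pipelines/exec-old",
        data=json.dumps(pipeline).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # assumed JWT auth
        },
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["data"]["id"]


def job_log_url(job_id: str) -> str:
    """Build the log endpoint URL for a submitted job."""
    return f"{BASE}/api/pipelines/jobLog/{job_id}/"
```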

Fetching job run logs

GET /api/pipelines/jobLog/{jobid}/

Response:

{
  "retcode": "000000",
  "desc": "成功",
  "data": {
    "currentTask": null,
    "logs": [
      {
        "id": "e76a686f68b64c0783b7721b058be137",
        "jobId": "8137f344-f52d-4595-bdbb-425363847b61",
        "status": "FINISHED",
        "taskName": "pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1",
        "exitedValue": 0,
        "logs": [
          "proc \"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1\" started",
"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1:+ java -cp /datavolume/pdf_to_word/pdfbox_util-1.0-SNAPSHOT.jar com.iflytek.indexer.Runner --inputPDF /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdf/ec5c0b943d0b4f4293722a5ddc61e66d/HS7.pdf --outputWord /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/ec5c0b943d0b4f4293722a5ddc61e66d/ --schemaInputPath /datavolume/extract/kbqa/f92c2a75bae84bb0883205db3f2dae79 --entityIndexPath /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/entity --fieldIndexPath /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/field --type lucene --output /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/ec5c0b943d0b4f4293722a5ddc61e66d/entityInfo.txt",
          "proc \"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1\" exited with status 0"
        ]
      }
    ],
    "exitedValue": 0,
    "status": "FINISHED",
    "pipelineJobSt": 1627477250599,
    "pipelineJobFt": 1627477274299
  }
}
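
A client typically checks data.status and then flattens the per-task log lines. A minimal Python sketch against the response shape above (the sample values are abbreviated):

```python
import json

# Abbreviated job-log response following the shape shown above.
response = json.loads("""
{
  "retcode": "000000",
  "data": {
    "status": "FINISHED",
    "exitedValue": 0,
    "logs": [
      {
        "taskName": "pipeline_task_1",
        "status": "FINISHED",
        "exitedValue": 0,
        "logs": ["proc started", "proc exited with status 0"]
      }
    ]
  }
}
""")


def collect_log_lines(body: dict) -> list[str]:
    """Flatten each task's log lines, prefixed with the task name."""
    lines = []
    for task in body["data"]["logs"]:
        for line in task["logs"]:
            lines.append(f'{task["taskName"]}: {line}')
    return lines


assert response["data"]["status"] == "FINISHED"
for line in collect_log_lines(response):
    print(line)
```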

Scheduled tasks

To run a pipeline periodically, set enableTrigger to true and choose CRON or FIX_RATE scheduling:

  • FIX_RATE: fixed-rate execution; the interval is set via fixRateInSeconds

Example: run every 360 seconds:

{
  // pipeline ...
  "pipelineTasks" : [ ],
  "fixRateInSeconds" : 360,
  "scheduleType" : "FIX_RATE",
  "enableTrigger" : true
}
  • CRON: run on a cron expression, set via cronExpression

Example: run at the start of every hour:

{
  // pipeline ...
  "pipelineTasks" : [ ],
  "cronExpression" : "0 0 * * * ?",
  "scheduleType" : "CRON",
  "enableTrigger" : true
}
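
For FIX_RATE, the next run is simply the last trigger time plus the interval. A sketch of that bookkeeping (field names taken from the Pipeline document above; the actual computation lives in the lock-holding master):

```python
def next_trigger_time(last_trigger_time_ms: int, fix_rate_in_seconds: int) -> int:
    """Next trigger timestamp in epoch milliseconds for a FIX_RATE pipeline."""
    return last_trigger_time_ms + fix_rate_in_seconds * 1000


# With fixRateInSeconds = 360, the next run is 360 000 ms after the last one.
print(next_trigger_time(1627744509047, 360))  # -> 1627744869047
```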

More to come.