1. 程式人生 > >分散式任務排程平臺 → XXL-JOB 初探

分散式任務排程平臺 → XXL-JOB 初探

開心一刻

  旁邊的女乘客太吵,我實在忍無可忍,便對她說:“你能不能讓我睡會兒?”

  她揮手就給了我一個耳光:“你個臭流氓!”

  我頓時就清醒了,理論到:“你讓我睡一會怎麼了嗎”

  她害羞的低下了頭,說道:“人家不是隨便的人”

  我:“我也不是隨便的人,下一站我們下車把話說清楚”

任務排程

  相信大家對任務排程都不陌生,說的通熟一點就是定時任務;這個在我們的專案中或多或少都存在,我們可以用 JDK 自帶的(Timer、ScheduledExecutor)來實現,也可以用 Spring 的 Scheduler 來實現,不管用以上哪種方式,我們都是在單機上跑,如果我們以叢集的方式部署,會不會出現什麼問題 ?

  叢集中的各個節點都會執行定時排程,會有重複執行的問題,那怎麼辦? 我們可以加配置,只啟動某個節點的定時任務,但是這時候又會出現單點問題

  那有沒有什麼辦法,既能避免重複執行,又不會出現單點問題呢? 分散式排程應運而生,常見的分散式任務排程框架有:quartz 、cronsun、Elastic-job、saturn、lts、TBSchedule、xxl-job 等

  quartz 我已經簡單講過,有興趣的可以去看看:請點我,你們會發現:樓主壓根就沒講 quartz 的叢集模式。你們發現的很對,我就是沒講,就問你氣不氣 ?

  既然你們對 quartz 已經有了一定的瞭解了 ,那麼它的叢集模式交給你們自己了

  今天我們就一起來了解下另外一個分散式排程平臺:xxl-job

  關於 xxl-job 是什麼、有什麼特性、發展歷程、接入了哪些公司、各個版本的新特性等等問題,我都不會去講,因為官方文件已經說的非常清楚了。xxl-job 是國產的,如果文件還看不懂,那就需要回學校再造了。但是我還是想強調下它的架構圖

  通過這個架構圖,我們可以對其有個大致的瞭解;大體上分為排程中心 和 執行器,排程中心通過排程規則(cron表示式)對執行器中的任務進行排程,執行器收到排程後,執行具體的任務(Job)

  既然官方文件都說的非常細緻了,那我還能講什麼呢 ? 好像確實麼什麼可以說的了, 那今天就到這吧,大家散會!

  大家先別急著走,雖然下面的內容在官方文件中已經存在,但是卻很容易被我們忽略;我會在搭建的過程中來穿插著一些問題,來鞏固我們容易忽略的點

單節點搭建

  我們先搭一個簡單的,排程中心 和 執行器 都先搭建成單節點

  按照官方的文件來,一步一步很容易搭建成功

  原始碼下載

    原始碼地址:xxl-job,可以 git clone 也可以 Download ZIP ,不管何種方式,我們拿到了原始碼,匯入到 IDEA,結構如下

    

  初始化 “排程資料庫”

    SQL 指令碼在原始碼中已存在,路徑: xxl-job-master\doc\db\tables_xxl_job.sql ,執行此指令碼,建立資料庫和表,如下圖

    

   配置&部署 排程中心

    配置檔案: appliction.properties ,內容如下

### web
server.port=8080
server.servlet.context-path=/xxl-job-admin

### actuator
management.server.servlet.context-path=/actuator
management.health.mail.enabled=false

### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

### mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model

### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

### datasource-pool
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1

### xxl-job, email
spring.mail.host=smtp.qq.com
spring.mail.port=25
[email protected]
[email protected]
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory

### xxl-job, access token
xxl.job.accessToken=

### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN

## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

### xxl-job, log retention days
xxl.job.logretentiondays=30
View Code

    需要改的地方不多,埠號可能需要根據實際情況進行修改,然後就是資料庫的地址、使用者名稱和密碼需要改成自己的,email伺服器最好配上(告警用的上),排程中心與執行器之間的安全訪問 token( xxl.job.accessToken ) 最好也配置上

    出於演示,改下資料庫的配置就好,其他的保持預設;我們啟動排程中心,訪問: http://localhost:8080/xxl-job-admin ,預設登入賬號 “admin/123456”, 登入後執行介面如下圖所示

     “排程中心” 已經部署成功

  配置&部署 執行器

    配置檔案: application.properties ,內容如下

# web port
server.port=8081
# no web
#spring.main.web-environment=false

# log config
logging.config=classpath:logback.xml


### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job, access token
xxl.job.accessToken=

### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
View Code

    埠號配置一個未被使用的埠,排程中心地址配置成我們之前部署的排程中心的地址即可;至於 xxl.job.accessToken ,和排程中心配置成一樣即可

    配置檔案中各個配置的註釋寫的非常清楚,大家根據實際情況進行配置即可

    執行器的示例有好幾個,我們啟動 springboot 版本的;啟動不報錯就行了,它會自動註冊到排程中心,如下圖

  配置排程規則&任務

    排程中心通過排程規則對執行器中的任務進行排程,現在排程中心和執行器都部署好了,就缺排程規則和任務了

    任務在示例程式碼中已經存在了, SampleXxlJob.java :

package com.xxl.job.executor.service.jobhandler;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob開發示例(Bean模式)
 *
 * 開發步驟:
 * 1、在Spring Bean例項中,開發Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
 * 2、為Job方法添加註解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷燬方法")",註解value值對應的是排程中心新建任務的JobHandler屬性的值。
 * 3、執行日誌:需要通過 "XxlJobLogger.log" 列印執行日誌;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Component
public class SampleXxlJob {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);


    /**
     * 1、簡單任務示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobLogger.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }


    /**
     * 2、分片廣播任務
     */
    @XxlJob("shardingJobHandler")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {

        // 分片引數
        int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex();
        int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal();

        XxlJobLogger.log("分片引數:當前分片序號 = {}, 總分片數 = {}", shardIndex, shardTotal);

        // 業務邏輯
        for (int i = 0; i < shardTotal; i++) {
            if (i == shardIndex) {
                XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
            } else {
                XxlJobLogger.log("第 {} 片, 忽略", i);
            }
        }

        return ReturnT.SUCCESS;
    }


    /**
     * 3、命令列任務
     */
    @XxlJob("commandJobHandler")
    public ReturnT<String> commandJobHandler(String param) throws Exception {
        String command = param;
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try {
            // command process
            Process process = Runtime.getRuntime().exec(command);
            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                XxlJobLogger.log(line);
            }

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
        } catch (Exception e) {
            XxlJobLogger.log(e);
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

        if (exitValue == 0) {
            return IJobHandler.SUCCESS;
        } else {
            return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
        }
    }


    /**
     * 4、跨平臺Http任務
     *  引數示例:
     *      "url: http://www.baidu.com\n" +
     *      "method: get\n" +
     *      "data: content\n";
     */
    @XxlJob("httpJobHandler")
    public ReturnT<String> httpJobHandler(String param) throws Exception {

        // param parse
        if (param==null || param.trim().length()==0) {
            XxlJobLogger.log("param["+ param +"] invalid.");
            return ReturnT.FAIL;
        }
        String[] httpParams = param.split("\n");
        String url = null;
        String method = null;
        String data = null;
        for (String httpParam: httpParams) {
            if (httpParam.startsWith("url:")) {
                url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
            }
            if (httpParam.startsWith("method:")) {
                method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
            }
            if (httpParam.startsWith("data:")) {
                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
            }
        }

        // param valid
        if (url==null || url.trim().length()==0) {
            XxlJobLogger.log("url["+ url +"] invalid.");
            return ReturnT.FAIL;
        }
        if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
            XxlJobLogger.log("method["+ method +"] invalid.");
            return ReturnT.FAIL;
        }
        boolean isPostMethod = method.equals("POST");

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod(method);
            connection.setDoOutput(isPostMethod);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            // data
            if (isPostMethod && data!=null && data.trim().length()>0) {
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(data.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            }

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String responseMsg = result.toString();

            XxlJobLogger.log(responseMsg);
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            XxlJobLogger.log(e);
            return ReturnT.FAIL;
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                XxlJobLogger.log(e2);
            }
        }

    }

    /**
     * 5、生命週期任務示例:任務初始化與銷燬時,支援自定義相關邏輯;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public ReturnT<String> demoJobHandler2(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        return ReturnT.SUCCESS;
    }
    public void init(){
        logger.info("init");
    }
    public void destroy(){
        logger.info("destory");
    }


}
View Code

    是一個任務集,裡面每一個被 @XxlJob 修飾的都是一個任務,我們以名為: demoJobHandler 的任務來做演示

    任務已經定好,目前就只差排程規則了,我們去排程中心管理介面進行配置;預設情況下,xxl-job 會幫我們自動配置好一個任務,如下

     直接用它是可以的,但是為了清楚怎麼配置,我們重新配置一個

     各個配置項的具體含義大家可以去看官方文件,裡面都有詳細的介紹;

    簡單點來說上圖的配置,就是每隔 3 秒,排程中心會去排程 示例執行器 的 demoJobHandler 任務

  啟動排程

    配置和部署都已完成,現在差的就是啟動排程了,我們啟動它

     然後我們就可以在排程日誌頁面檢視排程中心的排程日誌了,如下所示

   問題

    現在不管是排程中心,還是執行器,都是單節點的,都存在單節點問題

    那如何解決了,單節點的解決方案往往就是叢集,我們可以將排程中心和執行器都部署成叢集,而 xxl-job 又是支援的,而且叢集部署非常簡單、方便

叢集搭建

  叢集架構圖簡單如下

  nginx 只是對排程中心的請求(排程中心管理頁面的操作)做負載均衡,它不涉及任務的排程與回撥,這裡就不配置 nginx 了, 我們重點來看下排程中心叢集與執行器叢集的搭建

  排程中心叢集

    排程中心叢集的搭建非常簡單,只需要注意兩點:DB配置保持一致,叢集機器時鐘保持一致(單機叢集忽視)

    出於演示,我們就做單機叢集處理,那麼我們只需要在 IDEA 中再啟動一個排程中心節點就好,埠號配置不一樣就好;排程中心共啟動兩個節點,之前的埠號是8080, 這個我們改成 8088

    

    啟動之後,我們就可以從http://localhost:8080/xxl-job-admin,http://localhost:8088/xxl-job-admin 對排程中心控制檯進行訪問了,具體就不演示了, 大家可以自行去操作

    生產環境下,會通過 nginx 對外暴露唯一地址,由 nginx 對這兩個(或者多個)進行負載均衡

  執行器叢集

    搭建同樣非常簡單,只需要注意兩點

      1、執行器回撥地址(xxl.job.admin.addresses)需要保持一致;執行器根據該配置進行執行器自動註冊等操作。

      2、同一個執行器叢集內AppName(xxl.job.executor.appname)需要保持一致;排程中心根據該配置動態發現不同叢集的線上執行器列表

    由於是單機叢集搭建,埠的唯一性也需要注意

    

    啟動之後,去排程中心修改路由策略為輪訓,再啟動任務排程,然後就可以去檢視排程日誌了

  宕機測試

    這個就不演示了,大家自行去測試,停掉某個節點,整個排程是否能正常完成

  疑問

    1、排程中心叢集部署,任務排程的時候,會不會每個節點都發起排程請求,從而產生重複排程的問題

      這個問題在官方文件中有說明:基於資料庫的叢集方案,資料庫選用Mysql;叢集分散式併發環境中進行定時任務排程時,會在各個節點會上報任務,存到資料庫中,執行時會從資料庫中取出觸發器來執行,如果觸發器的名稱和執行時間相同,則只有一個節點去執行此任務。

      因此對同一個排程,不會產生重複排程問題

    2、執行器叢集收到排程請求後,會不會每個節點都去執行任務

      這個問題不成立,我們不是配置了路由策略嗎,排程中心會根據路由策略將排程請求傳送給具體的某個執行器了,那何來每個執行器都執行任務呢 ?

    如果官方文件看的細的話,我們會發現有如下一段話

      

    不只非同步排程和非同步執行,其實還包括非同步回撥,xxl-job 中用到了大量的佇列、非同步處理

    當然還有一些其他的疑問,絕大部分在官方文件都能找到答案,所以需要大家多讀、細讀

總結

  1、單機模式,大家瞭解就好,生產環境肯定都是叢集模式的;但 xxl-job 的叢集部署也非常簡單

  2、xxl-job 的全非同步化&輕量級設計,可以保證使用有限的執行緒支撐大量的JOB併發執行

  3、通篇都是在 xxl-job 的原始碼上進行的,如何將它應用進我們的實戰專案中了 ? 實戰篇,我們下期見

參考

  XXL