1. 程式人生 > >Springboot定時任務原理及如何動態創建定時任務

Springboot定時任務原理及如何動態創建定時任務

mark 引入 需要 run security upd them done 需求

一、前言

  上周工作遇到了一個需求,同步多個省份銷號數據,解綁微信粉絲。分省定時將銷號數據放到SFTP服務器上,我需要開發定時任務去解析文件。因為是多省份,服務器、文件名規則、數據規則都不一定,所以要做成可配置是有一定難度的。數據規則這塊必須強烈要求統一,服務器、文件名規則都可以從配置中心去讀。每新增一個省份的配置,後臺感知到後,動態生成定時任務。

二、Springboot引入定時任務核心配置

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.
class) @Documented public @interface EnableScheduling { } @Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }

  接下來主要看一下這個核心後置處理器:ScheduledAnnotationBeanPostProcessor 。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null
); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean ‘" + beanName + "‘: " + annotatedMethods); } } } return bean; }

  1、處理Scheduled註解,通過ScheduledTaskRegistrar註冊定時任務。

private void finishRegistration() {
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }

    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }

    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.trace("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named ‘taskScheduler‘. Mark one of them as primary or name it ‘taskScheduler‘ " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.trace("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named ‘taskScheduler‘. Mark one of them as primary or name it ‘taskScheduler‘ " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }

    this.registrar.afterPropertiesSet();
}

  1、通過一系列的SchedulingConfigurer動態配置ScheduledTaskRegistrar。

  2、向ScheduledTaskRegistrar註冊一個TaskScheduler(用於對Runnable的任務進行調度,它包含有多種觸發規則)。

  3、registrar.afterPropertiesSet(),在這開始安排所有的定時任務開始執行了。

protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

  1、TriggerTask:動態定時任務。通過Trigger#nextExecutionTime 給定的觸發上下文確定下一個執行時間。

  2、CronTask:動態定時任務,TriggerTask子類。通過cron表達式確定的時間觸發下一個任務執行。

  3、IntervalTask:一定時間延遲之後,周期性執行的任務。

  4、taskScheduler 如果為空,默認是ConcurrentTaskScheduler,並使用默認單線程的ScheduledExecutor。

三、主要看一下CronTask工作原理

ScheduledTaskRegistrar.java
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}

ConcurrentTaskScheduler.java
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}

ReschedulingRunnable.java
@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}

private ScheduledFuture<?> obtainCurrentFuture() {
    Assert.state(this.currentFuture != null, "No scheduled future");
    return this.currentFuture;
}

@Override
public void run() {
    Date actualExecutionTime = new Date();
    super.run();
    Date completionTime = new Date();
    synchronized (this.triggerContextMonitor) {
        Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
        this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
        if (!obtainCurrentFuture().isCancelled()) {
            schedule();
        }
    }
}

  1、最終將task和trigger都封裝到了ReschedulingRunnable中。

  2、ReschedulingRunnable實現了任務重復調度(schedule方法中調用調度器executor並傳入自身對象,executor會調用run方法,run方法又調用了schedule方法)。

  3、ReschedulingRunnable schedule方法加了同步鎖,只能有一個線程拿到下次執行時間並加入執行器的調度。

  4、不同的ReschedulingRunnable對象之間在線程池夠用的情況下是不會相互影響的,也就是說滿足線程池的條件下,TaskScheduler的schedule方法的多次調用是可以交叉執行的。

ScheduledThreadPoolExecutor.java
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}


private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

  ScheduledFutureTask 工作原理如下圖所示【太懶了,不想畫圖了,盜圖一張】。

   技術分享圖片

  1、ScheduledFutureTask會放入優先阻塞隊列:ScheduledThreadPoolExecutor.DelayedWorkQueue(二叉最小堆實現)

  2、上圖中的Thread對象即ThreadPoolExecutor.Worker,實現了Runnable接口

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

  1、Worker中維護了Thread對象,Thread對象的Runnable實例即Worker自身

  2、ThreadPoolExecutor#addWorker方法中會創建Worker對象,然後拿到Worker中的thread實例並start,這樣就創建了線程池中的一個線程實例

  3、Worker的run方法會調用ThreadPoolExecutor#runWorker方法,這才是任務最終被執行的地方,該方法示意如下

  (1)首先取傳入的task執行,如果task是null,只要該線程池處於運行狀態,就會通過getTask方法從workQueue中取任務。ThreadPoolExecutor的execute方法會在無法產生core線程的時候向  workQueue隊列中offer任務。
getTask方法從隊列中取task的時候會根據相關配置決定是否阻塞和阻塞多久。如果getTask方法結束,返回的是null,runWorker循環結束,執行processWorkerExit方法。
至此,該線程結束自己的使命,從線程池中“消失”。
  (2)在開始執行任務之前,會調用Worker的lock方法,目的是阻止task正在被執行的時候被interrupt,通過調用clearInterruptsForTaskRun方法來保證的(後面可以看一下這個方法),該線程沒有自己的interrupt set了。
  (3)beforeExecute和afterExecute方法用於在執行任務前後執行一些自定義的操作,這兩個方法是空的,留給繼承類去填充功能。
我們可以在beforeExecute方法中拋出異常,這樣task不會被執行,而且在跳出該循環的時候completedAbruptly的值是true,表示the worker died due to user exception,會用decrementWorkerCount調整wc。
  (4)因為Runnable的run方法不能拋出Throwables異常,所以這裏重新包裝異常然後拋出,拋出的異常會使當當前線程死掉,可以在afterExecute中對異常做一些處理。
  (5)afterExecute方法也可能拋出異常,也可能使當前線程死掉。

四、動態創建定時任務

  TaskConfiguration 配置類

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-schedule-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

  1、保證ConcurrentTaskScheduler不使用默認單線程的ScheduledExecutor,而是corePoolSize=5的線程池

  2、自定義線程池工廠類

  DynamicTask 動態定時任務

@Configuration
public class DynamicTask implements SchedulingConfigurer {
    private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);

    private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10),
            new DynamicTaskConsumeThreadFactory());


    private volatile ScheduledTaskRegistrar registrar;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    private volatile List<TaskConstant> taskConstants = Lists.newArrayList();

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        this.registrar = registrar;
        this.registrar.addTriggerTask(() -> {
                    if (!CollectionUtils.isEmpty(taskConstants)) {
                        LOGGER.info("檢測動態定時任務列表...");
                        List<TimingTask> tts = new ArrayList<>();
                        taskConstants
                                .forEach(taskConstant -> {
                                    TimingTask tt = new TimingTask();
                                    tt.setExpression(taskConstant.getCron());
                                    tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                                    tts.add(tt);
                                });
                        this.refreshTasks(tts);
                    }
                }
                , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    }


    public List<TaskConstant> getTaskConstants() {
        return taskConstants;
    }

    private void refreshTasks(List<TimingTask> tasks) {
        //取消已經刪除的策略任務
        Set<String> taskIds = scheduledFutures.keySet();
        for (String taskId : taskIds) {
            if (!exists(tasks, taskId)) {
                scheduledFutures.get(taskId).cancel(false);
            }
        }
        for (TimingTask tt : tasks) {
            String expression = tt.getExpression();
            if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                LOGGER.error("定時任務DynamicTask cron表達式不合法: " + expression);
                continue;
            }
            //如果配置一致,則不需要重新創建定時任務
            if (scheduledFutures.containsKey(tt.getTaskId())
                    && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                continue;
            }
            //如果策略執行時間發生了變化,則取消當前策略的任務
            if (scheduledFutures.containsKey(tt.getTaskId())) {
                scheduledFutures.remove(tt.getTaskId()).cancel(false);
                cronTasks.remove(tt.getTaskId());
            }
            CronTask task = new CronTask(tt, expression);
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(tt.getTaskId(), task);
            scheduledFutures.put(tt.getTaskId(), future);
        }
    }

    private boolean exists(List<TimingTask> tasks, String taskId) {
        for (TimingTask task : tasks) {
            if (task.getTaskId().equals(taskId)) {
                return true;
            }
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

    public static class TaskConstant {
        private String cron;
        private String taskId;

        public String getCron() {
            return cron;
        }

        public void setCron(String cron) {
            this.cron = cron;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    }

    private class TimingTask implements Runnable {
        private String expression;

        private String taskId;

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            //設置隊列大小10
            LOGGER.error("當前CronTask: " + this);
            DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
            es.submit(() -> {
                while (!queue.isDone() || !queue.isEmpty()) {
                    try {
                        String content = queue.poll(500, TimeUnit.MILLISECONDS);
                        if (StringUtils.isBlank(content)) {
                            return;
                        }
                        LOGGER.info("DynamicBlockingQueue 消費:" + content);
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            //隊列放入數據
            for (int i = 0; i < 5; ++i) {
                try {
                    queue.put(String.valueOf(i));
                    LOGGER.info("DynamicBlockingQueue 生產:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.setDone(true);
        }

        public String getExpression() {
            return expression;
        }

        public void setExpression(String expression) {
            this.expression = expression;
        }

        @Override
        public String toString() {
            return ReflectionToStringBuilder.toString(this
                    , ToStringStyle.JSON_STYLE
                    , false
                    , false
                    , TimingTask.class);
        }

    }

    /**
     * 隊列消費線程工廠類
     */
    private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DynamicTaskConsumeThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-dynamic-task-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
        DynamicBlockingQueue(int capacity) {
            super(capacity);
        }


        private volatile boolean done = false;

        public boolean isDone() {
            return done;
        }

        public void setDone(boolean done) {
            this.done = done;
        }
    }
}

  1、taskConstants 動態任務列表

  2、ScheduledTaskRegistrar#addTriggerTask 添加動態周期定時任務,檢測動態任務列表的變化

CronTask task = new CronTask(tt, expression);
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);

  3、動態創建cron定時任務,拿到ScheduledFuture實例並緩存起來

  4、在刷新任務列表時,通過緩存的ScheduledFuture實例和CronTask實例,來決定是否取消、移除失效的動態定時任務。

  DynamicTaskTest 動態定時任務測試類

@RunWith(SpringRunner.class)
@SpringBootTest
public class DynamicTaskTest {

    @Autowired
    private DynamicTask dynamicTask;

    @Test
    public void test() throws InterruptedException {
        List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
        DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
        taskConstant.setCron("0/5 * * * * ?");
        taskConstant.setTaskId("test1");
        taskConstans.add(taskConstant);


        DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
        taskConstant1.setCron("0/5 * * * * ?");
        taskConstant1.setTaskId("test2");
        taskConstans.add(taskConstant1);

        DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
        taskConstant2.setCron("0/5 * * * * ?");
        taskConstant2.setTaskId("test3");
        taskConstans.add(taskConstant2);

        TimeUnit.SECONDS.sleep(40);
        //移除並添加新的配置
        taskConstans.remove(taskConstans.size() - 1);
        DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
        taskConstant3.setCron("0/5 * * * * ?");
        taskConstant3.setTaskId("test4");
        taskConstans.add(taskConstant3);
//
        TimeUnit.MINUTES.sleep(50);
    }
}

Springboot定時任務原理及如何動態創建定時任務