详解SpringBoot定时任务功能

网友投稿 881 2022-10-05

详解SpringBoot定时任务功能

详解SpringBoot定时任务功能

目录一 背景二 动态定时任务调度三 多节点任务执行问题四 后记

一 背景

项目中需要一个可以动态新增定时定时任务的功能,现在项目中使用的是xxl-job定时任务调度系统,但是经过一番对xxl-job功能的了解,发现xxl-job对项目动态新增定时任务,动态删除定时任务的支持并不是那么好,所以需要自己手动实现一个定时任务的功能

二 动态定时任务调度

1 技术选择

Timer or ScheduledExecutorService

这两个都能实现定时任务调度,先看下Timer的定时任务调度

public class MyTimerTask extends TimerTask {

private String name;

public MyTimerTask(String name){

this.name = name;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

@Override

public void run() {

//task

Calendar instance = Calendar.getInstance();

System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));

}

}

Timer timer = new Timer();

MyTimerTask timerTask = new MyTimerTask("NO.1");

//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次

timer.schedule(timerTask,1000L,2000L);

在看下ScheduledThreadPoolExecutor的实现

//org.apache.commons.lang3.concurrent.BasicThreadFactory

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,

new BasicThreadFactory.Builder().namingPattern("examphttp://le-schedule-pool-%d").daemon(true).build());

executorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

//do something

}

},initialDelay,period, TimeUnit.HOURS);

两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

从建议上来看,是一定要选择ScheduledExecutorService了,我们看看源码看看为什么Timer出现问题会终止执行

/**

* The timer thread.

*/

private final TimerThread thread = new TimerThread(queue);

public Timer() {

this("Timer-" + serialNumber());

}

public Timer(String name) {

thread.setName(name);

thread.start();

}

新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看

class TimerThread extends Thread {

boolean newTasksMayBeScheduleVaZZAXUKqnd = true;

/**

* 每一件一个任务都是一个quene

*/

private TaskQueue queue;

TimerThread(TaskQueue queue) {

this.queue = queue;

}

public void run() {

try {

mainLoop();

} finally {

// Someone killed this Thread, behave as if Timer cancelled

synchronized(queue) {

newTasksMayBeScheduled = false;

queue.clear(); // 清除所有任务信息

}

}

}

/**

* The main timer loop. (See class comment.)

*/

private void mainLoop() {

while (true) {

try {

TimerTask task;

boolean taskFired;

synchronized(queue) {

// Wait for queue to become non-empty

while (queue.isEmpty() && newTasksMayBeScheduled)

queue.wait();

if (queue.isEmpty())

break; // Queue is empty and will forever remain; die

// Queue nonempty; look at first evt and do the right thing

long currentTime, executionTime;

task = queue.getMin();

synchronized(task.lock) {

if (task.state == TimerTask.CANCELLED) {

queue.removeMin();

continue; // No action required, poll queue again

}

currentTime = System.currentTimeMillis();

executionTime = task.nextExecutionTime;

if (taskFired = (executionTime<=currentTime)) {

if (task.period == 0) { // Non-repeating, remove

queue.removeMin();

task.state = TimerTask.EXECUTED;

} else { // Repeating task, reschedule

queue.rescheduleMin(

task.period<0 ? currentTime - task.period

: executionTime + task.period);

}

}

}

if (!taskFired) // Task hasn't yet fired; wait

queue.wait(executionTime - currentTime);

}

if (taskFired) // Task fired; run it, holding no locks

task.run();

} catch(InterruptedException e) {

}

}

}

}

我们看到,执行了 mainLoop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。

这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而ScheduledThreadPoolExecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。

2 使用ScheduledThreadPoolExecutor

从上面看,使用ScheduledThreadPoolExecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 TaskScheduler来实现

@Component

public class CronTaskRegistrar implements DisposableBean {

private final Map scheduledTasks = new ConcurrentHashMap<>(16);

@Autowired

private TaskScheduler taskScheduler;

public TaskScheduler getScheduler() {

return this.taskScheduler;

}

public void addCronTask(Runnable task, String cronExpression) {

addCronTask(new CronTask(task, cronExpression));

}

private void addCronTask(CronTask cronTask) {

if (cronTask != null) {

Runnable task = cronTask.getRunnable();

if (this.scheduledTasks.containsKey(task)) {

removeCronTask(task);

}

this.scheduledTasks.put(task, scheduleCronTask(cronTask));

}

}

public void removeCronTask(Runnable task) {

Set runnables = this.scheduledTasks.keySet();

Iterator it1 = runnables.iterator();

while (it1.hasNext()) {

SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();

Long taskId = schedulingRunnable.getTaskId();

SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;

if (taskId.equals(cancelRunnable.getTaskId())) {

ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);

if (scheduledTask != null){

scheduledTask.cancel();

}

}

}

}

public ScheduledTask scheduleCronTask(CronTask cronTask) {

ScheduledTask scheduledTask = new ScheduledTask();

scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());

return scheduledTask;

}

@Override

public void destroy() throws Exception {

for (ScheduledTask task : this.scheduledTasks.values()) {

task.cancel();

}

this.scheduledTasks.clear();

}

}

TaskScheduler是本次功能实现的核心类,但是他是一个接口

public interface TaskScheduler {

/**

* Schedule the given {@link Runnable}, invoking it whenever the trigger

* indicates a next execution time.

*

Execution will end once the scheduler shuts down or the returned

* {@link ScheduledFuture} gets cancelled.

* @param task the Runnable to execute whenever the trigger fires

* @param trigger an implementation of the {@link Trigger} interface,

* e.g. a {@link org.springframework.scheduling.support.CronTrigger} object

* wrapping a cron expression

* @return a {@link ScheduledFuture} representing pending completion of the task,

* or {@code null} if the given Trigger object never fires (i.e. returns

* {@code null} from {@link Trigger#nextExecutionTime})

* @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted

* for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)

* @see org.springframework.scheduling.support.CronTrigger

*/

@Nullable

ScheduledFuture> schedule(Runnable task, Trigger trigger);

前面的代码可以看到,我们在类中注入了这个类,但是他是接口,我们怎么知道是那个实现类呢,以往出现这种情况要在类上面加@Primany或者@Quality来执行实现的类,但是我们看到我的注入上并没有标记,因为是通过另一种方式实现的

@Configuration

public class SchedulingConfig {

@Bean

public TaskScheduler taskScheduler() {

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();

// 定时任务执行线程池核心线程数

taskScheduler.setPoolSize(4);

taskScheduler.setRemoveOnCancelPolicy(true);

taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");

return taskScheduler;

}

}

在spring初始化时就注册了Bean TaskScheduler,而我们可以看到他的实现是ThreadPoolTaskScheduler,在网上的资料中有人说ThreadPoolTaskScheduler是TaskScheduler的默认实现类,其实不是,还是需要我们去指定,而这种方式,当我们想替换实现时,只需要修改配置类就行了,很灵活。

而为什么说他是更优雅的实现方式呢,因为他的核心也是通过ScheduledThreadPoolExecutor来实现的

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {

Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");

return this.scheduledExecutor;

}

三 多节点任务执行问题

这次的实现过程中,我并没有选择xxl-job来进行实现,而是采用了TaskScheduler来实现,这也产生了一个问题,xxl-job是分布式的程序调度系统,当想要执行定时任务的应用使用xxl-job时,无论应用程序中部署多少个节点,xxl-job只会选择其中一个节点作为定时任务执行的节点,从而不会产生定时任务在不同节点上同时执行,导致重复执行问题,而使用TaskScheduler来实现,就要考虑多节点重复执行问题。当然既然有问题,就有解决方案

· 方案一 将定时任务功能拆出来单独部署,且只部署一个节点 · 方案二 使用redis setNx的形式,保证同一时间只有一个任务在执行

我选择的是方案二来执行,当然还有一些方式也能保证不重复执行,这里就不多说了,一下是我的实现

public void executeTask(Long taskId) {

if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {

log.info("已有执行中定时发送短信任务,本次不执行!");

return;

}

四 后记

其实定时任务应该每一个开发都会用到的工具,以前并没有了解其中的实现,这次的功能开发过程中也算是对其内涵的进一步了解,以后遇到定时任务的处理也更清晰,更有效率了。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微信小程序API调用wx.request实现数据请求实例讲解(微信小程序 wx.request)
下一篇:cf516b Drazil and Tiles
相关文章

 发表评论

暂时没有评论,来抢沙发吧~