优秀的编程知识分享平台

网站首页 > 技术文章 正文

springboot定时任务防止多实例重复执行架构方案的实现

nanyue 2024-12-12 14:12:07 技术文章 9 ℃

1. 概述

在上一篇中介绍了springboot内置的定时任务管理机制。同时也提到了解决多个节点同时执行防止重复执行的思路。现在提供一个开箱即用的框架来完成此功能。

要点目标

  • 基于注解,避免重复写相关逻辑代码
  • 具有超时设定功能,防止阻滞任务一直占有资源

2. 框架实现

启用定时任务配置类

// 省略包名和引入

/**
 *启用 spring boot 自带的计划任务
 */
@Configuration
@EnableScheduling
public class SchedulingConfig {
    // 上篇文章中的方法在新的实验测试中不好使了,可以删除掉
    // 当然也可以坚持实现SchedulingConfigurer接口方法,
    // 关键点是ConcurrentTaskScheduler的使用
    // 但基本也没什么用,所以这个类就直接当作开启定时任务类的配置类,空着就行了
}

防止重复执行锁 注解的定义

// 省略包名和引入

/**
 * 组织重复执行锁(同时带超时时间设定,默认5分钟)
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface PreventDuplicateLock {

    long timeout() default 5 * 60L;

}

控制防止重复执行的切片处理类

// 省略包名和引入

@Component
@Aspect
public abstract class AbstractSchedulingAspect {

    private Logger logger = LoggerFactory.getLogger(AbstractSchedulingAspect.class);

    /** 暂时默认最多同时500给线程 */
    private static ExecutorService exec;
    static {
        exec = new ThreadPoolExecutor(1, 500,
                60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(500));
    }

    @Value("${spring.application.name:none}")
    private String applicationName;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 必须在应用中实现的虚拟方法,且带@Pointcut注解
     */
    public abstract void doTask();

    @Around("doTask()")
    public Object doAround(ProceedingJoinPoint proceedingJoinPoint) {
        Method method = ((MethodSignature) proceedingJoinPoint.getSignature()).getMethod();
        PreventDuplicateLock lock = method.getDeclaredAnnotation(PreventDuplicateLock.class);

        if (lock != null) {
            long timeout = lock.timeout();
            String methodKey = applicationName + "-PreventDuplicateLock-" + method.getDeclaringClass().getName() + "-" + method.getName();

            try {
                logger.info("判断程序是否已被其它程序执行");
                
                Boolean locked = stringRedisTemplate.opsForValue().setIfAbsent(methodKey, "lock",
                        timeout, TimeUnit.SECONDS);
                logger.info("{} 唯一锁超时设定为:{} 秒", methodKey, timeout);
                
                if (locked) {
                    
                    Future<Object> future = exec.submit(() -> {
                        // 执行原方法
                        logger.info("执行任务:{}", methodKey);
                        Object obj = null;
                        try {
                            obj = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs());
                        } catch (Throwable throwable) {
                            throw new RuntimeException(throwable);
                        }
                        return obj;
                    });

                    // 超时处理
                    Object obj = future.get(timeout, TimeUnit.SECONDS);
                    return obj;
                } else {
                    logger.info("任务已经被其它线程实例执行,本实例跳过执行:{}", methodKey);
                    return null;
                }

            } catch (TimeoutException ex) {
                logger.error("{}运行超时", methodKey, ex);
                return null;
            } catch (Throwable throwable) {
                throw new RuntimeException(throwable);
            } finally {
                stringRedisTemplate.delete(methodKey);
            }
        }
        
        else {
            // origin logic path
            try {
                Object obj = proceedingJoinPoint.proceed(proceedingJoinPoint.getArgs());
                return obj;
            } catch (Throwable throwable) {
                throw new RuntimeException(throwable);
            }
        }
    }

}

3. 定时任务示例

首先配置切入点

// 省略包名和引入

/**
 * 配置定时任务切入点
 * 必须的,否则防止多实例部署重复执行功能不起作用
 */
@Component
@Aspect
public class DemoSchedulingAspectConfig extends AbstractSchedulingAspect {

    @Override
    @Pointcut("execution( * your.task.package..*.*(..))")
    public void doTask() {
    }
}

接着是具体的示例类

// 省略包名和引入

/**
 * 异步执行任务,能真正实现按频率执行 关键在@EnableAsync注解的使用
 */
@EnableAsync
@Component
public class DemoAsyncTasks {

    /**
     * 注意 @PreventDuplicateLock 注解的使用是本问的关键
     */
    @PreventDuplicateLock(timeout = 300)
    @Async
    @Scheduled(cron = "*/5 * * * * ?")
    public void scheduleFixedRateTaskAsync() throws InterruptedException {
        System.out.println(
                "固定频率异步执行的任务 - " + System.currentTimeMillis() / 1000);
        Thread.sleep(5000);
    }

}

4. 更多设想和可实现

  • 比如可以实现异常或者失败重试功能。
  • 支持数据库和redis作为同步锁服务端二选一的可插播配置。

其实还有很多功能可以丰富,而且在框架层就可以实现从而减少业务开发者的麻烦和重复工作,在这里仅简单提示

Tags:

最近发表
标签列表