初探Spring Scheduler

  项目使用很多@Scheduled(cron=**)注解来实现定时任务,部署到现场跑了一段时间,
发现某些任务未按照cron表达式定义的频率执行。

怀疑

  初步怀疑未按频率执行的原因可能是频率设置不合理,任务执行的时长较长导致,优化了部分程序执行时间后,有所改善。后来又发现,所有的任务都不执行了,经排查发现某个定时任务阻塞在Socket的读操作上。
  这下就怀疑是不是Spring的定时任务有问题?我们臆测Spring会开多个线程执行定时任务,但实际情况是不是这样呢?

源码

  带着这些疑问,我一步一步走了遍Spring 3.1.0.RELEASE的源码:

  • Listener,配置在web.xml,容器启动时调用

    1
    org.springframework.web.context.ContextLoaderListener.contextInitialized(ServletContextEvent)
  • 实例化WebApplicationContext

    1
    org.springframework.web.context.ContextLoader.initWebApplicationContext(ServletContext)
  • 新建WebApplicationContext,如果没有指定,默认新建XmlWebApplicationContext

    1
    2
    3
    org.springframework.web.context.ContextLoader.createWebApplicationContext(ServletContext)

    org.springframework.web.context.support.XmlWebApplicationContext
  • XmlWebApplicationContext是ConfigurableWebApplicationContext,配置并刷新WebApplicationContext,最后调用ConfigurableApplicationContext的refresh方法。

    1
    2
    3
    org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ConfigurableWebApplicationContext, ServletContext)

    org.springframework.context.ConfigurableApplicationContext.refresh()
  • 抽象类org.springframework.context.support.AbstractApplicationContext实现了refresh这一重要逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    public void refresh() throws BeansException, IllegalStateException {
    synchronized (this.startupShutdownMonitor) {
    // Prepare this context for refreshing.
    prepareRefresh();

    // Tell the subclass to refresh the internal bean factory.
    ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

    // Prepare the bean factory for use in this context.
    prepareBeanFactory(beanFactory);

    try {
    // Allows post-processing of the bean factory in context subclasses.
    postProcessBeanFactory(beanFactory);

    // Invoke factory processors registered as beans in the context.
    invokeBeanFactoryPostProcessors(beanFactory);

    // Register bean processors that intercept bean creation.
    registerBeanPostProcessors(beanFactory);

    // Initialize message source for this context.
    initMessageSource();

    // Initialize event multicaster for this context.
    initApplicationEventMulticaster();

    // Initialize other special beans in specific context subclasses.
    onRefresh();

    // Check for listener beans and register them.
    registerListeners();

    // Instantiate all remaining (non-lazy-init) singletons.
    finishBeanFactoryInitialization(beanFactory);

    // Last step: publish corresponding event.
    finishRefresh();
    }

    catch (BeansException ex) {
    // Destroy already created singletons to avoid dangling resources.
    destroyBeans();

    // Reset 'active' flag.
    cancelRefresh(ex);

    // Propagate exception to caller.
    throw ex;
    }
    }
    }
  • org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor是@Scheduled注解处理类,实现org.springframework.beans.factory.config.BeanPostProcessor接口(postProcessAfterInitialization方法实现注解扫描和类实例创建)、org.springframework.context.ApplicationContextAware接口(setApplicationContext方法设置当前ApplicationContext)、org.springframework.context.ApplicationListener(观察者模式,onApplicationEvent方法会被回调)。

  • ScheduledAnnotationBeanPostProcessor postProcessAfterInitialization扫描所有@Scheduled注解,区分cronTasks、fixedDelayTasks、fixedRateTasks

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    public Object postProcessAfterInitialization(final Object bean, String beanName) {
    final Class<?> targetClass = AopUtils.getTargetClass(bean);
    ReflectionUtils.doWithMethods(targetClass, new MethodCallback() {
    public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
    Scheduled annotation = AnnotationUtils.getAnnotation(method, Scheduled.class);
    if (annotation != null) {
    Assert.isTrue(void.class.equals(method.getReturnType()),
    "Only void-returning methods may be annotated with @Scheduled.");
    Assert.isTrue(method.getParameterTypes().length == 0,
    "Only no-arg methods may be annotated with @Scheduled.");
    if (AopUtils.isJdkDynamicProxy(bean)) {
    try {
    // found a @Scheduled method on the target class for this JDK proxy -> is it
    // also present on the proxy itself?
    method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
    } catch (SecurityException ex) {
    ReflectionUtils.handleReflectionException(ex);
    } catch (NoSuchMethodException ex) {
    throw new IllegalStateException(String.format(
    "@Scheduled method '%s' found on bean target class '%s', " +
    "but not found in any interface(s) for bean JDK proxy. Either " +
    "pull the method up to an interface or switch to subclass (CGLIB) " +
    "proxies by setting proxy-target-class/proxyTargetClass " +
    "attribute to 'true'", method.getName(), targetClass.getSimpleName()));
    }
    }
    Runnable runnable = new ScheduledMethodRunnable(bean, method);
    boolean processedSchedule = false;
    String errorMessage = "Exactly one of 'cron', 'fixedDelay', or 'fixedRate' is required.";
    String cron = annotation.cron();
    if (!"".equals(cron)) {
    processedSchedule = true;
    if (embeddedValueResolver != null) {
    cron = embeddedValueResolver.resolveStringValue(cron);
    }
    cronTasks.put(runnable, cron);
    }
    long fixedDelay = annotation.fixedDelay();
    if (fixedDelay >= 0) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    fixedDelayTasks.put(runnable, fixedDelay);
    }
    long fixedRate = annotation.fixedRate();
    if (fixedRate >= 0) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    fixedRateTasks.put(runnable, fixedRate);
    }
    Assert.isTrue(processedSchedule, errorMessage);
    }
    }
    });
    return bean;
    }
  • finishRefresh方法触发所有监视者方法回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() != this.applicationContext) {
    return;
    }

    Map<String, SchedulingConfigurer> configurers = applicationContext.getBeansOfType(SchedulingConfigurer.class);

    if (this.cronTasks.isEmpty() && this.fixedDelayTasks.isEmpty() &&
    this.fixedRateTasks.isEmpty() && configurers.isEmpty()) {
    return;
    }

    this.registrar = new ScheduledTaskRegistrar();
    this.registrar.setCronTasks(this.cronTasks);
    this.registrar.setFixedDelayTasks(this.fixedDelayTasks);
    this.registrar.setFixedRateTasks(this.fixedRateTasks);

    if (this.scheduler != null) {
    this.registrar.setScheduler(this.scheduler);
    }

    for (SchedulingConfigurer configurer : configurers.values()) {
    configurer.configureTasks(this.registrar);
    }

    if (registrar.getScheduler() == null) {
    Map<String, ? super Object> schedulers = new HashMap<String, Object>();
    schedulers.putAll(applicationContext.getBeansOfType(TaskScheduler.class));
    schedulers.putAll(applicationContext.getBeansOfType(ScheduledExecutorService.class));
    if (schedulers.size() == 0) {
    // do nothing -> fall back to default scheduler
    } else if (schedulers.size() == 1) {
    this.registrar.setScheduler(schedulers.values().iterator().next());
    } else if (schedulers.size() >= 2){
    throw new IllegalStateException("More than one TaskScheduler and/or ScheduledExecutorService " +
    "exist within the context. Remove all but one of the beans; or implement the " +
    "SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler " +
    "explicitly within the configureTasks() callback. Found the following beans: " + schedulers.keySet());
    }
    }

    this.registrar.afterPropertiesSet();
    }
  • onApplicationEvent方法最后调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public void afterPropertiesSet() {
    //关键是这里,项目如果只使用@Scheduled注解,未配置TaskScheduler,Spring只会实例化一个线程的线程池
    if (this.taskScheduler == null) {
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
    for (Map.Entry<Runnable, Trigger> entry : this.triggerTasks.entrySet()) {
    this.scheduledFutures.add(this.taskScheduler.schedule(entry.getKey(), entry.getValue()));
    }
    }
    if (this.cronTasks != null) {
    for (Map.Entry<Runnable, String> entry : this.cronTasks.entrySet()) {
    this.scheduledFutures.add(this.taskScheduler.schedule(entry.getKey(), new CronTrigger(entry.getValue())));
    }
    }
    if (this.fixedRateTasks != null) {
    for (Map.Entry<Runnable, Long> entry : this.fixedRateTasks.entrySet()) {
    this.scheduledFutures.add(this.taskScheduler.scheduleAtFixedRate(entry.getKey(), entry.getValue()));
    }
    }
    if (this.fixedDelayTasks != null) {
    for (Map.Entry<Runnable, Long> entry : this.fixedDelayTasks.entrySet()) {
    this.scheduledFutures.add(this.taskScheduler.scheduleWithFixedDelay(entry.getKey(), entry.getValue()));
    }
    }
    }
  • 从关键点代码可以看出,Spring实例化只有一个线程的定时任务线程池
    1
    2
    3
    4
    if (this.taskScheduler == null) {
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }

结论

  1. 全采用@Scheduled(cron=**)注解,不单独配置TaskScheduler的话,Spring只会开一个线程执行所有定时任务。
  2. 如果配置多个TaskScheduler,没有给@Scheduled(cron=**)注解指定某个TaskScheduler,运行会报错。