The default thread pool behind Spring Boot's @Async annotation

Preface

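If you are a backend developer who only recently started using Spring Boot, you have probably heard or read something like this: when using @Async you must specify a thread pool, because otherwise SimpleAsyncTaskExecutor is used.
That executor has the following characteristics: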

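  • It starts a new thread for each task and executes the task asynchronously.
  • It supports limiting concurrent threads through the "concurrencyLimit" bean property. By default, the number of concurrent threads is unlimited.
  • This implementation does not reuse threads!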

A look at the SimpleAsyncTaskExecutor source confirms it:

@Deprecated
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
        if (this.isThrottleActive() && startTimeout > 0L) {
            this.concurrencyThrottle.beforeAccess();
            this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
        } else {
            this.doExecute(taskToUse);
        }

    }

protected void doExecute(Runnable task) {
        Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
        thread.start();
    }

Every task execution creates a new thread; there is no pool behind it and nothing is reused.
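To make the difference concrete, here is a minimal plain-JDK sketch (no Spring involved). The class name ThreadReuseDemo and the helper distinctThreads are made up for illustration, and the thread-per-task lambda only mimics what doExecute() does above; it is not Spring's implementation. The sketch counts how many distinct threads end up running ten tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadReuseDemo {

    // Runs `tasks` trivial tasks on the executor and returns how many
    // distinct Thread objects executed them.
    public static int distinctThreads(Executor executor, int tasks) throws InterruptedException {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        done.await(); // wait until every task has run
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: a stand-in for SimpleAsyncTaskExecutor, one fresh thread per task.
        Executor threadPerTask = task -> new Thread(task).start();
        System.out.println("thread-per-task: " + distinctThreads(threadPerTask, 10));

        // A pooled executor reuses its worker threads instead.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("fixed pool of 2: " + distinctThreads(pool, 10));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the thread-per-task executor every task runs on its own thread, so the count equals the number of tasks; with the fixed pool the count can never exceed the pool size.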

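Sounds pretty scary, right? So much so that every time I write an @Async annotation, I instinctively check which thread pool it uses.

But today, while reading up on thread pools, SimpleAsyncTaskExecutor suddenly came to mind, so, going by what I remembered, I wrote the following code: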

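import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ThreadPoolServiceImpl implements ThreadPoolService {

    @Override
    @Async // no executor name given, so the default executor is used
    public void hello() {
        log.info("Current thread: {}", Thread.currentThread().getName());
    }
}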

Call it several times:

@SpringBootApplication
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    CommandLineRunner lookupTestService(ThreadPoolService testService) {
        return args -> {

            // 1. Call the test service
            for(int i = 0; i < 10; i++) {
                testService.hello();
            }

        };
    }
}

The result:
[Screenshot of the log output in the original post]
Huh? Wasn't the default supposed to be SimpleAsyncTaskExecutor, which never reuses threads?

Let's look at the source.
The interceptor class for @Async is org.springframework.aop.interceptor.AsyncExecutionInterceptor:

@Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
        } else {
            Callable<Object> task = () -> {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future)result).get();
                    }
                } catch (ExecutionException var4) {
                    this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());
                } catch (Throwable var5) {
                    this.handleError(var5, userDeclaredMethod, invocation.getArguments());
                }

                return null;
            };
            return this.doSubmit(task, executor, invocation.getMethod().getReturnType());
        }
    }


	@Nullable
	 // Method in the parent class of AsyncExecutionInterceptor; it resolves the executor for an async method
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
        if (executor == null) {
            String qualifier = this.getExecutorQualifier(method);
            Executor targetExecutor;
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
            } else {
            	// Reading the source shows that defaultExecutor is obtained via getDefaultExecutor
                targetExecutor = (Executor)this.defaultExecutor.get();
            }

            if (targetExecutor == null) {
                return null;
            }

            executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
            this.executors.put(method, executor);
        }

        return (AsyncTaskExecutor)executor;
    }

@Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (Executor)(defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }


@Nullable
	// getDefaultExecutor in the parent class:
    // it looks in the Spring container for a TaskExecutor bean, or for an Executor bean named 'taskExecutor'
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                return (Executor)beanFactory.getBean(TaskExecutor.class);
            } catch (NoUniqueBeanDefinitionException var6) {
                this.logger.debug("Could not find unique TaskExecutor bean. Continuing search for an Executor bean named 'taskExecutor'", var6);

                try {
                    return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
                } catch (NoSuchBeanDefinitionException var4) {
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: " + var6.getBeanNamesFound());
                    }
                }
            } catch (NoSuchBeanDefinitionException var7) {
                this.logger.debug("Could not find default TaskExecutor bean. Continuing search for an Executor bean named 'taskExecutor'", var7);

                try {
                    return (Executor)beanFactory.getBean("taskExecutor", Executor.class);
                } catch (NoSuchBeanDefinitionException var5) {
                    this.logger.info("No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
            }
        }

        return null;
    }

To sum up: in Spring Framework, if the container holds neither a TaskExecutor bean nor an Executor bean named 'taskExecutor', SimpleAsyncTaskExecutor is used. So in this version of plain Spring Framework, the @Async default is SimpleAsyncTaskExecutor.
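The lookup order can be modelled with a small plain-Java sketch. Everything in it is hypothetical: the Map stands in for the bean factory, TaskExecutorLike stands in for Spring's TaskExecutor interface, and resolveDefault is an invented helper. It also ignores details such as primary beans, so treat it as a toy model of the rule rather than Spring's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

public class DefaultExecutorLookupSketch {

    /** Hypothetical stand-in for Spring's TaskExecutor interface. */
    public interface TaskExecutorLike extends Executor {}

    /** Label used in this sketch for the fallback created by AsyncExecutionInterceptor. */
    public static final String FALLBACK = "new SimpleAsyncTaskExecutor()";

    /** Returns the name of the bean that would be chosen, or FALLBACK if none qualifies. */
    public static String resolveDefault(Map<String, Executor> beans) {
        List<String> byType = new ArrayList<>();
        for (Map.Entry<String, Executor> entry : beans.entrySet()) {
            if (entry.getValue() instanceof TaskExecutorLike) {
                byType.add(entry.getKey());
            }
        }
        // Step 1: exactly one bean of the TaskExecutor type wins.
        if (byType.size() == 1) {
            return byType.get(0);
        }
        // Step 2: none or several by type, so try an Executor named "taskExecutor".
        if (beans.containsKey("taskExecutor")) {
            return "taskExecutor";
        }
        // Step 3: nothing found, so the interceptor falls back.
        return FALLBACK;
    }

    public static void main(String[] args) {
        Map<String, Executor> beans = new LinkedHashMap<>();
        System.out.println(resolveDefault(beans)); // empty container: fallback

        Executor plain = Runnable::run;            // an Executor that is not a TaskExecutor
        beans.put("executorService", plain);
        System.out.println(resolveDefault(beans)); // still the fallback

        TaskExecutorLike pool = Runnable::run;
        beans.put("applicationTaskExecutor", pool);
        System.out.println(resolveDefault(beans)); // the single TaskExecutor-typed bean
    }
}
```

The second case is worth noting: an Executor bean that is neither of the TaskExecutor type nor named 'taskExecutor' is simply not considered by this lookup.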

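That raises the next question: why does the container contain a ThreadPoolTaskExecutor bean?

Digging through the Spring Boot source, I found the following code in the TaskExecutionAutoConfiguration class of the spring-boot-autoconfigure module: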

@ConditionalOnClass({ThreadPoolTaskExecutor.class})
@AutoConfiguration
@EnableConfigurationProperties({TaskExecutionProperties.class})
public class TaskExecutionAutoConfiguration {
    public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

    public TaskExecutionAutoConfiguration() {
    }

    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
        Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        Stream var10001 = taskExecutorCustomizers.orderedStream();
        var10001.getClass();
        builder = builder.customizers(var10001::iterator);
        builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
        return builder;
    }

    @Lazy
    @Bean(
        name = {"applicationTaskExecutor", "taskExecutor"}
    )
    @ConditionalOnMissingBean({Executor.class})
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }
}

The applicationTaskExecutor method calls into TaskExecutorBuilder, whose build() method shows that the default is not SimpleAsyncTaskExecutor but ThreadPoolTaskExecutor:

public ThreadPoolTaskExecutor build() {
        return this.configure(new ThreadPoolTaskExecutor());
    }

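Because spring-boot-autoconfigure is a core Spring Boot dependency that practically every Spring Boot project pulls in, ThreadPoolTaskExecutor is the default Executor bean in a Spring Boot project, as long as the application defines no Executor bean of its own (note the @ConditionalOnMissingBean({Executor.class}) condition above). And since @Async first looks in the IoC container for a TaskExecutor bean when choosing its executor, the default TaskExecutor for @Async in this version of Spring Boot is ThreadPoolTaskExecutor.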

After going through various sources I finally found the reason: Spring Boot 2.1.0.RELEASE added the TaskExecutionAutoConfiguration configuration class.

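In other words, newer versions of Spring Boot already default to a ThreadPoolTaskExecutor, so there is no need to replace the default thread pool by hand, although you can still tune the ThreadPoolTaskExecutor parameters in the configuration file.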
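As a rough illustration, the pool can be tuned through the spring.task.execution.* properties that TaskExecutionProperties binds (they correspond to the getters used in taskExecutorBuilder above). The values below are arbitrary examples chosen for this sketch, not recommendations, and the property names assume Spring Boot 2.1 or later:

```properties
# application.properties (example values only)
spring.task.execution.pool.core-size=4
spring.task.execution.pool.max-size=16
spring.task.execution.pool.queue-capacity=100
spring.task.execution.pool.keep-alive=30s
spring.task.execution.thread-name-prefix=async-
```

With a thread-name prefix set, the thread names logged by the hello() method above make it easy to confirm which executor actually ran the task.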

Summary
In Spring Boot 2.0.9 and earlier, @Async uses SimpleAsyncTaskExecutor by default; from 2.1.0 onward, it uses ThreadPoolTaskExecutor by default. That is why the commonly repeated conclusion says the default executor in Spring is not ThreadPoolTaskExecutor.

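So how do you get SimpleAsyncTaskExecutor back?
Define an Executor bean of your own that is neither a TaskExecutor nor named 'taskExecutor', for example one created with Executors.newCachedThreadPool():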

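import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExecutorConfig {
    // Not a TaskExecutor and not named "taskExecutor", so @Async will not pick it up
    @Bean(name = "executorService")
    public ExecutorService asyncServiceExecutor() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        System.out.println(executorService instanceof Executor); // prints true
        return executorService;
    }
}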

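With this bean in place, the auto-configured ThreadPoolTaskExecutor is no longer created, and @Async falls back to SimpleAsyncTaskExecutor.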