从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法。
调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行。
Spring内部线程池,其实是SimpleAsyncTaskExecutor,它不会复用线程的,设计初衷就是执行大量的短时间的任务。
指在@Async注解在使用时,不指定线程池的名称。查看源码,@Async的默认线程池为SimpleAsyncTaskExecutor。
默认线程池的弊端
在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。Executors各个方法的弊端:
newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
newCachedThreadPool和newScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。
针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。
@Async应用自定义线程池
自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。
自定义线程池有如下模式:
重新实现接口AsyncConfigurer
继承AsyncConfigurerSupport
配置由自定义的TaskExecutor替代内置的任务执行器
我们先创建一个异步执行的服务类:
package com.example.springboot.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service public class SyncService { Logger J101 = LoggerFactory.getLogger("J101"); @Async("javacui_task") public void run(int i){ J101.info("任务开始执行 -->" + i); try { Thread.sleep(5 * 1000); }catch (Exception e){ } J101.info("任务执行完毕 -->" + i); } }
写一个测试方法,调用数次这个异步方法来实验效果
package com.example.springboot; import com.example.springboot.task.SyncService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class SyncTest { @Autowired private SyncService syncService; @Test public void test1() { for (int i = 1; i <= 5; i++) { syncService.run(i); } // 保证主线程不退出 try { while (true){ Thread.sleep(5 * 1000); } }catch (Exception e){ } } }
(1)通过查看Spring源码关于@Async的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。且重新实现 public Executor getAsyncExecutor()方法。
package com.example.springboot.config; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class AsyncConfiguration implements AsyncConfigurer { public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } public Executor getAsyncExecutor() { return executor(); } }
(2)继承AsyncConfigurerSupport
package com.example.springboot.config; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurerSupport; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class SpringAsyncConfigurer extends AsyncConfigurerSupport { public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } public Executor getAsyncExecutor() { return asyncExecutor(); } }
(4)配置自定义的TaskExecutor
由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。
所以可在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class便可。
package com.example.springboot.config; import org.springframework.aop.interceptor.AsyncExecutionAspectSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class TaskPoolConfig { @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME) public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Bean(name = "javacui_task") public Executor taskExecutorJavaCui() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
(5)@Async注解,使用系统默认或者自定义的线程池(代替默认线程池)。
可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("javacui_task")。
package com.example.springboot.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service public class SyncService { Logger J101 = LoggerFactory.getLogger("J101"); @Async("javacui_task") public void run(int i){ J101.info("任务开始执行 -->" + i); try { Thread.sleep(5 * 1000); }catch (Exception e){ } J101.info("任务执行完毕 -->" + i); } }
(6)上述代码中,核心线程数量,最大线程数量,和队列都是1,因此,使用任何一个线程池配置时,执行都是按顺序执行的,修改MaxPoolSize看效果。
备线程池参数:corePoolSize
Set the ThreadPoolExecutor's core pool size.Default is 1.
线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。
这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。
再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。
maxPoolSize
Set the ThreadPoolExecutor's maximum pool size.
Default is Integer.MAX_VALUE
线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列queueCapacity任务已满,并且当前线程个数小于maxPoolSize,那么会创建新的线程来执行任务。
queueCapacity
Set the capacity for the ThreadPoolExecutor's BlockingQueue.
Default is Integer.MAX_VALUE
任务队列容量(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
rejectedExecutionHandler
Set the RejectedExecutionHandler to use for the ExecutorService.
Default is the ExecutorService's default abort policy.
ThreadPoolExecutor类有几个内部实现类来处理拒绝任务:
1.AbortPolicy 丢弃任务,抛运行时异常
2.CallerRunsPolicy 执行任务
3.DiscardPolicy 忽视,什么都不会发生
4.DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
5.实现RejectedExecutionHandler接口,可自定义处理器
END
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: