Java小强个人技术博客站点    手机版
当前位置: 首页 >> 开源 >> Spring异步注解@Async线程池配置

Spring异步注解@Async线程池配置

15110 开源 | 2023-6-26

从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法。

调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行。

Spring内部线程池,其实是SimpleAsyncTaskExecutor,它不会复用线程的,设计初衷就是执行大量的短时间的任务。

指在@Async注解在使用时,不指定线程池的名称。查看源码,@Async的默认线程池为SimpleAsyncTaskExecutor。


默认线程池的弊端

在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。Executors各个方法的弊端:

newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。

newCachedThreadPool和newScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。


@Async应用自定义线程池

自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。

自定义线程池有如下模式:

重新实现接口AsyncConfigurer

继承AsyncConfigurerSupport

配置由自定义的TaskExecutor替代内置的任务执行器


我们先创建一个异步执行的服务类:

package com.example.springboot.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class SyncService {

    Logger J101 = LoggerFactory.getLogger("J101");

    @Async("javacui_task")
    public void run(int i){
        J101.info("任务开始执行 -->" + i);
        try {
            Thread.sleep(5 * 1000);
        }catch (Exception e){
        }
        J101.info("任务执行完毕 -->" + i);
    }

}


写一个测试方法,调用数次这个异步方法来实验效果

package com.example.springboot;

import com.example.springboot.task.SyncService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SyncTest {

    @Autowired
    private SyncService syncService;

    @Test
    public void test1() {
        for (int i = 1; i <= 5; i++) {
            syncService.run(i);
        }
        // 保证主线程不退出
        try {
            while (true){
                Thread.sleep(5 * 1000);
            }
        }catch (Exception e){
        }
    }
}


(1)通过查看Spring源码关于@Async的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。且重新实现 public Executor getAsyncExecutor()方法。

package com.example.springboot.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    public Executor getAsyncExecutor() {
        return executor();
    }
}


(2)继承AsyncConfigurerSupport

package com.example.springboot.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {

    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }
}


(4)配置自定义的TaskExecutor

由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。

所以可在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class便可。

package com.example.springboot.config;

import org.springframework.aop.interceptor.AsyncExecutionAspectSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class TaskPoolConfig {

    @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Bean(name = "javacui_task")
    public Executor taskExecutorJavaCui() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}


(5)@Async注解,使用系统默认或者自定义的线程池(代替默认线程池)。

可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("javacui_task")。

package com.example.springboot.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class SyncService {

    Logger J101 = LoggerFactory.getLogger("J101");

    @Async("javacui_task")
    public void run(int i){
        J101.info("任务开始执行 -->" + i);
        try {
            Thread.sleep(5 * 1000);
        }catch (Exception e){
        }
        J101.info("任务执行完毕 -->" + i);
    }

}



(6)上述代码中,核心线程数量,最大线程数量,和队列都是1,因此,使用任何一个线程池配置时,执行都是按顺序执行的,修改MaxPoolSize看效果



备线程池参数:corePoolSize

Set the ThreadPoolExecutor's core pool size.Default is 1.

线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。

这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。

再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize


maxPoolSize

Set the ThreadPoolExecutor's maximum pool size.

Default is Integer.MAX_VALUE

线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列queueCapacity任务已满,并且当前线程个数小于maxPoolSize,那么会创建新的线程来执行任务


queueCapacity

Set the capacity for the ThreadPoolExecutor's BlockingQueue.

Default is Integer.MAX_VALUE

任务队列容量(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行


rejectedExecutionHandler

Set the RejectedExecutionHandler to use for the ExecutorService.

Default is the ExecutorService's default abort policy.

ThreadPoolExecutor类有几个内部实现类来处理拒绝任务:

1.AbortPolicy 丢弃任务,抛运行时异常

2.CallerRunsPolicy 执行任务

3.DiscardPolicy 忽视,什么都不会发生

4.DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务

5.实现RejectedExecutionHandler接口,可自定义处理器


线程池.jpg


END


推荐您阅读更多有关于“ spring 线程池 Async OOM ”的文章

上一篇:MySQL慢查优化 循环/嵌套子查询(DEPENDENT SUBQUERY) 下一篇:docker安装ES7.1.1(单机版)+ik分词器+es-head可视化

猜你喜欢

发表评论: