网站首页 > 技术文章 正文
ThreadPoolTaskExecutor是一个Spring的线程池技术,其实,它的实现方式完全是使用JDK中ThreadPoolExecutor进行实现。在后台Spring Boot开发中,经常有一些非主流程业务要处理,为了提升主业务处理速度,可使用ThreadPoolTaskExecutor线程池来异步处理。
Spring中ThreadPoolTaskExecutor 线程池的使用入门
一、ThreadPoolTaskExecutor参数和方法说明
下面是ThreadPoolTaskExecutor的一些常用参数和方法的说明。
二、ThreadPoolTaskExecutor 线程池处理流程
ThreadPoolExecutor池子的处理流程如下:
1)当池子大小小于corePoolSize就新建线程,并处理请求
2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理
3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理
4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁
其会优先创建 corePoolSize 线程, 当继续增加线程时,先放入Queue中,当 corePoolSize 和 workQueue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出异常 org.springframework.core.task.TaskRejectedException。
三、ThreadPoolTaskExecutor线程池注入
通过配置类可以多个线程池注入,每个线程池用于不同的业务场景,避免各业务之间相互影响。
在PoolConfig 配置类中,我们创建了2个线程池。默认情况下bean的名称和方法名称相同,也可以使用name属性来指定。
完整代码如下所示:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
public class PoolConfig {
@Bean(name = "poolExecutor")
public ThreadPoolTaskExecutor poolExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
threadPoolTaskExecutor.setCorePoolSize(8);
// 等待队列容量,线程数不够时,任务会等待
threadPoolTaskExecutor.setQueueCapacity(1000);
// 最大线程数,超过会抛异常
threadPoolTaskExecutor.setMaxPoolSize(12);
// 设置线程名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("pool-");
// 初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor(){
log.info("start taskExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(5);
//配置最大线程数
executor.setMaxPoolSize(8);
//配置队列大小
executor.setQueueCapacity(100);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("task-");
// rejection-policy: 当pool已经达到max PoolSize 的时候,如何处理新任务
// CALLER_RUNS: 不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
介绍一下其中的线程池拒绝策略,当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。
四、ThreadPoolTaskExecutor 使用示例
新任务提交时:若当前线运行的程数量小于核心线程数,则创建一条新线程;
若已经超过核心线程数,则先放入队列中; 队列满后,创建新线程;
当线程总数等于最大线程数时,则执行拒绝策略。
@Test
void simpleTask2() throws InterruptedException {
int taskNum = 10;
CountDownLatch latch = new CountDownLatch(taskNum);
for(int i=0; i<taskNum; i++) {
final int taskId = i;
System.out.println("--- 提交任务 " + taskId);
taskExecutor.execute(
()->{
try {
System.out.printf("task: %s, thread: %s, start at %d\n",
taskId, Thread.currentThread().getName(), System.currentTimeMillis()/1000);
Thread.sleep(2_000);
System.out.printf("task: %s, thread: %s, end at %d\n",
taskId, Thread.currentThread().getName(), System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
);
}
latch.await();
taskExecutor.shutdown();
}
启动应用,输出结果正常。
如果想触发拒绝策略,可以将taskNum 设置为一个比较大的数值(如1000)。如图所示,可以看到触发maxPoolSize之后,会在调用者所在的线程(main)来执行任务。
猜你喜欢
- 2024-12-12 springboot定时任务防止多实例重复执行架构方案的实现
- 2024-12-12 springboot-多数据源-使用详解
- 2024-12-12 字节面试官:“这35道 Spring Cloud 面试题都答不上来?”
- 2024-12-12 SpringBoot自定义线程池(ThreadPoolTaskExecutor)
- 2024-12-12 你不知道的Spring定时任务
- 2024-12-12 springboot整合xxl-job分布式定时任务
- 2024-12-12 如何将Spring Cloud Task发布Data Flow上执行?
- 2024-12-12 详细介绍一下Spring Boot项目中如何使用定时任务操作?
- 2024-12-12 Spring任务执行与调度配置及使用详解
- 2024-12-12 Spring Task 执行长耗时的任务时,会堆集并行执行吗?
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (87)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)