优秀的编程知识分享平台

网站首页 > 技术文章 正文

Spring中ThreadPoolTaskExecutor 线程池的使用进阶

nanyue 2024-12-12 14:12:08 技术文章 10 ℃

ThreadPoolTaskExecutor是一个Spring的线程池技术,其实,它的实现方式完全是使用JDK中ThreadPoolExecutor进行实现。在后台Spring Boot开发中,经常有一些非主流程业务要处理,为了提升主业务处理速度,可使用ThreadPoolTaskExecutor线程池来异步处理。

Spring中ThreadPoolTaskExecutor 线程池的使用入门


一、ThreadPoolTaskExecutor参数和方法说明

下面是ThreadPoolTaskExecutor的一些常用参数和方法的说明。



二、ThreadPoolTaskExecutor 线程池处理流程

ThreadPoolExecutor池子的处理流程如下:


1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁


其会优先创建 corePoolSize 线程, 当继续增加线程时,先放入Queue中,当 corePoolSize 和 workQueue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出异常 org.springframework.core.task.TaskRejectedException。


三、ThreadPoolTaskExecutor线程池注入

通过配置类可以多个线程池注入,每个线程池用于不同的业务场景,避免各业务之间相互影响。


在PoolConfig 配置类中,我们创建了2个线程池。默认情况下bean的名称和方法名称相同,也可以使用name属性来指定。

完整代码如下所示:

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class PoolConfig {
 @Bean(name = "poolExecutor")
 public ThreadPoolTaskExecutor poolExecutor() {
 ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
 // 核心线程数
 threadPoolTaskExecutor.setCorePoolSize(8);
 // 等待队列容量,线程数不够时,任务会等待
 threadPoolTaskExecutor.setQueueCapacity(1000);
 // 最大线程数,超过会抛异常
 threadPoolTaskExecutor.setMaxPoolSize(12);
 // 设置线程名称前缀
 threadPoolTaskExecutor.setThreadNamePrefix("pool-");
 // 初始化
 threadPoolTaskExecutor.initialize();

 return threadPoolTaskExecutor;
 }

 @Bean(name = "taskExecutor")
 public ThreadPoolTaskExecutor taskExecutor(){
 log.info("start taskExecutor");
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 //配置核心线程数
 executor.setCorePoolSize(5);
 //配置最大线程数
 executor.setMaxPoolSize(8);
 //配置队列大小
 executor.setQueueCapacity(100);
 //配置线程池中的线程的名称前缀
 executor.setThreadNamePrefix("task-");

 // rejection-policy: 当pool已经达到max PoolSize 的时候,如何处理新任务
 // CALLER_RUNS: 不在新线程中执行任务,而是有调用者所在的线程来执行
 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 //执行初始化
 executor.initialize();
 return executor;
 }
}


介绍一下其中的线程池拒绝策略,当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。



四、ThreadPoolTaskExecutor 使用示例

新任务提交时:若当前线运行的程数量小于核心线程数,则创建一条新线程;

若已经超过核心线程数,则先放入队列中; 队列满后,创建新线程;

当线程总数等于最大线程数时,则执行拒绝策略。

@Test
void simpleTask2() throws InterruptedException {
 int taskNum = 10;
 CountDownLatch latch = new CountDownLatch(taskNum);

 for(int i=0; i<taskNum; i++) {
 final int taskId = i;
 System.out.println("--- 提交任务 " + taskId);
 taskExecutor.execute(
 ()->{
 try {
 System.out.printf("task: %s, thread: %s, start at %d\n",
 taskId, Thread.currentThread().getName(), System.currentTimeMillis()/1000);
 Thread.sleep(2_000);
 System.out.printf("task: %s, thread: %s, end at %d\n",
 taskId, Thread.currentThread().getName(), System.currentTimeMillis()/1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 } finally {
 latch.countDown();
 }
 }
 );
 }

 latch.await();
 taskExecutor.shutdown();
}

启动应用,输出结果正常。

如果想触发拒绝策略,可以将taskNum 设置为一个比较大的数值(如1000)。如图所示,可以看到触发maxPoolSize之后,会在调用者所在的线程(main)来执行任务。

Tags:

最近发表
标签列表