概述
在Spring Webflux中,有两个关键的调度器操作符:publishOn和subscribeOn。理解它们的用途和差异是使用WebFlux进行反应式编程的重要部分。
- subscribeOn:这个操作符定义了产生数据的线程。无论在哪里放置subscribeOn,都只会影响源头的执行线程。如果在一个流的多个地方使用subscribeOn,只有第一个起作用。
- publishOn:这个操作符可以影响其后续操作运行的线程。如果在流中的多个地方使用publishOn,每个都会起作用,并且会改变其后续操作的执行线程。
Scheduler调度器
在Spring WebFlux中,Scheduler(调度器)是Project Reactor中的一个概念,它负责控制并发和线程切换。 主要有以下几种常见类型的Scheduler:
- Schedulers.immediate():直接在当前线程执行任务,不进行线程切换。
- Schedulers.single():创建一个只包含一个线程的调度器,所有任务都在这个单一线程上执行。如果这个线程出现故障,将创建一个新的线程替代它。
- Schedulers.elastic():创建一个弹性的线程池,适用于IO阻塞操作比如数据库查询、文件读写等。该线程池会根据需要创建新的线程,如果线程闲置时间过长则会被销毁。弹性线程池可以无限大,所以要注意不要在大量计算密集型的任务中使用它,因为这可能导致创建大量线程,消耗过多系统资源。
- Schedulers.parallel():创建一个固定大小的线程池,线程数量默认等于CPU核心数。适用于计算密集型任务,因为这类任务能够充分利用CPU资源。
- Schedulers.boundedElastic():创建一个有界的弹性线程池,适合I/O任务。它与Schedulers.elastic()相似,但有最大线程数和队列大小的限制。
publishOn和subscribeOn使用场景
使用Scheduler的方法主要是通过publishOn和subscribeOn两个操作符。
- 使用publishOn将数据流中后续的处理操作在parallel调度器对应的线程池中执行
flux.publishOn(Schedulers.parallel()).map(this::blockMethod)
- 使用subscribeOn将源头的操作在boundedElastic调度器对应的线程池中执行
Mono.fromCallable(this::blockMethod).subscribeOn(Schedulers.boundedElastic())
两者的区别
- publishOn常用于切换执行下游操作符的线程,影响范围和它的位置有关。
- subscribeOn常用于切换源头数据生成的线程,影响范围则和位置无关。
使用示例
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class PublishOnAndSubscribeOnTests {
@Test
public void testPublishOnAndSubscribeOn() {
Flux.just("Hello").map(s -> {
System.out.println("[1] Thread name: " + Thread.currentThread().getName());
return s.concat(" World");
}).publishOn(Schedulers.newParallel("thread-publishOn")).map(s -> {
System.out.println("[2] Thread name: " + Thread.currentThread().getName());
return s;
}).subscribeOn(Schedulers.newSingle("thread-subscribeOn")).subscribe(s -> {
System.out.println("[3] Thread name: " + Thread.currentThread().getName());
System.out.println(s);
});
}
}
- 输入结果如下:
[1] Thread name: thread-subscribeOn-1
[2] Thread name: thread-publishOn-1
[3] Thread name: thread-publishOn-1
Hello World
- 从上面的例子可以看出,subscribeOn定义在publishOn之后,但是却从源头开始生效。
- 而在publishOn执行之后,线程池变更为publishOn所定义的。
总结
- 在实际使用中,我们可以根据需要选择使用publishOn或subscribeOn,它们可以帮助我们更好地控制程序的并发行为和系统资源的利用。
- 然而,publishOn与subscribeOn也有一些局限性:
1.无法解决所有并发问题:虽然它们可以将任务移至其他线程执行,但如果你的程序设计本身就没有考虑到并发安全,那么使用这两者仍可能出现问题。
2.线程切换开销:每次通过publishOn或subscribeOn进行线程切换时,都会产生一定的系统开销。如果大量无谓的线程切换,可能会对性能产生负面影响。