线程池的创建方式
· 阅读需 10 分钟
两大类创建线程池的方式
- 通过
ThreadPoolExecutor
手动创建线程池 - 通过
Executors
执行器自动创建线程池
以上为两类创建线程池的方式,具体实现有如下7种方法:
方法 | 解释说明 |
---|---|
ThreadPoolExecutor | 手动创建线程池,可自定义相关参数,最多可设置 7 个参数 |
Executors.newFixedThreadPool | 创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待 |
Executors.newCachedThreadPool | 创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程 |
Executors.newSingleThreadExecutor | 创建单个线程数的线程池,可保证先进先出的执行顺序 |
Executors.newScheduledThreadPool | 创建一个可以执行延迟任务的线程池 |
Executors.newSingleThreadScheduledExecutor | 创建一个单线程的可以执行延迟任务的线程池; |
Executors.newWorkStealingPool | 创建一个抢占式执行的线程池,任务执行顺序不确定 ;JDK 1.8 添加 |
线程池的创建及具体使用代码示例
ThreadPoolExecutor
手动创建
提示
阿里巴巴建议使用此方式!
代码示例:
@Slf4j
@SpringBootTest
class PublicWechatApplicationTests {
private ExecutorService executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() << 2,
1200L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000),
new ThreadFactoryBuilder().setNameFormat("- 高高手动创建的线程池-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
@Test
public void testThreadPoolExecutor() {
for (int i = 0; i < 10; i++) {
final int value = i;
executor.execute(() -> {
log.info("请执行具体业务逻辑:{}", value);
});
}
}
}
执行结果:
2023-12-18 18:06:47.588 INFO 16432 --- [ - 高高手动创建的线程池-7] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:7
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-3] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:3
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-0] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:0
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-2] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:2
2023-12-18 18:06:47.589 INFO 16432 --- [ - 高高手动创建的线程池-0] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:9
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-5] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:5
2023-12-18 18:06:47.589 INFO 16432 --- [ - 高高手动创建的线程池-7] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:8
2023-12-18 18:06:47.588 INFO 16432 --- [ - 高高手动创建的线程池-6] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:6
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-1] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:1
2023-12-18 18:06:47.587 INFO 16432 --- [ - 高高手动创建的线程池-4] c.p.PublicWechatApplicationTests : 请执行具体业务逻辑:4
小结:
ThreadPoolExecutor
相比于其他方式 创建线程池的优势在于:
它可以通过参数来控制最大任务数和拒绝策略,让线程池的执行更加透明和可控,可以规避一些未知的风险。
newFixedThreadPool【常见】
Executors.newFixedThreadPool(int var0)
源码如下:
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
因传入的参数
var0
,即使创建的核心线程数也是线程总数,所以只能创建核心线程数。 因为LinkedBlockingQueue
的默认大小是Integer.MAX_VALUE
,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
使用Executors.newFixedThreadPool
创建5个固定大小的线程池,代码示例如下:
public void testFixedThreadPool() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newFixedThreadPool(5);
//创建一个线程
Thread thread = new Thread(() -> {
log.info("执行任务中,线程:{}", Thread.currentThread().getName());
});
//execute 执行没有返回结果
service.execute(thread);
//创建一个有返回结果的线程
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
// 此处执行具体的业务逻辑
return "线程返回结果....";
}
};
// submit 执行结束后可获取返回结果
Future<String> submit = service.submit(callable);
String result = submit.get();
log.info("Callable 线程 {} 执行任务 结果:{}", Thread.currentThread().getName(), result);
Thread.sleep(3000L);
service.execute(thread);
}
newCachedThreadPool【常见】
Executors.newCachedThreadPool()
源码如下:
public static ExecutorService newCachedThreadPool() {
// 2147483647 = Integer.MAX_VALUE
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
运行流程如下:
1. 提交任务进线程池
2. 因 `corePoolSize` 为 0 ,故不创建核心线程,线程池最大为 2147483647 (即:Integer.MAX_VALUE)
3. 尝试将任务添加到SynchronousQueue队列
4. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从
SynchronousQueue拉取任务并在当前线程执行
6. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞
总结:
当执行很多较短时间的任务时,其线程的复用率比较高,会显著提升性能,而线程60s后会回收,意味没有任务进来,CacheThreadPool
并不会占用很多资源
适用场景:
根据短时间的任务量来决定创建的线程数量,适合于短时间内有突发大量任务的处理场景。
代码示例如下:
public void testCachedThreadPool() throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
int finalI = i;
service.execute(() -> {
log.info(" 线程 {} 执行任务 ,线程 i:{}", Thread.currentThread().getName(), finalI);
});
}
}