多线程在现在工作中出现越来越频繁、需要我们熟记并且能熟练地使用之、对相关线程池的一些配置需要我们非常熟悉。
corePoolSize 核心线程数
- 核心线程会一直存活,即使没有任务需要执行
- 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
- 设置allowCoreThreadTimeOut=true(默认false)时,核心线程会超时关闭
queueCapacity 任务队列容量(阻塞队列)
maxPoolSize 最大线程数
- 当线程数 >= corePoolSize 且任务队列已满时,线程池会创建新线程来处理任务
- 当线程数 = maxPoolSize且任务队列已满时,线程池会拒绝处理任务而抛出异常
keepAliveTime 线程空闲时间
- 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=coolPoolSize
- 如果allowCoreThreadTimeOut=true时,则会直到线程数量=0
allowCoreThreadTimeOut 允许核心线程超时
rejectedExecutionHandler 任务拒绝处理器
- ThreadPoolExecutor类有几个内部实现类来处理这类情况
1. **AbortPolicy** 丢弃任务、抛运行时异常
1. **CallerRunsPolicy** 执行任务
1. **DisCardPolicy** 忽视,什么都不会发生
1. **DisCardOldestPolicy** 从队列中提出最先 进入队列(最后一个执行)的任务
线程池按照以下行为执行任务
1. 当线程数 < 核心线程数时,创建线程。
2. 当线程数 >= 核心线程数,且任务队列未满时,将任务放入任务队列。
3. 当线程数 >= 核心线程数,且任务队列已满
- 若线程数 < 最大线程数,创建线程
- 若线程数 = 最大线程数,抛出异常,拒绝任务
corePoolSize = 1
queueCapacity = Integer.MAX_VALUE
maxPoolSize = Integer.MAX_VALUE
keepAliveTime = 60s
allowCoreThreadTimeOut = false
rejectedExecutionHandler = AbortPolicy
tasks : 每秒的任务数,假设为 500 - 1000
taskcost : 每个任务花费时间,假设为0.1s
responsetime : 系统允许容忍的最大响应时间,假设为1s
corePoolSize = 每秒需要多少个线程处理
* threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
* 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
* 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
* 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
* 计算可得 maxPoolSize = (1000-80)/10 = 92
* (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
@Component
public class KafkaReceiver {
@Autowired
private BroadbandService broadbandService;
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("deal-pool-%d")
.build();
private ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(10), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 监听kafka消息受理业务
*/
@KafkaListener(topics = "${kafka.topics[1]}")
public void listenerKafkaProductCommit(ConsumerRecord record) {
threadPool.execute(() -> {
try {
BusinessOrder businessOrder = JsonUtil.jsonToObject(record.value(), BusinessOrder.class);
log.info("消息队列正在工作,订单号: {}", businessOrder.getOrderNo());
//尝试多线程处理
broadbandService.asynSchoolBroadbandRemoveService(businessOrder);
} catch (Exception e) {
log.error("消息队列正在工作异常", e);
}
});
}
}
投稿时间:2022-07-03 最后更新:2022-07-03
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2022 All Rights Reserved. Powered By Q578.com 闽ICP备11008920号-1
闽公网安备35020302032606号