前言
前两篇文章,我们已经学会了如何使用spring boot的多线程和自定义线程池。这篇文章,我们要深入了解上一篇文章中线程池的配置具体含义。
准备工作
说明
为了方便观察线程的情况(如执行完毕数量,正在执行数量,队列中等待数量等),我们应该先写一个可以将线程情况打印到控制台的类。
下面将介绍一种方式,在每次调用线程的时候,都会将当前线程的情况打印到控制台。
项目结构
└── Study ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ │ └── top │ │ │ └── yxdz │ │ │ └── study │ │ │ └── spring │ │ │ └── springboot │ │ │ ├── thread │ │ │ │ └── service │ │ │ │ ├── ITestService.java │ │ │ │ └── impl │ │ │ │ └── TestSerivceImpl.java │ │ │ └── utils │ │ │ ├── SysThreadValueConfig.java │ │ │ └── config │ │ │ └── Thread │ │ │ ├── SysThreadConfig.java │ │ │ └── VisiableThreadPoolTaskExecutor.java │ │ └── resources │ │ ├── application.yml │ └── test │ └── java │ └── top │ └── yxdz │ └── study │ └── StudyApplicationTests.java
关键代码
VisiableThreadPoolTaskExecutor.java
继承
ThreadPoolTaskExecutor
,自定义展示线程方法printThreadPoolInfo
,在每个继承方法执行前调用,实现每次调用线程都会打印当前线程池情况的日志。package top.yxdz.study.spring.springboot.utils.config.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;/** * 实现每次调用都打印线程池情况 */@Configurationpublic class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger LOG = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); /** * 打印线程池信息 * * @param functionName 调用函数 */ private void printThreadPoolInfo(String functionName){ try { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); LOG.info("{}, {}, taskCount[{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), functionName, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); }catch (IllegalStateException e){ LOG.error("ThreadPoolTaskExecutor not initialized"); } } @Override public void execute(Runnable task){ printThreadPoolInfo("1.execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout){ printThreadPoolInfo("2.execute"); super.execute(task, startTimeout); } @Override public Future submit(Runnable task){ printThreadPoolInfo("1.submit"); return super.submit(task); } @Override public
Future submit(Callable task){ printThreadPoolInfo("2.submit"); return super.submit(task); } @Override public ListenableFuture submitListenable(Runnable task){ printThreadPoolInfo("1.submitListenable"); return super.submitListenable(task); } @Override public ListenableFuture submitListenable(Callable task){ printThreadPoolInfo("2.submitListenable"); return super.submitListenable(task); } @Override protected void cancelRemainingTask(Runnable task){ printThreadPoolInfo("1.cancelRemainingTask"); super.cancelRemainingTask(task); }} SysThreadConfig.java
在自定义线程池中,实例化上面自定义的继承类
VisiableThreadPoolTaskExecutor
即可。package top.yxdz.study.spring.springboot.utils.config.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import top.yxdz.study.spring.springboot.utils.SysThreadValueConfig;import java.util.concurrent.Executor;import java.util.concurrent.ThreadPoolExecutor;/** * 自定义线程池 */@Configurationpublic class SelfThreadConfig { private static Logger LOG = LoggerFactory.getLogger(SelfThreadConfig.class); @Autowired SysThreadValueConfig sysValueConfig; @Bean public Executor myThreadPool(){ //实例化继承类VisiableThreadPoolTaskExecutor,实现每次调用线程会打印线程汇总情况日志 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //核心线程池大小 executor.setCorePoolSize(sysValueConfig.getCorePoolSize()); //最大线程数 executor.setMaxPoolSize(sysValueConfig.getMaxPoolSize()); //队列容量 executor.setQueueCapacity(sysValueConfig.getQueueCapacity()); //活跃时间 executor.setKeepAliveSeconds(sysValueConfig.getKeepAliveSeconds()); //线程名字前缀 executor.setThreadNamePrefix("my-thread-pool"); //线程池满的时候,处理新任务的策略 //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程实例化 executor.initialize(); return executor; }}
执行结果
······2018-12-19 19:22:46.065 INFO 65471 --- [ main] s.s.u.c.T.VisiableThreadPoolTaskExecutor : my-thread-pool, 2.submit, taskCount[0], completedTaskCount [0], activeCount [0], queueSize [0]2018-12-19 19:22:46.066 INFO 65471 --- [ main] s.s.u.c.T.VisiableThreadPoolTaskExecutor : my-thread-pool, 2.submit, taskCount[1], completedTaskCount [0], activeCount [1], queueSize [0]······
配置参数详解
线程池运行规则
下图是从其他博客()转载过来。看的懂更好,看不懂也没关系,下面会进行详细说明。
自定义配置样例代码
package top.yxdz.study.spring.springboot.utils.config.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import top.yxdz.study.spring.springboot.utils.SysThreadValueConfig;import java.util.concurrent.Executor;import java.util.concurrent.ThreadPoolExecutor;/** * 自定义线程池 */@Configurationpublic class SelfThreadConfig { private static Logger LOG = LoggerFactory.getLogger(SelfThreadConfig.class); @Autowired SysThreadValueConfig sysValueConfig; @Bean public Executor myThreadPool(){ //实例化继承类VisiableThreadPoolTaskExecutor,实现每次调用线程会打印线程汇总情况日志 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //核心线程池大小 executor.setCorePoolSize(sysValueConfig.getCorePoolSize()); //最大线程数 executor.setMaxPoolSize(sysValueConfig.getMaxPoolSize()); //队列容量 executor.setQueueCapacity(sysValueConfig.getQueueCapacity()); //活跃时间 executor.setKeepAliveSeconds(sysValueConfig.getKeepAliveSeconds()); //线程名字前缀 executor.setThreadNamePrefix("my-thread-pool"); //线程池满的时候,处理新任务的策略 //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程实例化 executor.initialize(); return executor; }}
参数定义
CorePoolSize(核心线程池数)
线程池创建时候初始化的线程数。
MaxPoolSize(线程池最大容载量)
线程池最大的线程数,只有在缓冲队列满了之后才会申请超过CorePoolSize的线程。
QueueCapacity(缓冲队列)
当CorePoolSize满了之后的线程缓冲的队列。
KeepAliveSeconds(最大存活秒数)
超过了核心线程之外的线程在此设定的时间到达后会销毁。
ThreadNamePrefix(线程名字前缀)
日志中的线程名称前缀,方便我们查看和分析
RejectedExecutionHandler(策略处理)
当线程池没有处理能力的时候,启动的一种对策,一共四种模式,图中说明很详细了。
详细分析
目录结构
└── Study ├── pom.xml ├── src │ ├── main │ │ ├── java │ │ │ └── top │ │ │ └── yxdz │ │ │ └── study │ │ │ └── spring │ │ │ └── springboot │ │ │ ├── thread │ │ │ │ └── service │ │ │ │ ├── ITestService.java │ │ │ │ └── impl │ │ │ │ └── TestSerivceImpl.java │ │ │ └── utils │ │ │ ├── SysThreadValueConfig.java │ │ │ └── config │ │ │ └── Thread │ │ │ ├── SelfThreadConfig.java │ │ │ └── VisiableThreadPoolTaskExecutor.java │ │ └── resources │ │ ├── application.yml │ └── test │ └── java │ └── top │ └── yxdz │ └── study │ └── StudyApplicationTests.java
关键代码
application.yml
为了打印日志方便,自定义线程池的配置稍微做了修改,数量减少。
spring: thread: threadNamePrefix: system-thread #线程名称前缀 corePoolSize: 2 #核心线程池大小 maxPoolSize: 4 #最大线程数 keepAliveSeconds: 30 #活跃时间(单位:秒) queueCapacity: 5 #队列容量
StudyApplicationTests.java
启动入口。
package top.yxdz.study;import org.junit.Test;import org.junit.runner.RunWith;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.test.context.junit4.SpringRunner;import top.yxdz.study.spring.springboot.thread.service.ITestService;import java.time.LocalDateTime;@RunWith(SpringRunner.class)@SpringBootTest@EnableAsyncpublic class StudyApplicationTests { private static final Logger LOG = LoggerFactory.getLogger(StudyApplicationTests.class); @Autowired ITestService iTestService; @Test public void contextLoads() { for(int i=0; i<15; i++){ LOG.info(LocalDateTime.now().toString()); iTestService.method2("myself" + i); try { Thread.sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } } try { //等待70s,防止异步代码被强制关闭导致线程抛出异常 Thread.sleep(70000); }catch (InterruptedException e){ e.printStackTrace(); } }}
TestSerivceImpl.java
每个线程,均会等待
20秒
,用来阻塞线程,方便观察线程池工作规则。package top.yxdz.study.spring.springboot.thread.service.impl;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Service;import top.yxdz.study.spring.springboot.thread.service.ITestService;import java.time.LocalDateTime;@Service("TestSerivceImpl")public class TestSerivceImpl implements ITestService { private static Logger LOG = LoggerFactory.getLogger(TestSerivceImpl.class); /** * 使用个自定义线程池 * @param msg */ @Override @Async("myThreadPool") public void method2(String msg){ try { Thread.sleep(20000); }catch (InterruptedException e){ e.printStackTrace(); } }}
运行结果
日志结果太长,可直接进入结果分析。
2018-12-19T20:46:28.630my-thread-pool, 2.submit, taskCount[0], completedTaskCount[0], activeCount[0], queueSize[0]2018-12-19T20:46:29.644my-thread-pool, 2.submit, taskCount[1], completedTaskCount[0], activeCount[1], queueSize[0]2018-12-19T20:46:30.646my-thread-pool, 2.submit, taskCount[2], completedTaskCount[0], activeCount[2], queueSize[0]2018-12-19T20:46:31.650my-thread-pool, 2.submit, taskCount[3], completedTaskCount[0], activeCount[2], queueSize[1]2018-12-19T20:46:32.653my-thread-pool, 2.submit, taskCount[4], completedTaskCount[0], activeCount[2], queueSize[2]2018-12-19T20:46:33.654my-thread-pool, 2.submit, taskCount[5], completedTaskCount[0], activeCount[2], queueSize[3]2018-12-19T20:46:34.655my-thread-pool, 2.submit, taskCount[6], completedTaskCount[0], activeCount[2], queueSize[4]2018-12-19T20:46:35.656my-thread-pool, 2.submit, taskCount[7], completedTaskCount[0], activeCount[2], queueSize[5]2018-12-19T20:46:36.658my-thread-pool, 2.submit, taskCount[8], completedTaskCount[0], activeCount[3], queueSize[5]2018-12-19T20:46:37.671my-thread-pool, 2.submit, taskCount[9], completedTaskCount[0], activeCount[4], queueSize[5]2018-12-19T20:46:58.677my-thread-pool, 2.submit, taskCount[9], completedTaskCount[4], activeCount[4], queueSize[1]2018-12-19T20:46:59.680my-thread-pool, 2.submit, taskCount[10], completedTaskCount[4], activeCount[4], queueSize[2]2018-12-19T20:47:00.685my-thread-pool, 2.submit, taskCount[11], completedTaskCount[4], activeCount[4], queueSize[3]2018-12-19T20:47:01.687my-thread-pool, 2.submit, taskCount[12], completedTaskCount[4], activeCount[4], queueSize[4]2018-12-19T20:47:02.691my-thread-pool, 2.submit, taskCount[13], completedTaskCount[4], activeCount[4], queueSize[5]
结果分析
虽然日志不多,但是因为排版问题(有的电脑是两行显示的),可能看起来比较麻烦,故对结果结果进行一次再提炼和统计,更加直观。(注:线程池满,采用的是CallerRunsPolicy策略)
序号 | 时间 | 方法 | 线程总数 | 已完成线程数 | 运行中的线程数 | 队列缓存的线程数 |
---|---|---|---|---|---|---|
1 | 20:46:28 | 2.submit | 0 | 0 | 0 | 0 |
2 | 20:46:29 | 2.submit | 1 | 0 | 1 | 0 |
3 | 20:46:30 | 2.submit | 2 | 0 | 2 | 0 |
4 | 20:46:31 | 2.submit | 3 | 0 | 2 | 1 |
5 | 20:46:32 | 2.submit | 4 | 0 | 2 | 2 |
6 | 20:46:33 | 2.submit | 5 | 0 | 2 | 3 |
7 | 20:46:34 | 2.submit | 6 | 0 | 2 | 4 |
8 | 20:46:35 | 2.submit | 7 | 0 | 2 | 5 |
9 | 20:46:36 | 2.submit | 8 | 0 | 3 | 5 |
10 | 20:46:37 | 2.submit | 9 | 0 | 4 | 5 |
11 | 20:46:58 | 2.submit | 9 | 4 | 4 | 1 |
12 | 20:46:59 | 2.submit | 10 | 4 | 4 | 2 |
13 | 20:47:00 | 2.submit | 11 | 4 | 4 | 3 |
14 | 20:47:01 | 2.submit | 12 | 4 | 4 | 4 |
15 | 20:47:02 | 2.submit | 13 | 4 | 4 | 5 |
序号1~序号2
每秒一个异步线程,线程池的核心线程数为2,故运行中的线程数一直递增,直到达到2。
序号3~序号8
从序号3开始,因为核心线程数已经满了,故新增的线程将放入队列进行缓存,故运行的线程数不变,队列缓存的线程数递增。
序号9~序号10
这两个异步线程进来的时候,核心线程数已满,队列缓存的也满了,并且核心线程数小于MaxPoolSize,故核心线程数开始上升,直到到达MaxPoolSize。
序号11~序号15
从序号10到序号11,中间相差了20秒,到达序号11的时候,突然四个线程被完成了,而非一个个慢慢完成。造成这种情况的原因是因为采取的RejectedExecutionHandler(策略)导致的。
CallerRunsPolicy
策略表示,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中执行。也就是说,在序号10的时候,主线程被用来执行本该分配的异步线程,导致主线程阻塞,直到该任务运行完毕。而在这期间,异步线程已经完成了四个(及MaxPoolSize,大家可以想象为啥),所以已完成线程数为4,队列中也就空出可以缓存新的异步任务的空间。