线程池的类型和原理
参考文章:
Java线程池的四种创建方式 - 绝不妥协绝不低头 - 博客园 (cnblogs.com)
JAVA线程池原理详解一 - 冬瓜蔡 - 博客园 (cnblogs.com)
1.线程池创建之使用线程池工厂
1.1.定长线程池
Executors.newFixedThreadPool(2),核心线程和线程总数一样,使用LinkedBlockingQueue队列(链表,容量Integer.Max)
//1.步骤一
ExecutorService newFixedThreadPool=Executors.newFixedThreadPool(2);
for(int j=0;j<4;j++){
final int index=j;
newFixedThreadPool.execute(new MyRunnable(index));
}
//2.步骤二 LinkedBlockingQueue队列的容量为Integer.Max
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//3.步骤三 Integer.MAX_VALUE
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//4.步骤四 defaultHandler实现了
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
1.2.周期性程池,支持周期性或者定时任务。
Executors.newScheduleThreadPool 核心线程是固定的,线程总数是Integer.Max_value,使用DelayedWorkQueue队列(最小堆,容量16)
//1.步骤一
ScheduledExecutorService newScheduleThreadPool= Executors.newScheduledThreadPool(2);
for(int k=0;k<4;k++){
final int index=k;
//执行结果:延迟三秒之后执行,除了延迟执行之外和newFixedThreadPool基本相同
newScheduleThreadPool.schedule(new MyRunnable(index),3, TimeUnit.SECONDS);
}
// 2.步骤二
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//3.步骤三
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
//步骤四
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
1.3.可缓存线程池
Executors.newCachedThreadPool 核心线程数为0,线程总数为Integer.Max,使用SynchronousQueue同步队列(双向链表,容量0) 特点:SynchronousQueue没有容量,可以确保任务立即被处理,而不是排队等待。
//1.步骤一
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
for(int i=0;i<4;i++) {
final int index=i;
newCachedThreadPool.execute(new MyRunnable(index));
}
//2.步骤二
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//3.步骤三
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
1.4.单线程池
Executors.newSingleThreadExecutor 核心线程数和线程总数都是1,使用的LinkedBlockingQueue队列(链表实现,容量默认Integer.Max)
//1.步骤一
ExecutorService newSingleThreadExtutor=Executors.newSingleThreadExecutor();
for(int l=0;l<4;l++){
final int index=l;
//执行结果:只存在一个线程,顺序执行
newSingleThreadExtutor.execute(new MyRunnable(index));
}
//2.步骤二
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//步骤三
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
2.线程池创建之使用ThreadPoolExecutor构造方法自定义线程池
- int corePoolSize 核心线程数
- int maximumPoolSize 线程总数
- long keepAliveTime 非核心空闲线程存活时间
- TimeUnit unit 非核心空闲线程存活时间单位
- BlockingQueue workQueue 任务队列
- ThreadFactory 线程工厂类 (工厂模式的方式创建线程)(接口,实现了newThread方法)【默认实现】
- RejectedExecutionHandler 拒绝策略 (接口,实现了RejectedExecution方法) 【默认实现】 #####2.1.自定义线程池 注意:如果想要自定线程工程,或者自定义拒绝策略都可以.
LinkedBlockingQueue queue=new LinkedBlockingQueue();
ThreadPoolExecutor theadPool=new ThreadPoolExecutor(
2,4,10,
TimeUnit.SECONDS,queue,
Executors.defaultTheadFactory,
defaultHandler );
2.2.系统默认的线程工工厂和拒绝策略
//线程工厂,用于批量创建线程
public interface ThreadFactory {
Thread newThread(Runnable r);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
//拒绝策略,默认为抛出异常
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
2.3.默认的线程池工厂实现类
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
2.4.拒绝策略【A-C-D-D】 (在阻塞队列达到最大值,而且线程数也达到最大值了,线程池无法再处理新提交过来的任务,此时使用拒绝策略)
- AbortPolicy 默认策略,抛出异常,终止任务
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy 是使用当前提交任务的线程来执行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler{
public CallerRunsPolicy(){ }
public void rejectedExecution(Runnable r , ThreadPoolExecutor e){
if(!e.shutdown()){
r.run();//当前提交任务的线程来执行任务
}
}
}
- DiscardPolicy 什么都不做,直接丢弃任务
public static class DiscardPolicy implements RejectExecutionHandler{
public DiscardPolicy(){}
public void rejectExecution(Runnable r, ThreadPoolExecutor e){ //do nothing
}
}
- DiscardOldestPolicy 丢弃最先放入队列的任务,然后将当前任务提交到线程池
public static class DiscardOldestPolicy implements RejectExecutionHandler{
public DiscardOldestPolicy(){}
public void rejectExecution(Runnable r, ThreadPoolExecutor e){
if(!e.isShutdown()){
e.getQueue().poll();
e.execute(r);
}
}
}
3.线程池源码解析
3.1.创建的线程池具体配置为:核心线程数量为5个;全部线程数量为10个;工作队列的长度为5。
3.2.我们通过queue.size()的方法来获取工作队列中的任务数。
3.3.线程池原理分析.
当向线程池提交一个任务时,首先判断当前正在工作的线程数是否>=核心线程数,如果小于核心线程数,那么创建新的线程,达到核心线程数量5个后,新的任务进来后不再创建新的线程,而是将任务加入工作队列,任务队列到达上线5个后,新的任务又会创建新的普通线程,直到达到线程池最大的线程数量10个,后面的任务则根据配置的饱和策略来处理。我们这里没有具体配置,使用的是默认的配置AbortPolicy:直接抛出异常。当然,为了达到我需要的效果,上述线程处理的任务都是利用休眠导致线程没有释放!!! #####3.4.线程池的核心线程会被回收吗? 默认情况下,线程池的核心线程是不会被回收的,即使他们处于空闲状态。这样可以避免频繁的创建线程,节省系统开销。当设置allowCoreThreadTimeCount(true),核心线程在空闲时超过keepAliveTime时,会被回收.
3.5.线程池源码分析
3.5.1.核心接口和类
![]()
Executor
Executor接口只是定义了一个基础的execute方法.
public interface Executor{
void execute(Runnable);
}
ExecutorService
ExecutorService接口定义了线程池的一些常用操作.
public interface ExecutorService extends Executor{
// 终止线程池,不再接受新任务,会将任务队列的任务执行完成
void shutdown();
//立即终止线程池,任务队列的任务不在执行,返回未执行任务集合.
List<Runnable> shutdownNow();
//判断线程池是否停掉,只要线程池不是RUNNING状态,都会返回true
boolean isShutdown();
//判断线程池是否完成终止,状态是TERMINATED
boolean isTerminated();
//提交Runnable任务,返回Future,Future的get方法返回值就是result参数.//get方法会阻塞当前线程
<T> Future<T> submit(Runnable task,T result);
//提交Rennable任务,返回Future,Future的get方法返回值就是null.//get方法会阻塞当前线程
Future<?> submit(Runnable task);
}
AbstractExecutorService
AbstractExecutorService是一个抽象类,实现了接口的一些方法,未实现的方法继续留给子类实现.
public abstract class AbstractExecutorService implement ExecutorService{
//提交Runnalbe任务,返回Future,Future的get方法返回值是null//get方法会阻塞当前线程
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);//真正执行任务的是子类实现的execute方法
return ftask;
}
//提交Runnable任务,返回Future,Future的get方法返回值是result参数.//get方法会阻塞当前线程
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);//真正执行任务的是子类实现的execute方法
return ftask;
}
}
任务执行
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.判断工作线程数是否核心线程数
if (workerCountOf(c) < corePoolSize) {
//2.添加工作线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//3.线程池为Running状态且任务添加到阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//添加工作线程执行任务
}
//4.队列满了,达到最大线程数之后,就会走拒绝策略,addWorker()返回值就是false
else if (!addWorker(command, false))
reject(command);
}
}
添加工作线程
- 第一阶段:设置ctl的线程数+1,主要是判断线程池状态以及线程数是否超限,然后对ctrl的线程数加1。
- 第二阶段:创建一个线程并启动执行,将创建的工作线程类Worker放入线程池工作线程集合里并启动,另外,如果出现异常情况,就走finally{},移除工作线程Worker,并执行ctl的线程数减1.
private final HashSet<Worker> workers = new HashSet<>(); //线程池工作线程集合
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
for (;;) {
//如果是核心线程 ,线程池中工作线程总数>=corePoolSize,返回false
//如果是非核心线程 ,线程池中工作线程总数>=maximumPoolSize,返回false
if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//设置ctl的线程数+1,跳出整个for循环
if (compareAndIncrementWorkerCount(c))
break retry; //跳出retry for循环
}
}
//以上只是将ctl的线程数+1了,以下是真正的创建一个工作线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//Worker构造方法中会使用ThreadFactory创建一个新线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 创建工作线程时,需要加锁,防止其他线程并发操作。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w); //并将Worker放入工作线程集合里
workerAdded = true;
int s = workers.size();
// 这里就是标记线程池里创建线程的最大值,这个值最大也不会超过maximumPoolSize。
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();//释放锁
}
if (workerAdded) {
t.start(); //启动工作线程>>Worker.run()>>runWorker()>>firstTask.run()
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//如果添加子线程工作流程失败,
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 涉及工作线程的相关操作都需要加锁
mainLock.lock();
try {
// 从工作线程集合里移除worker
if (w != null)
workers.remove(w);
// cas操作ctl线程数减1
decrementWorkerCount();
// 判断是否需要终止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
工作线程类Worker
Worker类是具体的工作线程类,持有一个Thread线程和一个Runnable任务实例,并实现了Runnable接口.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// Worker的Thread属性,其实干活的就是这个线程
final Thread thread;
// 任务
Runnable firstTask;
// 线程已经执行完成的任务总数
volatile long completedTasks;
// 构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 线程执行时调用的就是Worker.run() >>runWorker()>>firstTask.run()
//使用线程工程批量创建线程.
this.thread = getThreadFactory().newThread(this);
}
// run方法执行任务,调用的是外部ThreadPoolExecutor的runWorker方法
public void run() {
runWorker(this);
}
}
执行任务runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run(); //执行任务
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}