Java线程既是工作单元,也是执行机制。jdk5后将工作单元和执行机制分离开来,工作单元包括Runnable和Callable,执行单元为Executor。
Executor框架简介
- Executor框架的两级调度模型

应用程序通过Executor框架控制上层的调度,下层的调度由操作系统内核控制!
- Executor框架的结构与成员
3大框架结构
- 任务:
包括被执行任务需要被实现的接口(Runnable和Callable); - 任务的执行:
任务执行机制的核心接口Executor,以及继承Executor的ExecutorService接口;Executor框架的两个关键类实现了ExecutorService接口,即ThreadPoolExecutor和ScheduledThreadPoolExecutor; - 异步计算的结果:
Future和实现Future接口的FutureTask类。

- 任务:
主要框架成员
ThreadPoolExecutor:一般由工厂类Executors创建,有3种类型;
- SingleThreadExecutors(适用于保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景)
- FixedThreadPool(适用于需要限制当前线程数的应用场景,比如负载较重的服务器)
- CachedThreadPool(大小无界的线程池,适用于执行很多短期异步任务的小程序,或负载较轻的服务器);
ScheduledThreadPoolExecutor: 一般由工厂类Executors创建,有2种类型;
- ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor,适用于需要后台多个线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景;
- SingleThreadPoolExecutor:只包含一个线程的ScheduledThreadPoolExecutor,适用于单个后台线程执行周期任务,同时需要保证顺序执行多个任务的应用场景;
Future接口:该接口与实现该接口的FutureTask类表示异步计算的结果
Runnable或Callable接口:区别Runnable不会返回结果,而Callable可以。除了可以自定义实现Callable接口的对象外,还可用工厂类Executors将Runnable包装为一个Callable
ThreadPoolExecutor详解
最核心类ThreadPoolExecutor,是线程池的实现类,主要由以下4个组件构成:
- corePool: 核心线程池的大小
- maximumPOol: 最大线程池的大小
- BlockingQueue: 用于暂时保存任务的工作队列
- RejectedExecutionHandler: 到达到了最大线程池大小且工作队列已满时,execute()将要调用的Handler
Executors可创建3种类型的ThreadPoolExecutor,如下:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
FixedThreadPool详解(可重用固定线程数的线程池)
1 | 源码 |
说明:当线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。将keepAliveTime设置为0L表示立即终止多余的空闲线程。
FixedThreadPool的execute()方法的运行示意图如下:
%E6%96%B9%E6%B3%95%E7%9A%84%E8%BF%90%E8%A1%8C%E7%A4%BA%E6%84%8F%E5%9B%BE.jpg)
从上图可以看出:
- 如果当前运行的线程数少于corePoolSize,则创建新线程执行任务;
- 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue;
- 线程执行完1中的任务后,会在循环中反复从LinkdeBlockingQueue获取任务执行。
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量默认为Integer.MAX_VALUE),使用无界队列会产生以下影响:
- 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池的线程数不会超过corePoolSize,只会等于。因此在保存任务的队列未超出时,并不会创建新线程来执行任务;
- 由于1,使用无界队列maximumPoolSize将是一个无效参数;
- 由于1和2,使用无界队列时keepAliveTime将是一个无效参数;
- 由于使用无界队列,运行中的FIxedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(即不会调用RejectedExecutionHandler.rejectedExecution()方法)
SingleThreadExecutor详解(使用单个worker线程的Executor)
1 | 源码 |
说明:SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。
SingleThreadExecutor的运行示意图如下图所示:
%E7%9A%84%E8%BF%90%E8%A1%8C%E7%A4%BA%E6%84%8F%E5%9B%BE.jpg)
从上图可以看出:
- 如果当前线程数少于corePoolSize,则创建一个新线程执行任务;
- 线程池完成预热之后,将任务加入LinkedBlockingQueue;
- 线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务并执行。
CachedThreadPool详解(根据需要创建新线程的线程池)
1 | public static ExecutorService newCachedThreadPool() { |
说明:这里maximumPoolSize被设置为Integer.MAX_VALUE,意味着maximumPoolSize是无界的,而CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,所以如果主线程提交任务的速度高于maximumPool中线程处理任务速度时,CachedThreadPool会不断创建新线程。极端情况下,其会创建过多线程而耗尽CPU和内存资源。
CachedThreadPool的execute()方法执行示意图如下图所示:
%E7%9A%84%E8%BF%90%E8%A1%8C%E7%A4%BA%E6%84%8F%E5%9B%BE.jpg)
从上图可以看出:
首先执行SynchronousQueue.offer方法。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll方法,那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute方法执行完成;否则执行下面的步骤2。
当初始maximumPool为空,或者maximumPool中没有空闲线程时,将没有线程执行SynchronousQueue的poll方法。这种情况下步骤1将失败,此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成。
在步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue的poll方法。这个pool操作会让空闲线程最多在SynchronousQueue中等待60s。如果60s内主线程提交了一个新任务(主线程执行步骤1),那么这个空线程将执行主线程提交的新任务;否则,这个空闲线程将停止。由于空闲60s的线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
SynchronousQueue是一个没有容量的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue将主线程提交的任务传递给空闲线程执行,该传递过程如下图所示:

ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor功能与Timer类似,但Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数数。
ScheduledThreadPoolExecutor运行机制

从图中可看出ScheduledThreadPoolExecutor执行可分为两大部分:
- 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法活着scheduledWithFixedDelay方法时,会向DelayQueue添加一个ScheduledFutureTask;
- 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor的实现
ScheduledFutureTask主要包含3个成员变量
- long型成员变量time,表示这个任务将要被执行的具体时间;
- long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;
- long型成员变量period,表示任务执行的间隔周期。
FutureTask详解(异步计算的结果)
简介
FutureTask除了实现Future接口外,还实现了Runnable接口。因此FutureTask可以交给Executor执行,也可以调用线程直接执行(FutureTask.run())。FutureTask的3中状态
- 未启动
FutureTask.run()方法还未被执行前的状态 - 已启动
FutureTask.run()方法在被执行的过程中 - 已完成
FutureTask.run()方法执行完后正常结束或被取消(FutureTask.cancel())或抛异常结束
FutureTask的状态迁移示意图如下所示:

FutureTask的get和cancel方法的执行示意图如下:

FutureTask的使用
三种使用方法:
- 把FutureTask交给Executor执行;
- 通过ExecutorService.submit()方法返回一个FutureTask,然后执行FutureTask.get()或FutureTask.cancel()方法
- 单独使用FutureTask。
使用场景:
当一个线程需要等待另一个线程执行完某项任务后才能继续执行,此时可用FutureTask。
FutureTask的实现
基于AbstractQueuedSynchronizer(后文简称AQS)实现。
说明:java.util.concurent中很多可阻塞类(比如ReentrantLock)都是基于AQS实现的。AQS是一个同步框架,提供了通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK6中AQS被广泛使用,基于AQS实现的同步器有:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。
- 每个基于AQS实现的同步器都包含以下两种类型操作:
- 至少一个acquire操作,这个操作阻塞调用线程,除非AQS状态允许这个线程继续执行。(该操作被get方法调用)
- 至少一个release操作,这个操作改变AQS状态,改变后的状态允许一个或多个阻塞线程被解除阻塞。(该操作包含run()和cancel())
基于“复合优于继承”原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有共有方法的调用都会委托这个内部子类。
Sync实现了AQS的tryAcquireShared(int)和tryReleaseShared(int)方法,Sync通过这两个方法检查和更新同步状态。
FutureTask的设计示意图如下:
