第10章| Executor框架

Java线程既是工作单元,也是执行机制。jdk5后将工作单元和执行机制分离开来,工作单元包括Runnable和Callable,执行单元为Executor。

Executor框架简介

  1. Executor框架的两级调度模型

任务的两级调度模型

应用程序通过Executor框架控制上层的调度,下层的调度由操作系统内核控制!

  1. Executor框架的结构与成员
  • 3大框架结构

    • 任务:
      包括被执行任务需要被实现的接口(Runnable和Callable);
    • 任务的执行:
      任务执行机制的核心接口Executor,以及继承Executor的ExecutorService接口;Executor框架的两个关键类实现了ExecutorService接口,即ThreadPoolExecutor和ScheduledThreadPoolExecutor;
    • 异步计算的结果:
      Future和实现Future接口的FutureTask类。
      Executor框架的类与接口
      Executor框架的使用示意图
  • 主要框架成员

    • ThreadPoolExecutor:一般由工厂类Executors创建,有3种类型;

      • SingleThreadExecutors(适用于保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景
      • FixedThreadPool(适用于需要限制当前线程数的应用场景,比如负载较重的服务器
      • CachedThreadPool(大小无界的线程池,适用于执行很多短期异步任务的小程序,或负载较轻的服务器);
    • ScheduledThreadPoolExecutor: 一般由工厂类Executors创建,有2种类型;

      • ScheduledThreadPoolExecutor:包含若干个线程的ScheduledThreadPoolExecutor,适用于需要后台多个线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景
      • SingleThreadPoolExecutor:只包含一个线程的ScheduledThreadPoolExecutor,适用于单个后台线程执行周期任务,同时需要保证顺序执行多个任务的应用场景
    • Future接口:该接口与实现该接口的FutureTask类表示异步计算的结果

    • Runnable或Callable接口:区别Runnable不会返回结果,而Callable可以。除了可以自定义实现Callable接口的对象外,还可用工厂类Executors将Runnable包装为一个Callable

ThreadPoolExecutor详解

最核心类ThreadPoolExecutor,是线程池的实现类,主要由以下4个组件构成:

  • corePool: 核心线程池的大小
  • maximumPOol: 最大线程池的大小
  • BlockingQueue: 用于暂时保存任务的工作队列
  • RejectedExecutionHandler: 到达到了最大线程池大小且工作队列已满时,execute()将要调用的Handler

Executors可创建3种类型的ThreadPoolExecutor,如下:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

FixedThreadPool详解(可重用固定线程数的线程池)

1
2
3
4
5
6
源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

说明:当线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。将keepAliveTime设置为0L表示立即终止多余的空闲线程。

FixedThreadPool的execute()方法的运行示意图如下:

FixedThreadPool的execute()方法的运行示意图

从上图可以看出:

  1. 如果当前运行的线程数少于corePoolSize,则创建新线程执行任务;
  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue;
  3. 线程执行完1中的任务后,会在循环中反复从LinkdeBlockingQueue获取任务执行。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量默认为Integer.MAX_VALUE),使用无界队列会产生以下影响:

  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池的线程数不会超过corePoolSize,只会等于。因此在保存任务的队列未超出时,并不会创建新线程来执行任务;
  2. 由于1,使用无界队列maximumPoolSize将是一个无效参数;
  3. 由于1和2,使用无界队列时keepAliveTime将是一个无效参数;
  4. 由于使用无界队列,运行中的FIxedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(即不会调用RejectedExecutionHandler.rejectedExecution()方法)

SingleThreadExecutor详解(使用单个worker线程的Executor)

1
2
3
4
5
6
7
源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

说明:SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。

SingleThreadExecutor的运行示意图如下图所示:

SingleThreadExecutor的execute()的运行示意图

从上图可以看出:

  1. 如果当前线程数少于corePoolSize,则创建一个新线程执行任务;
  2. 线程池完成预热之后,将任务加入LinkedBlockingQueue;
  3. 线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务并执行。

CachedThreadPool详解(根据需要创建新线程的线程池)

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

说明:这里maximumPoolSize被设置为Integer.MAX_VALUE,意味着maximumPoolSize是无界的,而CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,所以如果主线程提交任务的速度高于maximumPool中线程处理任务速度时,CachedThreadPool会不断创建新线程。极端情况下,其会创建过多线程而耗尽CPU和内存资源。

CachedThreadPool的execute()方法执行示意图如下图所示:

CachedThreadPool的execute()的运行示意图

从上图可以看出:

  1. 首先执行SynchronousQueue.offer方法。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll方法,那么主线程执行offer操作与空闲线程执行poll操作配对成功,主线程把任务交给空闲线程执行,execute方法执行完成;否则执行下面的步骤2。

  2. 当初始maximumPool为空,或者maximumPool中没有空闲线程时,将没有线程执行SynchronousQueue的poll方法。这种情况下步骤1将失败,此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成。

  3. 在步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue的poll方法。这个pool操作会让空闲线程最多在SynchronousQueue中等待60s。如果60s内主线程提交了一个新任务(主线程执行步骤1),那么这个空线程将执行主线程提交的新任务;否则,这个空闲线程将停止。由于空闲60s的线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

SynchronousQueue是一个没有容量的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue将主线程提交的任务传递给空闲线程执行,该传递过程如下图所示:

CachedThreadPool的任务传递示意图

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor功能与Timer类似,但Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数数。

ScheduledThreadPoolExecutor运行机制

ScheduledThreadPoolExecutor的任务传递示意图(基于JDK6)

从图中可看出ScheduledThreadPoolExecutor执行可分为两大部分:

  1. 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法活着scheduledWithFixedDelay方法时,会向DelayQueue添加一个ScheduledFutureTask;
  2. 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor的实现

ScheduledFutureTask主要包含3个成员变量

  1. long型成员变量time,表示这个任务将要被执行的具体时间;
  2. long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;
  3. long型成员变量period,表示任务执行的间隔周期。

FutureTask详解(异步计算的结果)

  1. 简介
    FutureTask除了实现Future接口外,还实现了Runnable接口。因此FutureTask可以交给Executor执行,也可以调用线程直接执行(FutureTask.run())。

  2. FutureTask的3中状态

  • 未启动
    FutureTask.run()方法还未被执行前的状态
  • 已启动
    FutureTask.run()方法在被执行的过程中
  • 已完成
    FutureTask.run()方法执行完后正常结束或被取消(FutureTask.cancel())或抛异常结束

FutureTask的状态迁移示意图如下所示:

FutureTask的状态迁移示意图

FutureTask的get和cancel方法的执行示意图如下:

FutureTask的get和cancel的执行示意图

FutureTask的使用

三种使用方法:

  1. 把FutureTask交给Executor执行;
  2. 通过ExecutorService.submit()方法返回一个FutureTask,然后执行FutureTask.get()或FutureTask.cancel()方法
  3. 单独使用FutureTask。

使用场景:
当一个线程需要等待另一个线程执行完某项任务后才能继续执行,此时可用FutureTask。

FutureTask的实现

基于AbstractQueuedSynchronizer(后文简称AQS)实现。

说明:java.util.concurent中很多可阻塞类(比如ReentrantLock)都是基于AQS实现的。AQS是一个同步框架,提供了通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK6中AQS被广泛使用,基于AQS实现的同步器有:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。

  1. 每个基于AQS实现的同步器都包含以下两种类型操作:
  • 至少一个acquire操作,这个操作阻塞调用线程,除非AQS状态允许这个线程继续执行。(该操作被get方法调用)
  • 至少一个release操作,这个操作改变AQS状态,改变后的状态允许一个或多个阻塞线程被解除阻塞。(该操作包含run()和cancel())
  1. 基于“复合优于继承”原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有共有方法的调用都会委托这个内部子类。

  2. Sync实现了AQS的tryAcquireShared(int)和tryReleaseShared(int)方法,Sync通过这两个方法检查和更新同步状态。

FutureTask的设计示意图如下:

FutureTask的设计示意图

码哥 wechat
欢迎关注个人订阅号:「码上行动GO」