第9章 Java并发包中ScheduledThreadPoolExecutor原理探究
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。
线程池队列是DelayedWorkQueue,与DelayedQueue一样属于延迟队列。
ScheduledFuturetask是具有返回值的任务,继承自FutureTask。FutureTask内部用一个变量state来表示任务的状态,一开始为NEW。
各状态意义如下:
- period=0,说明当前任务是一次性的,执行完毕后就推出了。
- period为负数,说明当前任务为固定延迟的定时可重复执行任务(执行完一次后会停止指定时间后再次运行,若每次执行任务耗时不同,则显然相邻两次任务执行间隔不同)。
- period为正数,说明当前任务为固定频率的定尺可重复执行任务(也即固定周期)。
以下为ScheduledThreadPoolExecutor的构造函数:
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
// 指定了线程工厂
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
// 指定了拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
// 指定了线程工厂和拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
从上面的代码中可以看到,ScheduledThreadPoolExecutor的线程池队列为DelayedWorkQueue。
提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay后开始执行。提交的任务不是周期性任务,任务只会执行一次。
以下是ScheduledFutureTask的相关代码:
ScheduledFutureTask(Runnable r, V result, long ns) {
// 调用父类构造函数
super(r, result);
this.period = 0; // period=0说明为一次性任务
// 记录任务编号
this.sequenceNumber = sequencer.getAndIncrement();
}
// 将Runnable任务转化成Callable任务
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
delayedExecute的代码如下:
// ScheduledFutureTask.run
public void run() {
boolean periodic = isPeriodic();
// 判断是否需要取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 一次性任务
else if (!periodic)
// 调用FutureTask的run方法
ScheduledFutureTask.super.run();
// 周期性任务
// runAndReset为FutureTask中的方法
// 用于执行当前任务但不改变future的状态
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次执行的时间
setNextRunTime();
// 默认情况下,outerTask = this就是当前对象
reExecutePeriodic(outerTask);
}
}
// 设置周期性任务下次执行时间
private void setNextRunTime() {
long p = period;
// p > 0表示任务执行频率一定
if (p > 0)
// time为此次任务(已执行完毕)刚开始执行时的时间
time += p;
// 即此次任务完成后会等待-p时间再执行下次任务
// 获取-p时间后的绝对时间
time = triggerTime(-p);
}
// 周期性执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 再次将task添加至任务队列中等待执行
// 当轮到task执行时,又会在run中调用此方法
// 再次将自身添加到任务队列中,从而达到周期性执行效果
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
当任务执行完毕后,让其延迟固定时间后再次运行。
主要不同在于设置了period=-delay,其他代码与schedule相同,相应的代码会判断period的取值从而决定程序不同的行为。
按固定频率周期性地执行任务。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 关键在于此处第四个参数period > 0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
}
除了设置period的值大于0外,总体与scheduleWithFixedDelay类似,不再赘述。
相关笔记: