第9章 Java并发包中ScheduledThreadPoolExecutor原理探究

    ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。

    线程池队列是DelayedWorkQueue,与DelayedQueue一样属于延迟队列。

    ScheduledFuturetask是具有返回值的任务,继承自FutureTask。FutureTask内部用一个变量state来表示任务的状态,一开始为NEW。

    各状态意义如下:

    • period=0,说明当前任务是一次性的,执行完毕后就推出了。
    • period为负数,说明当前任务为固定延迟的定时可重复执行任务(执行完一次后会停止指定时间后再次运行,若每次执行任务耗时不同,则显然相邻两次任务执行间隔不同)。
    • period为正数,说明当前任务为固定频率的定尺可重复执行任务(也即固定周期)。

    以下为ScheduledThreadPoolExecutor的构造函数:

    1. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    2. new DelayedWorkQueue());
    3. }
    4. // 指定了线程工厂
    5. public ScheduledThreadPoolExecutor(int corePoolSize,
    6. ThreadFactory threadFactory) {
    7. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    8. new DelayedWorkQueue(), threadFactory);
    9. }
    10. // 指定了拒绝策略
    11. public ScheduledThreadPoolExecutor(int corePoolSize,
    12. RejectedExecutionHandler handler) {
    13. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    14. new DelayedWorkQueue(), handler);
    15. }
    16. // 指定了线程工厂和拒绝策略
    17. public ScheduledThreadPoolExecutor(int corePoolSize,
    18. ThreadFactory threadFactory,
    19. RejectedExecutionHandler handler) {
    20. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    21. new DelayedWorkQueue(), threadFactory, handler);
    22. }

    从上面的代码中可以看到,ScheduledThreadPoolExecutor的线程池队列为DelayedWorkQueue。

    提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay后开始执行。提交的任务不是周期性任务,任务只会执行一次。

    以下是ScheduledFutureTask的相关代码:

    1. ScheduledFutureTask(Runnable r, V result, long ns) {
    2. // 调用父类构造函数
    3. super(r, result);
    4. this.period = 0; // period=0说明为一次性任务
    5. // 记录任务编号
    6. this.sequenceNumber = sequencer.getAndIncrement();
    7. }
    8. // 将Runnable任务转化成Callable任务
    9. this.callable = Executors.callable(runnable, result);
    10. this.state = NEW; // ensure visibility of callable
    11. }

    delayedExecute的代码如下:

    1. // ScheduledFutureTask.run
    2. public void run() {
    3. boolean periodic = isPeriodic();
    4. // 判断是否需要取消任务
    5. if (!canRunInCurrentRunState(periodic))
    6. cancel(false);
    7. // 一次性任务
    8. else if (!periodic)
    9. // 调用FutureTask的run方法
    10. ScheduledFutureTask.super.run();
    11. // 周期性任务
    12. // runAndReset为FutureTask中的方法
    13. // 用于执行当前任务但不改变future的状态
    14. else if (ScheduledFutureTask.super.runAndReset()) {
    15. // 设置下次执行的时间
    16. setNextRunTime();
    17. // 默认情况下,outerTask = this就是当前对象
    18. reExecutePeriodic(outerTask);
    19. }
    20. }
    21. // 设置周期性任务下次执行时间
    22. private void setNextRunTime() {
    23. long p = period;
    24. // p > 0表示任务执行频率一定
    25. if (p > 0)
    26. // time为此次任务(已执行完毕)刚开始执行时的时间
    27. time += p;
    28. // 即此次任务完成后会等待-p时间再执行下次任务
    29. // 获取-p时间后的绝对时间
    30. time = triggerTime(-p);
    31. }
    32. // 周期性执行
    33. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    34. if (canRunInCurrentRunState(true)) {
    35. // 再次将task添加至任务队列中等待执行
    36. // 当轮到task执行时,又会在run中调用此方法
    37. // 再次将自身添加到任务队列中,从而达到周期性执行效果
    38. super.getQueue().add(task);
    39. if (!canRunInCurrentRunState(true) && remove(task))
    40. task.cancel(false);
    41. else
    42. ensurePrestart();
    43. }
    44. }

    当任务执行完毕后,让其延迟固定时间后再次运行。

    主要不同在于设置了period=-delay,其他代码与schedule相同,相应的代码会判断period的取值从而决定程序不同的行为。

    按固定频率周期性地执行任务。

    1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    2. long initialDelay,
    3. long period,
    4. TimeUnit unit) {
    5. if (command == null || unit == null)
    6. throw new NullPointerException();
    7. if (period <= 0)
    8. throw new IllegalArgumentException();
    9. // 关键在于此处第四个参数period > 0
    10. ScheduledFutureTask<Void> sft =
    11. new ScheduledFutureTask<Void>(command,
    12. null,
    13. triggerTime(initialDelay, unit),
    14. unit.toNanos(period));
    15. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    16. sft.outerTask = t;
    17. delayedExecute(t);
    18. }

    除了设置period的值大于0外,总体与scheduleWithFixedDelay类似,不再赘述。

    相关笔记: