1 /*
2 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
3 *
4 *
5 *
6 *
7 *
8 *
9 *
10 *
11 *
12 *
13 *
14 *
15 *
16 *
17 *
18 *
19 *
20 *
21 *
22 *
23 */
24
25 /*
26 *
27 *
28 *
29 *
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37 import static java.util.concurrent.TimeUnit.NANOSECONDS;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.ReentrantLock;
41 import java.util.*;
42
43 /**
44 * A {@link ThreadPoolExecutor} that can additionally schedule
45 * commands to run after a given delay, or to execute
46 * periodically. This class is preferable to {@link java.util.Timer}
47 * when multiple worker threads are needed, or when the additional
48 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
49 * this class extends) are required.
50 *
51 * <p>Delayed tasks execute no sooner than they are enabled, but
52 * without any real-time guarantees about when, after they are
53 * enabled, they will commence. Tasks scheduled for exactly the same
54 * execution time are enabled in first-in-first-out (FIFO) order of
55 * submission.
56 *
57 * <p>When a submitted task is cancelled before it is run, execution
58 * is suppressed. By default, such a cancelled task is not
59 * automatically removed from the work queue until its delay
60 * elapses. While this enables further inspection and monitoring, it
61 * may also cause unbounded retention of cancelled tasks. To avoid
62 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
63 * causes tasks to be immediately removed from the work queue at
64 * time of cancellation.
65 *
66 * <p>Successive executions of a task scheduled via
67 * {@code scheduleAtFixedRate} or
68 * {@code scheduleWithFixedDelay} do not overlap. While different
69 * executions may be performed by different threads, the effects of
70 * prior executions <a
71 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
72 * those of subsequent ones.
73 *
74 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
75 * of the inherited tuning methods are not useful for it. In
76 * particular, because it acts as a fixed-sized pool using
77 * {@code corePoolSize} threads and an unbounded queue, adjustments
78 * to {@code maximumPoolSize} have no useful effect. Additionally, it
79 * is almost never a good idea to set {@code corePoolSize} to zero or
80 * use {@code allowCoreThreadTimeOut} because this may leave the pool
81 * without threads to handle tasks once they become eligible to run.
82 *
83 * <p><b>Extension notes:</b> This class overrides the
84 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
85 * {@link AbstractExecutorService#submit(Runnable) submit}
86 * methods to generate internal {@link ScheduledFuture} objects to
87 * control per-task delays and scheduling. To preserve
88 * functionality, any further overrides of these methods in
89 * subclasses must invoke superclass versions, which effectively
90 * disables additional task customization. However, this class
91 * provides alternative protected extension method
92 * {@code decorateTask} (one version each for {@code Runnable} and
93 * {@code Callable}) that can be used to customize the concrete task
94 * types used to execute commands entered via {@code execute},
95 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
96 * and {@code scheduleWithFixedDelay}. By default, a
97 * {@code ScheduledThreadPoolExecutor} uses a task type extending
98 * {@link FutureTask}. However, this may be modified or replaced using
99 * subclasses of the form:
100 *
101 * <pre> {@code
102 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
103 *
104 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
105 *
106 * protected <V> RunnableScheduledFuture<V> decorateTask(
107 * Runnable r, RunnableScheduledFuture<V> task) {
108 * return new CustomTask<V>(r, task);
109 * }
110 *
111 * protected <V> RunnableScheduledFuture<V> decorateTask(
112 * Callable<V> c, RunnableScheduledFuture<V> task) {
113 * return new CustomTask<V>(c, task);
114 * }
115 * // ... add constructors, etc.
116 * }}</pre>
117 *
118 * @since 1.5
119 * @author Doug Lea
120 */
121 public class ScheduledThreadPoolExecutor
122 extends ThreadPoolExecutor
123 implements ScheduledExecutorService {
124
125 /*
126 * This class specializes ThreadPoolExecutor implementation by
127 *
128 * 1. Using a custom task type, ScheduledFutureTask for
129 * tasks, even those that don't require scheduling (i.e.,
130 * those submitted using ExecutorService execute, not
131 * ScheduledExecutorService methods) which are treated as
132 * delayed tasks with a delay of zero.
133 *
134 * 2. Using a custom queue (DelayedWorkQueue), a variant of
135 * unbounded DelayQueue. The lack of capacity constraint and
136 * the fact that corePoolSize and maximumPoolSize are
137 * effectively identical simplifies some execution mechanics
138 * (see delayedExecute) compared to ThreadPoolExecutor.
139 *
140 * 3. Supporting optional run-after-shutdown parameters, which
141 * leads to overrides of shutdown methods to remove and cancel
142 * tasks that should NOT be run after shutdown, as well as
143 * different recheck logic when task (re)submission overlaps
144 * with a shutdown.
145 *
146 * 4. Task decoration methods to allow interception and
147 * instrumentation, which are needed because subclasses cannot
148 * otherwise override submit methods to get this effect. These
149 * don't have any impact on pool control logic though.
150 */
151
152 /**
153 * False if should cancel/suppress periodic tasks on shutdown.
154 */
155 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
156
157 /**
158 * False if should cancel non-periodic tasks on shutdown.
159 */
160 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
161
162 /**
163 * True if ScheduledFutureTask.cancel should remove from queue
164 */
165 private volatile boolean removeOnCancel = false;
166
167 /**
168 * Sequence number to break scheduling ties, and in turn to
169 * guarantee FIFO order among tied entries.
170 */
171 private static final AtomicLong sequencer = new AtomicLong();
172
173 /**
174 * Returns current nanosecond time.
175 */
176 final long now() {
177 return System.nanoTime();
178 }
179
180 private class ScheduledFutureTask<V>
181 extends FutureTask<V> implements RunnableScheduledFuture<V> {
182
183 /** Sequence number to break ties FIFO */
184 private final long sequenceNumber;
185
186 /** The time the task is enabled to execute in nanoTime units */
187 private long time;
188
189 /**
190 * Period in nanoseconds for repeating tasks. A positive
191 * value indicates fixed-rate execution. A negative value
192 * indicates fixed-delay execution. A value of 0 indicates a
193 * non-repeating task.
194 */
195 private final long period;
196
197 /** The actual task to be re-enqueued by reExecutePeriodic */
198 RunnableScheduledFuture<V> outerTask = this;
199
200 /**
201 * Index into delay queue, to support faster cancellation.
202 */
203 int heapIndex;
204
205 /**
206 * Creates a one-shot action with given nanoTime-based trigger time.
207 */
208 ScheduledFutureTask(Runnable r, V result, long ns) {
209 super(r, result);
210 this.time = ns;
211 this.period = 0;
212 this.sequenceNumber = sequencer.getAndIncrement();
213 }
214
215 /**
216 * Creates a periodic action with given nano time and period.
217 */
218 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
219 super(r, result);
220 this.time = ns;
221 this.period = period;
222 this.sequenceNumber = sequencer.getAndIncrement();
223 }
224
225 /**
226 * Creates a one-shot action with given nanoTime-based trigger time.
227 */
228 ScheduledFutureTask(Callable<V> callable, long ns) {
229 super(callable);
230 this.time = ns;
231 this.period = 0;
232 this.sequenceNumber = sequencer.getAndIncrement();
233 }
234
235 public long getDelay(TimeUnit unit) {
236 return unit.convert(time - now(), NANOSECONDS);
237 }
238
239 public int compareTo(Delayed other) {
240 if (other == this) // compare zero if same object
241 return 0;
242 if (other instanceof ScheduledFutureTask) {
243 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
244 long diff = time - x.time;
245 if (diff < 0)
246 return -1;
247 else if (diff > 0)
248 return 1;
249 else if (sequenceNumber < x.sequenceNumber)
250 return -1;
251 else
252 return 1;
253 }
254 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
255 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
256 }
257
258 /**
259 * Returns {@code true} if this is a periodic (not a one-shot) action.
260 *
261 * @return {@code true} if periodic
262 */
263 public boolean isPeriodic() {
264 return period != 0;
265 }
266
267 /**
268 * Sets the next time to run for a periodic task.
269 */
270 private void setNextRunTime() {
271 long p = period;
272 if (p > 0)
273 time += p;
274 else
275 time = triggerTime(-p);
276 }
277
278 public boolean cancel(boolean mayInterruptIfRunning) {
279 boolean cancelled = super.cancel(mayInterruptIfRunning);
280 if (cancelled && removeOnCancel && heapIndex >= 0)
281 remove(this);
282 return cancelled;
283 }
284
285 /**
286 * Overrides FutureTask version so as to reset/requeue if periodic.
287 */
288 public void run() {
289 boolean periodic = isPeriodic();
290 if (!canRunInCurrentRunState(periodic))
291 cancel(false);
292 else if (!periodic)
293 ScheduledFutureTask.super.run();
294 else if (ScheduledFutureTask.super.runAndReset()) {
295 setNextRunTime();
296 reExecutePeriodic(outerTask);
297 }
298 }
299 }
300
301 /**
302 * Returns true if can run a task given current run state
303 * and run-after-shutdown parameters.
304 *
305 * @param periodic true if this task periodic, false if delayed
306 */
307 boolean canRunInCurrentRunState(boolean periodic) {
308 return isRunningOrShutdown(periodic ?
309 continueExistingPeriodicTasksAfterShutdown :
310 executeExistingDelayedTasksAfterShutdown);
311 }
312
313 /**
314 * Main execution method for delayed or periodic tasks. If pool
315 * is shut down, rejects the task. Otherwise adds task to queue
316 * and starts a thread, if necessary, to run it. (We cannot
317 * prestart the thread to run the task because the task (probably)
318 * shouldn't be run yet.) If the pool is shut down while the task
319 * is being added, cancel and remove it if required by state and
320 * run-after-shutdown parameters.
321 *
322 * @param task the task
323 */
324 private void delayedExecute(RunnableScheduledFuture<?> task) {
325 if (isShutdown())
326 reject(task);
327 else {
328 super.getQueue().add(task);
329 if (isShutdown() &&
330 !canRunInCurrentRunState(task.isPeriodic()) &&
331 remove(task))
332 task.cancel(false);
333 else
334 ensurePrestart();
335 }
336 }
337
338 /**
339 * Requeues a periodic task unless current run state precludes it.
340 * Same idea as delayedExecute except drops task rather than rejecting.
341 *
342 * @param task the task
343 */
344 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
345 if (canRunInCurrentRunState(true)) {
346 super.getQueue().add(task);
347 if (!canRunInCurrentRunState(true) && remove(task))
348 task.cancel(false);
349 else
350 ensurePrestart();
351 }
352 }
353
354 /**
355 * Cancels and clears the queue of all tasks that should not be run
356 * due to shutdown policy. Invoked within super.shutdown.
357 */
358 @Override void onShutdown() {
359 BlockingQueue<Runnable> q = super.getQueue();
360 boolean keepDelayed =
361 getExecuteExistingDelayedTasksAfterShutdownPolicy();
362 boolean keepPeriodic =
363 getContinueExistingPeriodicTasksAfterShutdownPolicy();
364 if (!keepDelayed && !keepPeriodic) {
365 for (Object e : q.toArray())
366 if (e instanceof RunnableScheduledFuture<?>)
367 ((RunnableScheduledFuture<?>) e).cancel(false);
368 q.clear();
369 }
370 else {
371 // Traverse snapshot to avoid iterator exceptions
372 for (Object e : q.toArray()) {
373 if (e instanceof RunnableScheduledFuture) {
374 RunnableScheduledFuture<?> t =
375 (RunnableScheduledFuture<?>)e;
376 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
377 t.isCancelled()) { // also remove if already cancelled
378 if (q.remove(t))
379 t.cancel(false);
380 }
381 }
382 }
383 }
384 tryTerminate();
385 }
386
387 /**
388 * Modifies or replaces the task used to execute a runnable.
389 * This method can be used to override the concrete
390 * class used for managing internal tasks.
391 * The default implementation simply returns the given task.
392 *
393 * @param runnable the submitted Runnable
394 * @param task the task created to execute the runnable
395 * @param <V> the type of the task's result
396 * @return a task that can execute the runnable
397 * @since 1.6
398 */
399 protected <V> RunnableScheduledFuture<V> decorateTask(
400 Runnable runnable, RunnableScheduledFuture<V> task) {
401 return task;
402 }
403
404 /**
405 * Modifies or replaces the task used to execute a callable.
406 * This method can be used to override the concrete
407 * class used for managing internal tasks.
408 * The default implementation simply returns the given task.
409 *
410 * @param callable the submitted Callable
411 * @param task the task created to execute the callable
412 * @param <V> the type of the task's result
413 * @return a task that can execute the callable
414 * @since 1.6
415 */
416 protected <V> RunnableScheduledFuture<V> decorateTask(
417 Callable<V> callable, RunnableScheduledFuture<V> task) {
418 return task;
419 }
420
421 /**
422 * Creates a new {@code ScheduledThreadPoolExecutor} with the
423 * given core pool size.
424 *
425 * @param corePoolSize the number of threads to keep in the pool, even
426 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
427 * @throws IllegalArgumentException if {@code corePoolSize < 0}
428 */
429 public ScheduledThreadPoolExecutor(int corePoolSize) {
430 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
431 new DelayedWorkQueue());
432 }
433
434 /**
435 * Creates a new {@code ScheduledThreadPoolExecutor} with the
436 * given initial parameters.
437 *
438 * @param corePoolSize the number of threads to keep in the pool, even
439 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
440 * @param threadFactory the factory to use when the executor
441 * creates a new thread
442 * @throws IllegalArgumentException if {@code corePoolSize < 0}
443 * @throws NullPointerException if {@code threadFactory} is null
444 */
445 public ScheduledThreadPoolExecutor(int corePoolSize,
446 ThreadFactory threadFactory) {
447 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
448 new DelayedWorkQueue(), threadFactory);
449 }
450
451 /**
452 * Creates a new ScheduledThreadPoolExecutor with the given
453 * initial parameters.
454 *
455 * @param corePoolSize the number of threads to keep in the pool, even
456 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
457 * @param handler the handler to use when execution is blocked
458 * because the thread bounds and queue capacities are reached
459 * @throws IllegalArgumentException if {@code corePoolSize < 0}
460 * @throws NullPointerException if {@code handler} is null
461 */
462 public ScheduledThreadPoolExecutor(int corePoolSize,
463 RejectedExecutionHandler handler) {
464 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
465 new DelayedWorkQueue(), handler);
466 }
467
468 /**
469 * Creates a new ScheduledThreadPoolExecutor with the given
470 * initial parameters.
471 *
472 * @param corePoolSize the number of threads to keep in the pool, even
473 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
474 * @param threadFactory the factory to use when the executor
475 * creates a new thread
476 * @param handler the handler to use when execution is blocked
477 * because the thread bounds and queue capacities are reached
478 * @throws IllegalArgumentException if {@code corePoolSize < 0}
479 * @throws NullPointerException if {@code threadFactory} or
480 * {@code handler} is null
481 */
482 public ScheduledThreadPoolExecutor(int corePoolSize,
483 ThreadFactory threadFactory,
484 RejectedExecutionHandler handler) {
485 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
486 new DelayedWorkQueue(), threadFactory, handler);
487 }
488
489 /**
490 * Returns the trigger time of a delayed action.
491 */
492 private long triggerTime(long delay, TimeUnit unit) {
493 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
494 }
495
496 /**
497 * Returns the trigger time of a delayed action.
498 */
499 long triggerTime(long delay) {
500 return now() +
501 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
502 }
503
504 /**
505 * Constrains the values of all delays in the queue to be within
506 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
507 * This may occur if a task is eligible to be dequeued, but has
508 * not yet been, while some other task is added with a delay of
509 * Long.MAX_VALUE.
510 */
511 private long overflowFree(long delay) {
512 Delayed head = (Delayed) super.getQueue().peek();
513 if (head != null) {
514 long headDelay = head.getDelay(NANOSECONDS);
515 if (headDelay < 0 && (delay - headDelay < 0))
516 delay = Long.MAX_VALUE + headDelay;
517 }
518 return delay;
519 }
520
521 /**
522 * @throws RejectedExecutionException {@inheritDoc}
523 * @throws NullPointerException {@inheritDoc}
524 */
525 public ScheduledFuture<?> schedule(Runnable command,
526 long delay,
527 TimeUnit unit) {
528 if (command == null || unit == null)
529 throw new NullPointerException();
530 RunnableScheduledFuture<?> t = decorateTask(command,
531 new ScheduledFutureTask<Void>(command, null,
532 triggerTime(delay, unit)));
533 delayedExecute(t);
534 return t;
535 }
536
537 /**
538 * @throws RejectedExecutionException {@inheritDoc}
539 * @throws NullPointerException {@inheritDoc}
540 */
541 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542 long delay,
543 TimeUnit unit) {
544 if (callable == null || unit == null)
545 throw new NullPointerException();
546 RunnableScheduledFuture<V> t = decorateTask(callable,
547 new ScheduledFutureTask<V>(callable,
548 triggerTime(delay, unit)));
549 delayedExecute(t);
550 return t;
551 }
552
553 /**
554 * @throws RejectedExecutionException {@inheritDoc}
555 * @throws NullPointerException {@inheritDoc}
556 * @throws IllegalArgumentException {@inheritDoc}
557 */
558 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
559 long initialDelay,
560 long period,
561 TimeUnit unit) {
562 if (command == null || unit == null)
563 throw new NullPointerException();
564 if (period <= 0)
565 throw new IllegalArgumentException();
566 ScheduledFutureTask<Void> sft =
567 new ScheduledFutureTask<Void>(command,
568 null,
569 triggerTime(initialDelay, unit),
570 unit.toNanos(period));
571 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
572 sft.outerTask = t;
573 delayedExecute(t);
574 return t;
575 }
576
577 /**
578 * @throws RejectedExecutionException {@inheritDoc}
579 * @throws NullPointerException {@inheritDoc}
580 * @throws IllegalArgumentException {@inheritDoc}
581 */
582 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
583 long initialDelay,
584 long delay,
585 TimeUnit unit) {
586 if (command == null || unit == null)
587 throw new NullPointerException();
588 if (delay <= 0)
589 throw new IllegalArgumentException();
590 ScheduledFutureTask<Void> sft =
591 new ScheduledFutureTask<Void>(command,
592 null,
593 triggerTime(initialDelay, unit),
594 unit.toNanos(-delay));
595 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
596 sft.outerTask = t;
597 delayedExecute(t);
598 return t;
599 }
600
601 /**
602 * Executes {@code command} with zero required delay.
603 * This has effect equivalent to
604 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
605 * Note that inspections of the queue and of the list returned by
606 * {@code shutdownNow} will access the zero-delayed
607 * {@link ScheduledFuture}, not the {@code command} itself.
608 *
609 * <p>A consequence of the use of {@code ScheduledFuture} objects is
610 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
611 * called with a null second {@code Throwable} argument, even if the
612 * {@code command} terminated abruptly. Instead, the {@code Throwable}
613 * thrown by such a task can be obtained via {@link Future#get}.
614 *
615 * @throws RejectedExecutionException at discretion of
616 * {@code RejectedExecutionHandler}, if the task
617 * cannot be accepted for execution because the
618 * executor has been shut down
619 * @throws NullPointerException {@inheritDoc}
620 */
621 public void execute(Runnable command) {
622 schedule(command, 0, NANOSECONDS);
623 }
624
625 // Override AbstractExecutorService methods
626
627 /**
628 * @throws RejectedExecutionException {@inheritDoc}
629 * @throws NullPointerException {@inheritDoc}
630 */
631 public Future<?> submit(Runnable task) {
632 return schedule(task, 0, NANOSECONDS);
633 }
634
635 /**
636 * @throws RejectedExecutionException {@inheritDoc}
637 * @throws NullPointerException {@inheritDoc}
638 */
639 public <T> Future<T> submit(Runnable task, T result) {
640 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
641 }
642
643 /**
644 * @throws RejectedExecutionException {@inheritDoc}
645 * @throws NullPointerException {@inheritDoc}
646 */
647 public <T> Future<T> submit(Callable<T> task) {
648 return schedule(task, 0, NANOSECONDS);
649 }
650
651 /**
652 * Sets the policy on whether to continue executing existing
653 * periodic tasks even when this executor has been {@code shutdown}.
654 * In this case, these tasks will only terminate upon
655 * {@code shutdownNow} or after setting the policy to
656 * {@code false} when already shutdown.
657 * This value is by default {@code false}.
658 *
659 * @param value if {@code true}, continue after shutdown, else don't
660 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
661 */
662 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
663 continueExistingPeriodicTasksAfterShutdown = value;
664 if (!value && isShutdown())
665 onShutdown();
666 }
667
668 /**
669 * Gets the policy on whether to continue executing existing
670 * periodic tasks even when this executor has been {@code shutdown}.
671 * In this case, these tasks will only terminate upon
672 * {@code shutdownNow} or after setting the policy to
673 * {@code false} when already shutdown.
674 * This value is by default {@code false}.
675 *
676 * @return {@code true} if will continue after shutdown
677 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
678 */
679 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
680 return continueExistingPeriodicTasksAfterShutdown;
681 }
682
683 /**
684 * Sets the policy on whether to execute existing delayed
685 * tasks even when this executor has been {@code shutdown}.
686 * In this case, these tasks will only terminate upon
687 * {@code shutdownNow}, or after setting the policy to
688 * {@code false} when already shutdown.
689 * This value is by default {@code true}.
690 *
691 * @param value if {@code true}, execute after shutdown, else don't
692 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
693 */
694 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
695 executeExistingDelayedTasksAfterShutdown = value;
696 if (!value && isShutdown())
697 onShutdown();
698 }
699
700 /**
701 * Gets the policy on whether to execute existing delayed
702 * tasks even when this executor has been {@code shutdown}.
703 * In this case, these tasks will only terminate upon
704 * {@code shutdownNow}, or after setting the policy to
705 * {@code false} when already shutdown.
706 * This value is by default {@code true}.
707 *
708 * @return {@code true} if will execute after shutdown
709 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
710 */
711 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
712 return executeExistingDelayedTasksAfterShutdown;
713 }
714
715 /**
716 * Sets the policy on whether cancelled tasks should be immediately
717 * removed from the work queue at time of cancellation. This value is
718 * by default {@code false}.
719 *
720 * @param value if {@code true}, remove on cancellation, else don't
721 * @see #getRemoveOnCancelPolicy
722 * @since 1.7
723 */
724 public void setRemoveOnCancelPolicy(boolean value) {
725 removeOnCancel = value;
726 }
727
728 /**
729 * Gets the policy on whether cancelled tasks should be immediately
730 * removed from the work queue at time of cancellation. This value is
731 * by default {@code false}.
732 *
733 * @return {@code true} if cancelled tasks are immediately removed
734 * from the queue
735 * @see #setRemoveOnCancelPolicy
736 * @since 1.7
737 */
738 public boolean getRemoveOnCancelPolicy() {
739 return removeOnCancel;
740 }
741
742 /**
743 * Initiates an orderly shutdown in which previously submitted
744 * tasks are executed, but no new tasks will be accepted.
745 * Invocation has no additional effect if already shut down.
746 *
747 * <p>This method does not wait for previously submitted tasks to
748 * complete execution. Use {@link #awaitTermination awaitTermination}
749 * to do that.
750 *
751 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
752 * has been set {@code false}, existing delayed tasks whose delays
753 * have not yet elapsed are cancelled. And unless the {@code
754 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
755 * {@code true}, future executions of existing periodic tasks will
756 * be cancelled.
757 *
758 * @throws SecurityException {@inheritDoc}
759 */
760 public void shutdown() {
761 super.shutdown();
762 }
763
764 /**
765 * Attempts to stop all actively executing tasks, halts the
766 * processing of waiting tasks, and returns a list of the tasks
767 * that were awaiting execution.
768 *
769 * <p>This method does not wait for actively executing tasks to
770 * terminate. Use {@link #awaitTermination awaitTermination} to
771 * do that.
772 *
773 * <p>There are no guarantees beyond best-effort attempts to stop
774 * processing actively executing tasks. This implementation
775 * cancels tasks via {@link Thread#interrupt}, so any task that
776 * fails to respond to interrupts may never terminate.
777 *
778 * @return list of tasks that never commenced execution.
779 * Each element of this list is a {@link ScheduledFuture},
780 * including those tasks submitted using {@code execute},
781 * which are for scheduling purposes used as the basis of a
782 * zero-delay {@code ScheduledFuture}.
783 * @throws SecurityException {@inheritDoc}
784 */
785 public List<Runnable> shutdownNow() {
786 return super.shutdownNow();
787 }
788
789 /**
790 * Returns the task queue used by this executor. Each element of
791 * this queue is a {@link ScheduledFuture}, including those
792 * tasks submitted using {@code execute} which are for scheduling
793 * purposes used as the basis of a zero-delay
794 * {@code ScheduledFuture}. Iteration over this queue is
795 * <em>not</em> guaranteed to traverse tasks in the order in
796 * which they will execute.
797 *
798 * @return the task queue
799 */
800 public BlockingQueue<Runnable> getQueue() {
801 return super.getQueue();
802 }
803
804 /**
805 * Specialized delay queue. To mesh with TPE declarations, this
806 * class must be declared as a BlockingQueue<Runnable> even though
807 * it can only hold RunnableScheduledFutures.
808 */
809 static class DelayedWorkQueue extends AbstractQueue<Runnable>
810 implements BlockingQueue<Runnable> {
811
812 /*
813 * A DelayedWorkQueue is based on a heap-based data structure
814 * like those in DelayQueue and PriorityQueue, except that
815 * every ScheduledFutureTask also records its index into the
816 * heap array. This eliminates the need to find a task upon
817 * cancellation, greatly speeding up removal (down from O(n)
818 * to O(log n)), and reducing garbage retention that would
819 * otherwise occur by waiting for the element to rise to top
820 * before clearing. But because the queue may also hold
821 * RunnableScheduledFutures that are not ScheduledFutureTasks,
822 * we are not guaranteed to have such indices available, in
823 * which case we fall back to linear search. (We expect that
824 * most tasks will not be decorated, and that the faster cases
825 * will be much more common.)
826 *
827 * All heap operations must record index changes -- mainly
828 * within siftUp and siftDown. Upon removal, a task's
829 * heapIndex is set to -1. Note that ScheduledFutureTasks can
830 * appear at most once in the queue (this need not be true for
831 * other kinds of tasks or work queues), so are uniquely
832 * identified by heapIndex.
833 */
834
835 private static final int INITIAL_CAPACITY = 16;
836 private RunnableScheduledFuture<?>[] queue =
837 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
838 private final ReentrantLock lock = new ReentrantLock();
839 private int size = 0;
840
841 /**
842 * Thread designated to wait for the task at the head of the
843 * queue. This variant of the Leader-Follower pattern
844 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
845 * minimize unnecessary timed waiting. When a thread becomes
846 * the leader, it waits only for the next delay to elapse, but
847 * other threads await indefinitely. The leader thread must
848 * signal some other thread before returning from take() or
849 * poll(...), unless some other thread becomes leader in the
850 * interim. Whenever the head of the queue is replaced with a
851 * task with an earlier expiration time, the leader field is
852 * invalidated by being reset to null, and some waiting
853 * thread, but not necessarily the current leader, is
854 * signalled. So waiting threads must be prepared to acquire
855 * and lose leadership while waiting.
856 */
857 private Thread leader = null;
858
859 /**
860 * Condition signalled when a newer task becomes available at the
861 * head of the queue or a new thread may need to become leader.
862 */
863 private final Condition available = lock.newCondition();
864
865 /**
866 * Sets f's heapIndex if it is a ScheduledFutureTask.
867 */
868 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
869 if (f instanceof ScheduledFutureTask)
870 ((ScheduledFutureTask)f).heapIndex = idx;
871 }
872
873 /**
874 * Sifts element added at bottom up to its heap-ordered spot.
875 * Call only when holding lock.
876 */
877 private void siftUp(int k, RunnableScheduledFuture<?> key) {
878 while (k > 0) {
879 int parent = (k - 1) >>> 1;
880 RunnableScheduledFuture<?> e = queue[parent];
881 if (key.compareTo(e) >= 0)
882 break;
883 queue[k] = e;
884 setIndex(e, k);
885 k = parent;
886 }
887 queue[k] = key;
888 setIndex(key, k);
889 }
890
891 /**
892 * Sifts element added at top down to its heap-ordered spot.
893 * Call only when holding lock.
894 */
895 private void siftDown(int k, RunnableScheduledFuture<?> key) {
896 int half = size >>> 1;
897 while (k < half) {
898 int child = (k << 1) + 1;
899 RunnableScheduledFuture<?> c = queue[child];
900 int right = child + 1;
901 if (right < size && c.compareTo(queue[right]) > 0)
902 c = queue[child = right];
903 if (key.compareTo(c) <= 0)
904 break;
905 queue[k] = c;
906 setIndex(c, k);
907 k = child;
908 }
909 queue[k] = key;
910 setIndex(key, k);
911 }
912
913 /**
914 * Resizes the heap array. Call only when holding lock.
915 */
916 private void grow() {
917 int oldCapacity = queue.length;
918 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
919 if (newCapacity < 0) // overflow
920 newCapacity = Integer.MAX_VALUE;
921 queue = Arrays.copyOf(queue, newCapacity);
922 }
923
924 /**
925 * Finds index of given object, or -1 if absent.
926 */
927 private int indexOf(Object x) {
928 if (x != null) {
929 if (x instanceof ScheduledFutureTask) {
930 int i = ((ScheduledFutureTask) x).heapIndex;
931 // Sanity check; x could conceivably be a
932 // ScheduledFutureTask from some other pool.
933 if (i >= 0 && i < size && queue[i] == x)
934 return i;
935 } else {
936 for (int i = 0; i < size; i++)
937 if (x.equals(queue[i]))
938 return i;
939 }
940 }
941 return -1;
942 }
943
944 public boolean contains(Object x) {
945 final ReentrantLock lock = this.lock;
946 lock.lock();
947 try {
948 return indexOf(x) != -1;
949 } finally {
950 lock.unlock();
951 }
952 }
953
954 public boolean remove(Object x) {
955 final ReentrantLock lock = this.lock;
956 lock.lock();
957 try {
958 int i = indexOf(x);
959 if (i < 0)
960 return false;
961
962 setIndex(queue[i], -1);
963 int s = --size;
964 RunnableScheduledFuture<?> replacement = queue[s];
965 queue[s] = null;
966 if (s != i) {
967 siftDown(i, replacement);
968 if (queue[i] == replacement)
969 siftUp(i, replacement);
970 }
971 return true;
972 } finally {
973 lock.unlock();
974 }
975 }
976
977 public int size() {
978 final ReentrantLock lock = this.lock;
979 lock.lock();
980 try {
981 return size;
982 } finally {
983 lock.unlock();
984 }
985 }
986
987 public boolean isEmpty() {
988 return size() == 0;
989 }
990
991 public int remainingCapacity() {
992 return Integer.MAX_VALUE;
993 }
994
995 public RunnableScheduledFuture<?> peek() {
996 final ReentrantLock lock = this.lock;
997 lock.lock();
998 try {
999 return queue[0];
1000 } finally {
1001 lock.unlock();
1002 }
1003 }
1004
1005 public boolean offer(Runnable x) {
1006 if (x == null)
1007 throw new NullPointerException();
1008 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1009 final ReentrantLock lock = this.lock;
1010 lock.lock();
1011 try {
1012 int i = size;
1013 if (i >= queue.length)
1014 grow();
1015 size = i + 1;
1016 if (i == 0) {
1017 queue[0] = e;
1018 setIndex(e, 0);
1019 } else {
1020 siftUp(i, e);
1021 }
1022 if (queue[0] == e) {
1023 leader = null;
1024 available.signal();
1025 }
1026 } finally {
1027 lock.unlock();
1028 }
1029 return true;
1030 }
1031
1032 public void put(Runnable e) {
1033 offer(e);
1034 }
1035
1036 public boolean add(Runnable e) {
1037 return offer(e);
1038 }
1039
1040 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1041 return offer(e);
1042 }
1043
1044 /**
1045 * Performs common bookkeeping for poll and take: Replaces
1046 * first element with last and sifts it down. Call only when
1047 * holding lock.
1048 * @param f the task to remove and return
1049 */
1050 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1051 int s = --size;
1052 RunnableScheduledFuture<?> x = queue[s];
1053 queue[s] = null;
1054 if (s != 0)
1055 siftDown(0, x);
1056 setIndex(f, -1);
1057 return f;
1058 }
1059
1060 public RunnableScheduledFuture<?> poll() {
1061 final ReentrantLock lock = this.lock;
1062 lock.lock();
1063 try {
1064 RunnableScheduledFuture<?> first = queue[0];
1065 if (first == null || first.getDelay(NANOSECONDS) > 0)
1066 return null;
1067 else
1068 return finishPoll(first);
1069 } finally {
1070 lock.unlock();
1071 }
1072 }
1073
1074 public RunnableScheduledFuture<?> take() throws InterruptedException {
1075 final ReentrantLock lock = this.lock;
1076 lock.lockInterruptibly();
1077 try {
1078 for (;;) {
1079 RunnableScheduledFuture<?> first = queue[0];
1080 if (first == null)
1081 available.await();
1082 else {
1083 long delay = first.getDelay(NANOSECONDS);
1084 if (delay <= 0)
1085 return finishPoll(first);
1086 first = null; // don't retain ref while waiting
1087 if (leader != null)
1088 available.await();
1089 else {
1090 Thread thisThread = Thread.currentThread();
1091 leader = thisThread;
1092 try {
1093 available.awaitNanos(delay);
1094 } finally {
1095 if (leader == thisThread)
1096 leader = null;
1097 }
1098 }
1099 }
1100 }
1101 } finally {
1102 if (leader == null && queue[0] != null)
1103 available.signal();
1104 lock.unlock();
1105 }
1106 }
1107
1108 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1109 throws InterruptedException {
1110 long nanos = unit.toNanos(timeout);
1111 final ReentrantLock lock = this.lock;
1112 lock.lockInterruptibly();
1113 try {
1114 for (;;) {
1115 RunnableScheduledFuture<?> first = queue[0];
1116 if (first == null) {
1117 if (nanos <= 0)
1118 return null;
1119 else
1120 nanos = available.awaitNanos(nanos);
1121 } else {
1122 long delay = first.getDelay(NANOSECONDS);
1123 if (delay <= 0)
1124 return finishPoll(first);
1125 if (nanos <= 0)
1126 return null;
1127 first = null; // don't retain ref while waiting
1128 if (nanos < delay || leader != null)
1129 nanos = available.awaitNanos(nanos);
1130 else {
1131 Thread thisThread = Thread.currentThread();
1132 leader = thisThread;
1133 try {
1134 long timeLeft = available.awaitNanos(delay);
1135 nanos -= delay - timeLeft;
1136 } finally {
1137 if (leader == thisThread)
1138 leader = null;
1139 }
1140 }
1141 }
1142 }
1143 } finally {
1144 if (leader == null && queue[0] != null)
1145 available.signal();
1146 lock.unlock();
1147 }
1148 }
1149
1150 public void clear() {
1151 final ReentrantLock lock = this.lock;
1152 lock.lock();
1153 try {
1154 for (int i = 0; i < size; i++) {
1155 RunnableScheduledFuture<?> t = queue[i];
1156 if (t != null) {
1157 queue[i] = null;
1158 setIndex(t, -1);
1159 }
1160 }
1161 size = 0;
1162 } finally {
1163 lock.unlock();
1164 }
1165 }
1166
1167 /**
1168 * Returns first element only if it is expired.
1169 * Used only by drainTo. Call only when holding lock.
1170 */
1171 private RunnableScheduledFuture<?> peekExpired() {
1172 // assert lock.isHeldByCurrentThread();
1173 RunnableScheduledFuture<?> first = queue[0];
1174 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1175 null : first;
1176 }
1177
1178 public int drainTo(Collection<? super Runnable> c) {
1179 if (c == null)
1180 throw new NullPointerException();
1181 if (c == this)
1182 throw new IllegalArgumentException();
1183 final ReentrantLock lock = this.lock;
1184 lock.lock();
1185 try {
1186 RunnableScheduledFuture<?> first;
1187 int n = 0;
1188 while ((first = peekExpired()) != null) {
1189 c.add(first); // In this order, in case add() throws.
1190 finishPoll(first);
1191 ++n;
1192 }
1193 return n;
1194 } finally {
1195 lock.unlock();
1196 }
1197 }
1198
1199 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1200 if (c == null)
1201 throw new NullPointerException();
1202 if (c == this)
1203 throw new IllegalArgumentException();
1204 if (maxElements <= 0)
1205 return 0;
1206 final ReentrantLock lock = this.lock;
1207 lock.lock();
1208 try {
1209 RunnableScheduledFuture<?> first;
1210 int n = 0;
1211 while (n < maxElements && (first = peekExpired()) != null) {
1212 c.add(first); // In this order, in case add() throws.
1213 finishPoll(first);
1214 ++n;
1215 }
1216 return n;
1217 } finally {
1218 lock.unlock();
1219 }
1220 }
1221
1222 public Object[] toArray() {
1223 final ReentrantLock lock = this.lock;
1224 lock.lock();
1225 try {
1226 return Arrays.copyOf(queue, size, Object[].class);
1227 } finally {
1228 lock.unlock();
1229 }
1230 }
1231
1232 @SuppressWarnings("unchecked")
1233 public <T> T[] toArray(T[] a) {
1234 final ReentrantLock lock = this.lock;
1235 lock.lock();
1236 try {
1237 if (a.length < size)
1238 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1239 System.arraycopy(queue, 0, a, 0, size);
1240 if (a.length > size)
1241 a[size] = null;
1242 return a;
1243 } finally {
1244 lock.unlock();
1245 }
1246 }
1247
1248 public Iterator<Runnable> iterator() {
1249 return new Itr(Arrays.copyOf(queue, size));
1250 }
1251
1252 /**
1253 * Snapshot iterator that works off copy of underlying q array.
1254 */
1255 private class Itr implements Iterator<Runnable> {
1256 final RunnableScheduledFuture<?>[] array;
1257 int cursor = 0; // index of next element to return
1258 int lastRet = -1; // index of last element, or -1 if no such
1259
1260 Itr(RunnableScheduledFuture<?>[] array) {
1261 this.array = array;
1262 }
1263
1264 public boolean hasNext() {
1265 return cursor < array.length;
1266 }
1267
1268 public Runnable next() {
1269 if (cursor >= array.length)
1270 throw new NoSuchElementException();
1271 lastRet = cursor;
1272 return array[cursor++];
1273 }
1274
1275 public void remove() {
1276 if (lastRet < 0)
1277 throw new IllegalStateException();
1278 DelayedWorkQueue.this.remove(array[lastRet]);
1279 lastRet = -1;
1280 }
1281 }
1282 }
1283 }
1284