类层次结构
![]()
构造方法

```java
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}
```

执行和调度任务方法

```java
public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,long period,TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit)
public Future<?> submit(Runnable task)
public <T> Future<T> submit(Runnable task, T result)
public <T> Future<T> submit(Callable<T> task)
```

其他方法

```java
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
public void setRemoveOnCancelPolicy(boolean value)
public boolean getRemoveOnCancelPolicy()
public void shutdown()
public List<Runnable> shutdownNow()
public BlockingQueue<Runnable> getQueue()
```

实现原理

ScheduledThreadPoolExecutor调度和执行任务的过程可以抽象为下图所示的流程:
![]()
- 将Callable或Runnable对象转换为ScheduledFutureTask对象;
- 将转换后的ScheduledFutureTask对象添加到延迟队列并开启线程执行任务;
- 工作线程从延迟队列中获取ScheduledFutureTask任务并执行。
通过上述描述可以发现,ScheduledFutureTask是ScheduledThreadPoolExecutor实现的关键。
源码分析

```java
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 步骤1
    RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    // 步骤2
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

// 将任务添加到延迟队列
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
```

ScheduledFutureTask类层次结构
![]()
构造方法

```java
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
}
```

任务执行

```java
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic) // 非周期性任务,直接执行
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 执行任务,重置状态
        // 计算下一次执行时间
        setNextRunTime();
        // 重新添加到延迟队列
        reExecutePeriodic(outerTask);
    }
}

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
```
【转载】仅作分享,侵删
作者:牛觅
链接:https://juejin.im/post/5bfcec63f265da61682b1028
|
|