@Override
public void run() {
// 该线程一直启动着,不断从任务队列取出任务执行
while (!Thread.currentThread().isInterrupted()) {
// 如果初始化任务不为空,则执行初始化任务
if (task != null) {
task.run();
task = null;
} else {
Runnable queueTask = taskQueue.poll();
if (queueTask != null)
queueTask.run();
}
}
}
}
public static void main(String[] args) {
TestThreadPoolExecutor threadPoolExecutor = new TestThreadPoolExecutor(5);
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName() + " is running");
});
}
}
}
执行输出:
Thread-0 is running
Thread-2 is running
Thread-3 is running
Thread-1 is running
Thread-1 is running
Thread-1 is running
Thread-4 is running
Thread-0 is running
Thread-4 is running
Thread-4 is running
Thread-2 is running
Thread-1 is running
Task queue is full!
Thread-4 is running
Thread-0 is running
Thread-4 is running
Thread-1 is running
Thread-2 is running
Thread-3 is running
Thread-0 is running
4. 几种常见的线程池
上面大概分析了下线程池的运行机制和自己实现了一个简单的线程池。
接下来看看几种常见的线程池,Executors是JDK提供的线程池工厂类:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
Core threads: 3
Largest executions: 0
Maximum allowed threads: 3
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 3
Largest executions: 2
Maximum allowed threads: 3
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-2
Running Task! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-2
5.1 构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造器可以看出:
5.2 提交任务
public Future<?> submit(Runnable task) {
// task不能为空,否则抛出异常
if (task == null) throw new NullPointerException();
// 构造一个RunnableFuture
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 执行任务
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
提交任务流程分3步:
1. 如果运行的线程数少于corePoolSize,则尝试启动一个新线程去执行该任务。对addworker的调用会自动检查线程池的runState和workerCount,如果添加失败会返回false,添加成功则直接返回。
2. 如果运行的线程数大于corePoolSize,则尝试插入队列,这时仍然需要再次检查线程池的runState,因为可能进入此分支后线程池关闭了。如果线程池已关闭,会移除该任务,并交给饱和策略处理,如果没有,则启动一个新线程。
3. 如果前2步都不是,则尝试添加一个新线程。如果添加失败(可能是线程池已关闭或者饱和),则交给饱和策略处理。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
获取当前线程池的状态,如果是STOP, TIDYING, TERMINATED状态的话,则会返回false,如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false。
通过自旋的方式,判断要添加的Worker是否是corePool,如果是的话,那么则判断当前的workerCount是否大于corePoolsize,否则则判断是否大于maximumPoolSize,如果满足的话,说明workerCount超出了线程池大小,直接返回false。如果小于的话,那么判断是否成功将WorkerCount通过CAS操作增加1,如果增加成功的话。则进行到第3步,否则则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断。
如果满足了的话,那么则创建一个新的Worker对象,然后获取线程池的重入锁后,判断当前线程池的状态,如果当前线程池状态为STOP, TIDYING, TERMINATED的话,那么调用decrementWorkerCount将workerCount减一,然后调用tryTerminate停止线程池,并且返回false。
如果状态满足的话,那么则在workers中将新创建的worker添加,并且重新计算largestPoolSize,然后启动Worker中的线程开始执行任务。
重新Check一次当前线程池的状态,如果处于STOP状态的话,那么就调用interrupt方法中断线程执行。
任务添加成功会加入到workers中,workers就是用来负责管理所有线程任务的:
private final HashSet<Worker> workers = new HashSet<Worker>();
然后调用start()启动任务:
if (workerAdded) {
t.start();
workerStarted = true;
}
线程池使用的是自旋锁来判断线程池的运行状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
5.3 Worker类
上面提交任务最终提交的是Worker,那么Worker是怎么定义的呢?
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.