/**
* 三步:1,创建线程直到corePoolSize;2,加入任务队列;3,如果还是执行不过来,则执行拒绝策略
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2,workQueue,当任务被提交但尚未被执行的任务队列,是一个BlockingQueue接口的对象,只存放Runnable对象。根据队列功能分类,看下JDK提供的几种BlockingQueue:
BlockingQueue分类说明
work queue class name 说明
SynchronousQueue 直接提交队列:没有容量,每一个插入操作都要等待一个相应的删除操作。通常使用需要将maximumPoolSize的值设置很大,否则很容易触发拒绝策略。
ArrayBlockingQueue 有界的任务队列:任务大小通过入参 int capacity决定,当填满队列后才会创建大于corePoolSize的线程。
LinkedBlockingQueue 无界的任务队列:线程个数最大为corePoolSize,如果任务过多,则不断扩充队列,知道内存资源耗尽。
PriorityBlockingQueue 优先任务队列:是一个无界的特殊队列,可以控制任务执行的先后顺序,而上边几个都是先进先出的策略。
3,ThreadFactory,是用来创建线程池中的线程工厂类,我们来看下JDK中的默认DefaultThreadFactory类,并自己写个试试:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* 自定义试试
*/
public class ThreadFactoryDemo {
public static class MyTask implements Runnable{
public void run() {
System.out.println(System.currentTimeMillis() + "Thread Name:" + Thread.currentThread().getName());
public static void main(String[] args) {
FixThreadPoolDemo.MyTask myTask = new FixThreadPoolDemo.MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t =new Thread(r);
t.setDaemon(true);
System.out.println("create thread" + t.getName());
return t;
}
});
for (int i = 0; i < 10 ; i++) {
es.submit(myTask);
}
@Override
protected void terminated() {
System.out.println("线程池退出" );
}
};
for (int i = 0; i < 10 ; i++) {
es.submit(myTask);
}
Thread.sleep(3000);
es.shutdown();
}
}
五,线程数量的优化:线程池的大小对系统性能有一定的影响,过大或者过小都无法发挥系统的最佳性能。但是也没有必要做的特别精确,只是不要太大,不要太小即可。我们可以根据此公式进行粗略计算:线程池个数=CPU的数量*CPU的使用率*(1+等待时间/计算时间)。当然了还需要根据实际情况,积累实际经验,来进行判断。
六,线程池中的堆栈信息,我们通过下边两个例子进行查看:
/**
* 会吃掉一些异常信息,非常不友好
*/
public class ExceptionThreadPoolDemo {
public static class MyTask implements Runnable {
int a, b;
public MyTask(int a, int b) {
this.a = a;
this.b = b;
}
public void run() {
double re = a / b;
System.out.println(re);
}
}
public static void main(String[] args) {
//ExecutorService es = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
ExecutorService es = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
//不进行日志打印
//es.submit(new MyTask(100,i));
//进行日志打印,只是打印了具体方法错误:Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
// at com.ljh.thread.thread_pool.ExceptionThreadPoolDemo$MyTask.run(ExceptionThreadPoolDemo.java:24)
// at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
// at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
// at java.lang.Thread.run(Thread.java:748)
es.execute(new MyTask(100,i));
}
}
}
/**
* 通过自定自己的线程池,重写execute和submit方法,将异常打印出来:
*/
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
super.execute(wrap(command,ljhTrace(),Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task,ljhTrace(),Thread.currentThread().getName()));
}
private Exception ljhTrace(){
return new Exception("ljh-stack-trace");
}
private Runnable wrap(final Runnable task , final Exception ljhException,String threadName){
return new Runnable() {
public void run() {
try {
task.run();
}catch (Exception e){
ljhException.printStackTrace();
e.printStackTrace();
}