黑马程序员技术交流社区

标题: 【太原校区】Java自定义线程池和线程总数控制 [打印本页]

作者: 达摩侠    时间: 2018-12-20 12:50
标题: 【太原校区】Java自定义线程池和线程总数控制
本帖最后由 达摩侠 于 2018-12-20 12:52 编辑

1 概述

池化是常见的思想,线程池是非常典型的池化的实现,《Java并发编程实战》也大篇幅去讲解了Java中的线程池。本文实现一个简单的线程池。


2 核心类

【1】接口定义

[Java] 纯文本查看 复制代码

public interface IThreadPool<Job extends Runnable> {
        /**
         * 关闭线程池
         */
        public void shutAlldown();

        /**
         * 执行任务
         *
         * @param job 任务
         */
        public void execute(Job job);

        /**
         * 添加工作者
         *
         * @param addNum 添加数
         */
        public void addWorkers(int addNum);

        /**
         * 减少工作者
         *
         * @param reduceNum 减少数目
         */
        public void reduceWorkers(int reduceNum);
}

【2】实现类


线程池的核心是维护了1个任务列表和1个工作者列表。

[AppleScript] 纯文本查看 复制代码

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class XYThreadPool<Job extends Runnable> implements IThreadPool<Job> {

        // 默认线程数
        private static int DEAFAULT_SIZE = 5;
        // 最大线程数
        private static int MAX_SIZE = 10;

        // 任务列表
        private LinkedList<Job> tasks = new LinkedList<Job>();
        // 工作线程列表
        private List<Worker> workers = Collections
                        .synchronizedList(new ArrayList<Worker>());

        /**
         * 默认构造函数
         */
        public XYThreadPool() {
                initWokers(DEAFAULT_SIZE);
        }

        /**
         * 执行线程数
         *
         * @param threadNums 线程数
         */
        public XYThreadPool(int workerNum) {
                workerNum = workerNum <= 0 ? DEAFAULT_SIZE
                                : workerNum > MAX_SIZE ? MAX_SIZE : workerNum;
                initWokers(workerNum);
        }

        /**
         * 初始化线程池
         *
         * @param threadNums 线程数
         */
        public void initWokers(int threadNums) {
                for (int i = 0; i < threadNums; i++) {
                        Worker worker = new Worker();
                        worker.start();
                        workers.add(worker);
                }
                // 添加关闭钩子
                Runtime.getRuntime().addShutdownHook(new Thread() {
                        public void run() {
                                shutAlldown();
                        }
                });
        }

        @Override
        public void shutAlldown() {
                for (Worker worker : workers) {
                        worker.shutdown();
                }
        }

        @Override
        public void execute(Job job) {
                synchronized (tasks) {
                        // 提交任务就是将任务对象加入任务队列,等待工作线程去处理
                        tasks.addLast(job);
                        tasks.notifyAll();
                }
        }

        @Override
        public void addWorkers(int addNum) {
                // 新线程数必须大于零,并且线程总数不能大于最大线程数
                if ((workers.size() + addNum) <= MAX_SIZE && addNum > 0) {
                        initWokers(addNum);
                } else {
                        System.out.println("addNum too large");
                }
        }

        @Override
        public void reduceWorkers(int reduceNum) {
                if ((workers.size() - reduceNum <= 0))
                        System.out.println("thread num too small");
                else {
                        // 暂停指定数量的工作者
                        int count = 0;
                        while (count != reduceNum) {
                                for (Worker w : workers) {
                                        w.shutdown();
                                        count++;
                                }
                        }
                }
        }

        /**
         * 工作线程
         */
        class Worker extends Thread {

                private volatile boolean flag = true;

                @Override
                public void run() {
                        while (flag) {
                                Job job = null;
                                // 加锁(若只有一个woker可不必加锁,那就是所谓的单线程的线程池,线程安全)
                                synchronized (tasks) {
                                        // 任务队列为空
                                        while (tasks.isEmpty()) {
                                                try {
                                                        // 阻塞,放弃对象锁,等待被notify唤醒
                                                        tasks.wait();
                                                        System.out.println("block when tasks is empty");
                                                } catch (InterruptedException e) {
                                                        e.printStackTrace();
                                                }
                                        }
                                        // 不为空取出任务
                                        job = tasks.removeFirst();
                                        System.out.println("get job:" + job + ",do biz");
                                        job.run();
                                }
                        }
                }

                public void shutdown() {
                        flag = false;
                }
        }
}

1 当调用wait()方法时线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用notify()方法后本线程才进入对象锁定池准备



2 Object的方法:void notify(): 唤醒一个正在等待该对象的线程。void notifyAll(): 唤醒所有正在等待该对象的线程。notifyAll使所有原来在该对象上等待被notify的线程统统退出wait状态,变成等待该对象上的锁,一旦该对象被解锁,它们会去竞争。notify只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其它同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁,此时如果该对象没有再次使用notify语句,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyAll,它们等待的是被notify或notifyAll,而不是锁。




3 无需控制线程总数

每调用一次就会创建一个拥有10个线程工作者的线程池。

[Java] 纯文本查看 复制代码
public class TestService1 {
        public static void main(String[] args) {
                // 启动10个线程
                XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
                pool.execute(new Runnable() {
                        @Override
                        public void run() {
                                System.out.println("====1 test====");
                        }
                });        
        }
}

public class TestService2 {
        public static void main(String[] args) {
                // 启动10个线程
                XYThreadPool<Runnable> pool = new XYThreadPool<Runnable>(10);
                pool.execute(new Runnable() {
                        @Override
                        public void run() {
                                System.out.println("====2 test====");
                        }
                });
        }
}

4 控制线程总数
希望在项目中所有的线程调用,都共用1个固定工作者数大小的线程池。

[AppleScript] 纯文本查看 复制代码
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xy.pool.XYThreadPool;

/**
* 统一线程池管理类
*/
@Component
public class XYThreadManager {

        private XYThreadPool<Runnable> executorPool;

        @PostConstruct
        public void init() {
                executorPool = new XYThreadPool<Runnable>(10);
        }

        public XYThreadPool<Runnable> getExecutorPool() {
                return executorPool;
        }
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("testService3")
public class TestService3 {
        
        @Autowired
        private XYThreadManager threadManager;
        
        public void test() {
                threadManager.getExecutorPool().execute(new Runnable() {
                        @Override
                        public void run() {
                                System.out.println("====3 test====");
                        }
                });
        }
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("testService4")
public class TestService4 {
        
        @Autowired
        private XYThreadManager threadManager;
        
        public void test() {
                threadManager.getExecutorPool().execute(new Runnable() {
                        @Override
                        public void run() {
                                System.out.println("====4 test====");
                        }
                });
        }
}

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TestMain {

        @SuppressWarnings("resource")
        public static void main(String[] args) {
                ApplicationContext atc = new ClassPathXmlApplicationContext("applicationContext.xml");

                TestService3 t3 = (TestService3) atc.getBean("testService3");
                t3.test();

                TestService4 t4 = (TestService4) atc.getBean("testService4");
                t4.test();
        }

}



作者: 达摩侠    时间: 2018-12-20 12:52
自己顶一波
作者: renhua    时间: 2018-12-20 13:26

作者: Vicky韦    时间: 2018-12-20 13:33
厉害/:strong
作者: 高建华    时间: 2018-12-20 13:35

作者: liudongjie    时间: 2018-12-20 13:36

作者: 王高飞    时间: 2018-12-20 13:39
学到了
作者: 张志辉    时间: 2018-12-20 13:43
很好很强大
作者: cuichang1    时间: 2018-12-20 13:44

作者: 郝永亮    时间: 2018-12-20 14:21

作者: HowieChen    时间: 2018-12-20 15:01
            
作者: liuchengwei1    时间: 2018-12-20 18:50





欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2