黑马程序员技术交流社区

标题: 【广州校区】【原创】Worker Thread模式浅谈 [打印本页]

作者: 瞧瞧嗨一波...    时间: 2018-11-22 14:53
标题: 【广州校区】【原创】Worker Thread模式浅谈
Worker Thread模式浅谈
这就像一个工作车间工人们负责组装模型一样,例如客户会将很多装有塑料模型的箱子带到工作车间来,然后摆放在桌子上。工人必须将送过来的塑料模型一个个组装起来。他们会先取回放在桌子上的装有塑料模型的箱子,然后阅读了箱子中的说明书后开始组装,当一 箱模型组装完成后,工人们会继续去取下一个箱子。当所有模型全部组装完成后,工人们会等待新的模型被送过来。
Worker的意思是工作的人,劳动者。在Worker Thread的模式中,工人线程(Worker Thread)会逐个取回工作并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来。
Worker Thread模式也被称为Background Thread(背景线程)模式。另外,如果从“保存多个工人线程的场所”这一点来看,我们也可以称这种模式为Thread Pool(线程池)模式。
下面我们结合该模式的例子进一步理解,先介绍用到的一些类:
ClientThread类的线程会向Channel类发送工作请求(委托)(说是工作,其实只是显示出委托者的名字和委托编号)。
Channel类的实例雇佣了五个工人线程(WorkerThread)进行工作。所有工人线程都在等待工作请求的到来。
工作请求到来后,工人线程会从Channel那里获取一项工作请求并开始工作。工作完成后,工人线程会回到Channel那里等待下一项工作请求。
Request表示工作请求的类,在实例中的方法execute负责“处理”请求的方法,虽说是处理,实际上只是将运行中的线程的名字和请求内容(name,number)显示出来而已,为了体现需要处理很长时间,sleep一段时间。
public class Request {
        private final String name; //委托者名字
        private final int number;  //请求的编号
        private static final Random random = new Random();
        public Request(String name, int number) {
                this.name = name;
                this.number = number;
        }
        public void execute() {
                System.out.println(Thread.currentThread().getName()+" execute" + this);
                try {
                        Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                }
        }
        public String toString() {
                return " [Request form " + name + ", No." + number + "]";
        }
}
WorkerThread表示工人线程的类,工人线程会进行工作。“进行工作”这个处理对应以下处理:调用takeRequest方法从Channel实例中获取一个Request的实例,调用Request实例的execute方法。
工人线程一旦启动就会一直工作,会反复执行获取一个新的Request的实例,然后调用它的execute方法的处理。
public class WorkerThread extends Thread{
        private final Channel channel;
        public WorkerThread(String name,Channel channel) {
                super(name);
                this.channel = channel;
        }
        public void run(){
                while(true) {
                        Request request = channel.takeRequest();
                        request.execute();
                }
        }
}
Channel类是负责传递工作请求以及保存工人线程的类,为了传递工作请求,我们在类中定义了requestQueue字段,用来保存请求Request的队列的角色,putRequest方法用于将请求加入到队列中,takeRequest用于取出队列中的请求。这里使用了前面篇章介绍的Producer-Consumer模式,另外,为了实现putRequesttakeRequest方法,这里还使用了前面篇章介绍的Guared Suspension模式。
类中定义了一个用于保存工人线程的threadPool字段,threadPoolWorkerThread的数组,Channel的构造函数会初始化threadPool字段并创建WorkerThread的实例。数组的大小由threads决定。这里为工人线程命名worker-0,worker-1,worker-2...
startWorkers方法用于启动所有工人线程的方法。
public class Channel {
        private static final int MAX_REQUEST = 100;
        private final Request[] requestQueue;
        private int tail;    //下次putRequest的位置
        private int head;        //下次takeRequest的位置
        private int count;   //Request的数量
        private final WorkerThread[] threadPool; //工人线程池
        public Channel(int threads) {       
                this.requestQueue = new Request[MAX_REQUEST];
                this.tail = 0;
                this.head = 0;
                this.count = 0;
                this.threadPool = new WorkerThread[threads];
                for (int i = 0; i < threadPool.length; i++) {
                        threadPool = new WorkerThread("worker-"+i,this);
                }
        }
       
        public void startWorkers() {
                for (int i = 0; i < threadPool.length; i++) {
                        threadPool.start();
                }
        }
       
        public synchronized void putRequest(Request request) {
                while(count >= requestQueue.length) {
                        try {
                                wait();
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
                requestQueue[tail]=request;
                tail = (tail+1)%requestQueue.length;
                count++;
                notifyAll();
        }
       
        public synchronized Request takeRequest() {
                while(count <= 0) {
                        try {
                                wait();
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
                Request request = requestQueue[head];
                head = (head+1)%requestQueue.length;
                count--;
                notifyAll();
                return request;
        }
}
ClientThread类是发送请求的类,主要做以下处理:创建Request实例,并将实例传递给Channel类的putRequest方法。
为了让程序行为有些变化,这里让程序sleep一段随机长的时间
public class ClientThread extends Thread{
        private final Channel channel;
        private static final Random random = new Random();
        public ClientThread(String name,Channel channel) {
                super(name);
                this.channel = channel;
        }
       
        public void run(){
                        try {
                                for (int i = 0; true; i++) {
                                        Request request = new Request(getName(),i);
                                        channel.putRequest(request);
                                        Thread.sleep(random.nextInt(1000));
                                }
                        } catch (InterruptedException e) {
                        }
               
        }
}
Main类:启动运行类
public class Main {
        public static void main(String[] args) {
                Channel channel = new Channel(5);
                channel.startWorkers();
                new ClientThread("Alice", channel).start();
                new ClientThread("Tom", channel).start();
                new ClientThread("Jack", channel).start();
        }
}
运行结果:
从结果看出,发送请求的ClientThread与处理请求的WorkerThread之间并没有固定的对应关系,虽然来自Alice的请求No.0是由Worker-1来处理的,但是同样来自Alice的请求No.1确是Worker-2来处理的,而No.2确是Worker-3来处理的。
工人线程并不在意发送请求的是谁,它只是处理接收到的请求即可——这就是我们要介绍的Worker Thread模式。



作者: 一个人一座城0.0    时间: 2018-12-1 17:46
到此一游
作者: 一个人一座城0.0    时间: 2018-12-1 17:48
到此一游




欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2