Worker Thread模式浅谈 这就像一个工作车间工人们负责组装模型一样,例如客户会将很多装有塑料模型的箱子带到工作车间来,然后摆放在桌子上。工人必须将送过来的塑料模型一个个组装起来。他们会先取回放在桌子上的装有塑料模型的箱子,然后阅读了箱子中的说明书后开始组装,当一 箱模型组装完成后,工人们会继续去取下一个箱子。当所有模型全部组装完成后,工人们会等待新的模型被送过来。 Worker的意思是工作的人,劳动者。在Worker Thread的模式中,工人线程(Worker Thread)会逐个取回工作并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来。 Worker Thread模式也被称为Background Thread(背景线程)模式。另外,如果从“保存多个工人线程的场所”这一点来看,我们也可以称这种模式为Thread Pool(线程池)模式。 下面我们结合该模式的例子进一步理解,先介绍用到的一些类: ClientThread类的线程会向Channel类发送工作请求(委托)(说是工作,其实只是显示出委托者的名字和委托编号)。 Channel类的实例雇佣了五个工人线程(WorkerThread)进行工作。所有工人线程都在等待工作请求的到来。 工作请求到来后,工人线程会从Channel那里获取一项工作请求并开始工作。工作完成后,工人线程会回到Channel那里等待下一项工作请求。 Request表示工作请求的类,在实例中的方法execute负责“处理”请求的方法,虽说是处理,实际上只是将运行中的线程的名字和请求内容(name,number)显示出来而已,为了体现需要处理很长时间,sleep一段时间。 public class Request { private final String name; //委托者名字 private final int number; //请求的编号 private static final Random random = new Random(); public Request(String name, int number) { this.name = name; this.number = number; } public void execute() { System.out.println(Thread.currentThread().getName()+" execute" + this); try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } public String toString() { return " [Request form " + name + ", No." + number + "]"; } } WorkerThread表示工人线程的类,工人线程会进行工作。“进行工作”这个处理对应以下处理:调用takeRequest方法从Channel实例中获取一个Request的实例,调用Request实例的execute方法。 工人线程一旦启动就会一直工作,会反复执行获取一个新的Request的实例,然后调用它的execute方法的处理。 public class WorkerThread extends Thread{ private final Channel channel; public WorkerThread(String name,Channel channel) { super(name); this.channel = channel; } public void run(){ while(true) { Request request = channel.takeRequest(); request.execute(); } } } Channel类是负责传递工作请求以及保存工人线程的类,为了传递工作请求,我们在类中定义了requestQueue字段,用来保存请求Request的队列的角色,putRequest方法用于将请求加入到队列中,takeRequest用于取出队列中的请求。这里使用了前面篇章介绍的Producer-Consumer模式,另外,为了实现putRequest和takeRequest方法,这里还使用了前面篇章介绍的Guared Suspension模式。 类中定义了一个用于保存工人线程的threadPool字段,threadPool是WorkerThread的数组,Channel的构造函数会初始化threadPool字段并创建WorkerThread的实例。数组的大小由threads决定。这里为工人线程命名worker-0,worker-1,worker-2... startWorkers方法用于启动所有工人线程的方法。 public class Channel { private static final int MAX_REQUEST = 100; private final Request[] requestQueue; private int tail; //下次putRequest的位置 private int head; //下次takeRequest的位置 private int count; //Request的数量 private final WorkerThread[] threadPool; //工人线程池 public Channel(int threads) { this.requestQueue = new Request[MAX_REQUEST]; this.tail = 0; this.head = 0; this.count = 0; this.threadPool = new WorkerThread[threads]; for (int i = 0; i < threadPool.length; i++) { threadPool = new WorkerThread("worker-"+i,this); } } public void startWorkers() { for (int i = 0; i < threadPool.length; i++) { threadPool.start(); } } public synchronized void putRequest(Request request) { while(count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } requestQueue[tail]=request; tail = (tail+1)%requestQueue.length; count++; notifyAll(); } public synchronized Request takeRequest() { while(count <= 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Request request = requestQueue[head]; head = (head+1)%requestQueue.length; count--; notifyAll(); return request; } } ClientThread类是发送请求的类,主要做以下处理:创建Request实例,并将实例传递给Channel类的putRequest方法。 为了让程序行为有些变化,这里让程序sleep一段随机长的时间 public class ClientThread extends Thread{ private final Channel channel; private static final Random random = new Random(); public ClientThread(String name,Channel channel) { super(name); this.channel = channel; } public void run(){ try { for (int i = 0; true; i++) { Request request = new Request(getName(),i); channel.putRequest(request); Thread.sleep(random.nextInt(1000)); } } catch (InterruptedException e) { } } } Main类:启动运行类 public class Main { public static void main(String[] args) { Channel channel = new Channel(5); channel.startWorkers(); new ClientThread("Alice", channel).start(); new ClientThread("Tom", channel).start(); new ClientThread("Jack", channel).start(); } } 运行结果: 从结果看出,发送请求的ClientThread与处理请求的WorkerThread之间并没有固定的对应关系,虽然来自Alice的请求No.0是由Worker-1来处理的,但是同样来自Alice的请求No.1确是Worker-2来处理的,而No.2确是Worker-3来处理的。 工人线程并不在意发送请求的是谁,它只是处理接收到的请求即可——这就是我们要介绍的Worker Thread模式。
|