生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系。
比如一个人正在准备食物(生产者),而另一个人正在吃(消费者),
他们共用一张桌子用于放置食物和取走盘食物,生产者准备食物,
如果桌子上已经满了,生产者就等待,
如果桌子空了的话消费者等待,
这里桌子就是一个共享的对象。
想象这样一个情景:
生产者: 在蛋糕店制作蛋糕
消费者: 在蛋糕店卖出蛋糕
盒子: 蛋糕店里最多存放20个新鲜蛋糕
对于这种生产者消费者模式,Java有三种常用的实现方式:
(1) Synchronized / wait() / notify()方法
(2)Lock / Condition / await() / signal()方法
(3) BlockingQueue阻塞队列方法
(4) PipedInputStream / PipedOutputStream
以下是前三种方法的演示:
Synchronized / wait() / notify()方法
wait()/ nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait():当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
notify():当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,使自己处于等待状态。
//盒子模型
class CakeShop{
int num = 0;
//锁是this
synchronized void increase() throws InterruptedException {
if (num>=20) {//控制蛋糕数不超过20个
this.wait();
}else{
num++;
System.out.println(Thread.currentThread().getName()+"制作了一个蛋糕,蛋糕数:"+num);
Thread.sleep(100);
}
this.notifyAll();
}
//锁是this
synchronized void decrease() throws InterruptedException {
if (num<=0) {
this.wait();
}else{
num--;
System.out.println(Thread.currentThread().getName()+"出售了一个蛋糕,蛋糕数:"+num);
Thread.sleep(100);
}
this.notifyAll();
}
}
//生产者线程
class Produce implements Runnable {
CakeShop cakeShop;
Produce(CakeShop cakeShop) {
this.cakeShop = cakeShop;
}
@Override
public void run() {
while (true) {
try {
cakeShop.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者线程
class Sell implements Runnable{
CakeShop cakeShop;
Sell(CakeShop cakeShop){
this.cakeShop = cakeShop;
}
@Override
public void run() {
while(true){
try {
cakeShop.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//测试类
public class A {
public static void main(String[] args) {
CakeShop cakeShop = new CakeShop();
//10个消费者线程
for(int i = 0;i<10;i++){
new Thread(new Sell(cakeShop)).start();
}
//5个生产者线程
for(int i = 0;i<5;i++){
new Thread(new Produce(cakeShop)).start();
}
}
}
运行结果:
Thread-10制作了一个蛋糕,蛋糕数:1
Thread-9出售了一个蛋糕,蛋糕数:0
Thread-14制作了一个蛋糕,蛋糕数:1
Thread-13制作了一个蛋糕,蛋糕数:2
Thread-13制作了一个蛋糕,蛋糕数:3
Thread-12制作了一个蛋糕,蛋糕数:4
Thread-11制作了一个蛋糕,蛋糕数:5
Thread-11制作了一个蛋糕,蛋糕数:6
Thread-11制作了一个蛋糕,蛋糕数:7
Thread-11制作了一个蛋糕,蛋糕数:8
Thread-12制作了一个蛋糕,蛋糕数:9
Thread-12制作了一个蛋糕,蛋糕数:10
Thread-13制作了一个蛋糕,蛋糕数:11
Thread-13制作了一个蛋糕,蛋糕数:12
Thread-14制作了一个蛋糕,蛋糕数:13
Thread-0出售了一个蛋糕,蛋糕数:12
Thread-0出售了一个蛋糕,蛋糕数:11
Thread-9出售了一个蛋糕,蛋糕数:10
Thread-9出售了一个蛋糕,蛋糕数:9
Thread-9出售了一个蛋糕,蛋糕数:8
Thread-9出售了一个蛋糕,蛋糕数:7
Thread-9出售了一个蛋糕,蛋糕数:6
Thread-9出售了一个蛋糕,蛋糕数:5
Thread-9出售了一个蛋糕,蛋糕数:4
Thread-9出售了一个蛋糕,蛋糕数:3
Thread-9出售了一个蛋糕,蛋糕数:2
Thread-2出售了一个蛋糕,蛋糕数:1
Thread-2出售了一个蛋糕,蛋糕数:0
Thread-10制作了一个蛋糕,蛋糕数:1
Thread-10制作了一个蛋糕,蛋糕数:2
Thread-10制作了一个蛋糕,蛋糕数:3
Thread-10制作了一个蛋糕,蛋糕数:4
Thread-10制作了一个蛋糕,蛋糕数:5
Thread-10制作了一个蛋糕,蛋糕数:6
Thread-10制作了一个蛋糕,蛋糕数:7
Thread-8出售了一个蛋糕,蛋糕数:6
Thread-8出售了一个蛋糕,蛋糕数:5
Thread-8出售了一个蛋糕,蛋糕数:4
Thread-8出售了一个蛋糕,蛋糕数:3
Thread-8出售了一个蛋糕,蛋糕数:2
Thread-8出售了一个蛋糕,蛋糕数:1
Thread-8出售了一个蛋糕,蛋糕数:0
Thread-14制作了一个蛋糕,蛋糕数:1
Thread-13制作了一个蛋糕,蛋糕数:2
Thread-13制作了一个蛋糕,蛋糕数:3
Thread-13制作了一个蛋糕,蛋糕数:4
Thread-13制作了一个蛋糕,蛋糕数:5
......
Lock / Condition / await() / signal()方法
在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。Condition接口的await()和signal()就是其中用来做同步的两种方法,它们的功能基本上和Object的wait()/ nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。下面来看代码:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class CakeShop{
int num = 0;
Lock lock = new ReentrantLock();
Condition fullCondition = lock.newCondition();//蛋糕过多条件
Condition emptyCondition = lock.newCondition();//没有蛋糕条件
void increase() throws InterruptedException {
//上锁
lock.lock();
//控制蛋糕数不超过20个
if (num==20) {
fullCondition.await();
}else{
num++;
System.out.println(Thread.currentThread().getName()+"制作了一个蛋糕,蛋糕数:"+num);
Thread.sleep(100);
}
//唤醒阻塞的所有生产者消费者
fullCondition.signalAll();
emptyCondition.signalAll();
//释放锁
lock.unlock();
}
void decrease() throws InterruptedException {
//上锁
lock.lock();
if (num<1) {
emptyCondition.await();
}else{
num--;
System.out.println(Thread.currentThread().getName()+"吃了一个蛋糕,蛋糕数:"+num);
Thread.sleep(100);
}
//唤醒阻塞的所有生产者消费者
fullCondition.signalAll();
emptyCondition.signalAll();
//释放锁
lock.unlock();
}
}
//生产者线程
class Produce implements Runnable {
CakeShop cakeShop;
Produce(CakeShop cakeShop) {
this.cakeShop = cakeShop;
}
@Override
public void run() {
while (true) {
try {
cakeShop.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者线程
class Sell implements Runnable{
CakeShop cakeShop;
Sell(CakeShop cakeShop){
this.cakeShop = cakeShop;
}
@Override
public void run() {
while(true){
try {
cakeShop.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//测试类
public class A {
public static void main(String[] args) {
CakeShop cakeShop = new CakeShop();
//10个消费者线程
for(int i = 0;i<10;i++){
new Thread(new Sell(cakeShop)).start();
}
//5个生产者线程
for(int i = 0;i<5;i++){
new Thread(new Produce(cakeShop)).start();
}
}
}
运行结果:
Thread-10制作了一个蛋糕,蛋糕数:1
Thread-10制作了一个蛋糕,蛋糕数:2
Thread-11制作了一个蛋糕,蛋糕数:3
Thread-11制作了一个蛋糕,蛋糕数:4
Thread-12制作了一个蛋糕,蛋糕数:5
Thread-12制作了一个蛋糕,蛋糕数:6
Thread-13制作了一个蛋糕,蛋糕数:7
Thread-13制作了一个蛋糕,蛋糕数:8
Thread-13制作了一个蛋糕,蛋糕数:9
Thread-13制作了一个蛋糕,蛋糕数:10
Thread-13制作了一个蛋糕,蛋糕数:11
Thread-13制作了一个蛋糕,蛋糕数:12
Thread-13制作了一个蛋糕,蛋糕数:13
Thread-13制作了一个蛋糕,蛋糕数:14
Thread-13制作了一个蛋糕,蛋糕数:15
Thread-14制作了一个蛋糕,蛋糕数:16
Thread-14制作了一个蛋糕,蛋糕数:17
Thread-14制作了一个蛋糕,蛋糕数:18
Thread-14制作了一个蛋糕,蛋糕数:19
Thread-14制作了一个蛋糕,蛋糕数:20
Thread-0吃了一个蛋糕,蛋糕数:19
Thread-0吃了一个蛋糕,蛋糕数:18
Thread-0吃了一个蛋糕,蛋糕数:17
Thread-0吃了一个蛋糕,蛋糕数:16
Thread-0吃了一个蛋糕,蛋糕数:15
Thread-0吃了一个蛋糕,蛋糕数:14
Thread-0吃了一个蛋糕,蛋糕数:13
Thread-0吃了一个蛋糕,蛋糕数:12
Thread-0吃了一个蛋糕,蛋糕数:11
Thread-0吃了一个蛋糕,蛋糕数:10
Thread-0吃了一个蛋糕,蛋糕数:9
Thread-0吃了一个蛋糕,蛋糕数:8
Thread-0吃了一个蛋糕,蛋糕数:7
Thread-0吃了一个蛋糕,蛋糕数:6
Thread-0吃了一个蛋糕,蛋糕数:5
Thread-0吃了一个蛋糕,蛋糕数:4
Thread-0吃了一个蛋糕,蛋糕数:3
Thread-0吃了一个蛋糕,蛋糕数:2
Thread-0吃了一个蛋糕,蛋糕数:1
Thread-0吃了一个蛋糕,蛋糕数:0
Thread-10制作了一个蛋糕,蛋糕数:1
Thread-10制作了一个蛋糕,蛋糕数:2
Thread-10制作了一个蛋糕,蛋糕数:3
Thread-11制作了一个蛋糕,蛋糕数:4
......
BlockingQueue阻塞队列方法
JDK 1.5 以后新增的 java.util.concurrent包新增了 BlockingDeque/ BlockingQueue接口。并提供了如下几种阻塞队列实现:
java.util.concurrent.ArrayBlockingDeque
java.util.concurrent.LinkedBlockingDeque
java.util.concurrent.SynchronousDeque
java.util.concurrent.PriorityBlockingDeque
实现生产者-消费者模型使用 ArrayBlockingDeque或者 LinkedBlockingDeque即可。
我们这里使用LinkedBlockingDeque,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/ signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
我们可以跟进源码看一下LinkedBlockingDeque类的put()方法和take()实现:
先看一下类中的lock和condition:
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
put()方法实现:
public void put(E e) throws InterruptedException {
putLast(e);
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
可见阻塞队列底层也是使用lock,condition来实现的。
接下来回到正题,看我们的蛋糕店怎么使用阻塞队列:
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
class CakeShop{
//存放蛋糕的队列,设置最大数量为20
BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>(20);
CakeShop(){//初始化四个蛋糕
blockingDeque.push(1);
blockingDeque.push(1);
blockingDeque.push(1);
blockingDeque.push(1);
}
void increase() throws InterruptedException {
blockingDeque.put(0);
System.out.println(Thread.currentThread().getName()+"生产了一个蛋糕,蛋糕数:"+blockingDeque.size());
}
void decrease() throws InterruptedException {
blockingDeque.take();
System.out.println(Thread.currentThread().getName()+"吃了一个蛋糕,蛋糕数:"+blockingDeque.size());
}
}
//生产者线程n
class Produce implements Runnable {
CakeShop cakeShop;
Produce(CakeShop cakeShop) {
this.cakeShop = cakeShop;
}
@Override
public void run() {
while (true) {
try {
cakeShop.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者线程
class Sell implements Runnable{
CakeShop cakeShop;
Sell(CakeShop cakeShop){
this.cakeShop = cakeShop;
}
@Override
public void run() {
while(true){
try {
cakeShop.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//测试类
public class A {
public static void main(String[] args) {
CakeShop cakeShop = new CakeShop();
//10个消费者线程
for(int i = 0;i<10;i++){
new Thread(new Sell(cakeShop)).start();
}
//5个生产者线程
for(int i = 0;i<5;i++){
new Thread(new Produce(cakeShop)).start();
}
}
}
输出结果:由于输出时的线程可能会被抢占,所以输出的数据不一定符合预期
Thread-0吃了一个蛋糕,蛋糕数:3
Thread-0吃了一个蛋糕,蛋糕数:2
Thread-0吃了一个蛋糕,蛋糕数:1
Thread-0吃了一个蛋糕,蛋糕数:0
Thread-10生产了一个蛋糕,蛋糕数:1
Thread-10生产了一个蛋糕,蛋糕数:1
Thread-10生产了一个蛋糕,蛋糕数:2
Thread-10生产了一个蛋糕,蛋糕数:3
Thread-10生产了一个蛋糕,蛋糕数:3
Thread-10生产了一个蛋糕,蛋糕数:4
Thread-10生产了一个蛋糕,蛋糕数:5
Thread-10生产了一个蛋糕,蛋糕数:6
Thread-10生产了一个蛋糕,蛋糕数:5
Thread-10生产了一个蛋糕,蛋糕数:6
Thread-10生产了一个蛋糕,蛋糕数:7
Thread-10生产了一个蛋糕,蛋糕数:8
Thread-0吃了一个蛋糕,蛋糕数:0
Thread-0吃了一个蛋糕,蛋糕数:7
Thread-10生产了一个蛋糕,蛋糕数:8
Thread-7吃了一个蛋糕,蛋糕数:4
Thread-7吃了一个蛋糕,蛋糕数:3
Thread-7吃了一个蛋糕,蛋糕数:2
Thread-7吃了一个蛋糕,蛋糕数:1
Thread-7吃了一个蛋糕,蛋糕数:0
Thread-4吃了一个蛋糕,蛋糕数:7
Thread-10生产了一个蛋糕,蛋糕数:1
Thread-10生产了一个蛋糕,蛋糕数:2
Thread-10生产了一个蛋糕,蛋糕数:3
Thread-10生产了一个蛋糕,蛋糕数:4
Thread-10生产了一个蛋糕,蛋糕数:4
Thread-10生产了一个蛋糕,蛋糕数:5
Thread-10生产了一个蛋糕,蛋糕数:6
Thread-10生产了一个蛋糕,蛋糕数:7
Thread-10生产了一个蛋糕,蛋糕数:8
Thread-10生产了一个蛋糕,蛋糕数:9
Thread-3吃了一个蛋糕,蛋糕数:4
Thread-3吃了一个蛋糕,蛋糕数:6
Thread-3吃了一个蛋糕,蛋糕数:5
Thread-2吃了一个蛋糕,蛋糕数:5
......
|
|