A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 小江哥 黑马粉丝团   /  2019-1-19 14:54  /  1128 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。
其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。
问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。
同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。
其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。
这样看来,Condition和传统的线程通信没什么区别,Condition的强大之处在于它可以为多个线程间建立不同的Condition,下面引入API中的一段代码,加以说明。
代码:
[Java] 纯文本查看 复制代码
class BoundedBuffer {
   final Lock lock = new ReentrantLock();//锁对象
   final Condition notFull  = lock.newCondition();//写线程条件 
   final Condition notEmpty = lock.newCondition();//读线程条件 

   final Object[] items = new Object[100];//缓存队列
   int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)//如果队列满了 
         notFull.await();//阻塞写线程
       items[putptr] = x;//赋值 
       if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
       ++count;//个数++
       notEmpty.signal();//唤醒读线程
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)//如果队列为空
         notEmpty.await();//阻塞读线程
       Object x = items[takeptr];//取值 
       if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
       --count;//个数--
       notFull.signal();//唤醒写线程
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }
这是一个处于多线程工作环境下的缓存区,缓存区提供了两个方法,put和take,put是存数据,take是取数据,内部有个缓存队列,具体变量和方法说明见代码,
这个缓存区类实现的功能:有多个线程往里面存数据和从里面取数据,其缓存队列(先进先出后进后出)能缓存的最大数值是100,多个线程间是互斥的,
当缓存队列中存储的值达到100时,将写线程阻塞,并唤醒读线程,当缓存队列中存储的值为0时,将读线程阻塞,并唤醒写线程,这也是ArrayBlockingQueue的内部实现。
下面分析一下代码的执行过程:
        1. 一个写线程执行,调用put方法;
        2. 判断count是否为100,显然没有100;
        3. 继续执行,存入值;
        4. 判断当前写入的索引位置++后,是否和100相等,相等将写入索引值变为0,并将count+1;
        5. 仅唤醒读线程阻塞队列中的一个;
        6. 一个读线程执行,调用take方法;
        7. ……
        8. 仅唤醒写线程阻塞队列中的一个。
        这就是多个Condition的强大之处,假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。
Condition的应用
[Java] 纯文本查看 复制代码
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    static class NumberWrapper {
        public int value = 1;
    }

    public static void main(String[] args)  {
        //初始化可重入锁
        final Lock lock = new ReentrantLock();

        //第一个条件当屏幕上输出到3
        final Condition reachThreeCondition = lock.newCondition();
        //第二个条件当屏幕上输出到6
        final Condition reachSixCondition = lock.newCondition();

        //NumberWrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final
        //注意这里不要用Integer, Integer 是不可变对象
        final NumberWrapper num = new NumberWrapper();
        //初始化A线程
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                //需要先获得锁
                lock.lock();
                try {
                    System.out.println("threadA start write");
                    //A线程先输出前3个数
                    while (num.value <= 3) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //输出到3时要signal,告诉B线程可以开始了
                    reachThreeCondition.signal();
                } finally {
                    lock.unlock();
                }
                lock.lock();
                try {
                    //等待输出6的条件
                    reachSixCondition.await();
                    System.out.println("threadA start write");
                    //输出剩余数字
                    while (num.value <= 9) {
                        System.out.println(num.value);
                        num.value++;
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }

        });


        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();

                    while (num.value <= 3) {
                        //等待3输出完毕的信号
                        reachThreeCondition.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                try {
                    lock.lock();
                    //已经收到信号,开始输出4,5,6
                    System.out.println("threadB start write");
                    while (num.value <= 6) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //4,5,6输出完毕,告诉A线程6输出完了
                    reachSixCondition.signal();
                } finally {
                    lock.unlock();
                }
            }

        });

        //启动两个线程
        threadB.start();
        threadA.start();
    }
}
[Java] 纯文本查看 复制代码
threadA start write
2
threadB start write
5
threadA start write
8
基本思路就是首先要A线程先写1,2,3,这时候B线程应该等待reachThredCondition信号,而当A线程写完3之后就通过signal告诉B线程“我写到3了,该你了”,
这时候A线程要等reachSixCondition信号,同时B线程得到通知,开始写4,5,6,写完4,5,6之后B线程通知A线程reachSixCondition条件成立了,这时候A线程就开始写剩下的7,8,9了。
Java官方提供的例子:
[Java] 纯文本查看 复制代码
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    public static void main(String[] args)  {
        final BoundedBuffer boundedBuffer = new BoundedBuffer();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("t1 run");
                for (int i=0;i<20;i++) {
                    try {
                        System.out.println("putting..");
                        boundedBuffer.put(Integer.valueOf(i));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }) ;

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i=0;i<20;i++) {
                    try {
                        Object val = boundedBuffer.take();
                        System.out.println(val);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }) ;

        t1.start();
        t2.start();
    }

    /**
     * BoundedBuffer 是一个定长100的集合,当集合中没有元素时,take方法需要等待,直到有元素时才返回元素
     * 当其中的元素数达到最大值时,要等待直到元素被take之后才执行put的操作
     * @author yukaizhao
     *
     */
    static class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        final Object[] items = new Object[100];
        int putptr, takeptr, count;

        public void put(Object x) throws InterruptedException {
            System .out.println("put wait lock");
            lock.lock();
            System.out.println("put get lock");
            try {
                while (count == items.length) {
                    System.out.println("buffer full, please wait");
                    notFull.await();
                }

                items[putptr] = x;
                if (++putptr == items.length)
                    putptr = 0;
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
        
        public Object take() throws InterruptedException {
            System.out.println("take wait lock");
            lock.lock();
            System.out.println("take get lock");
            try {
                while (count == 0) {
                    System.out.println("no elements, please wait");
                    notEmpty.await();
                }
                Object x = items[takeptr];
                if (++takeptr == items.length)
                    takeptr = 0;
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }
}
[Java] 纯文本查看 复制代码
t1 run
putting..
take wait lock
take get lock
no elements, please wait
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
take wait lock
take get lock
也可以修改t1,t2线程里面的循环,改成大于100,可能会碰到集合已满的情况。
这个示例中BoundedBuffer是一个固定长度的集合,
这个在其put操作时,如果发现长度已经达到最大长度,那么要等待notFull信号才能继续put,如果得到notFull信号会像集合中添加元素,并且put操作会发出notEmpty的信号,
而在其take方法中如果发现集合长度为空,那么会等待notEmpty的信号,接受到notEmpty信号才能继续take,同时如果拿到一个元素,那么会发出notFull的信号。
Condition与Object中的wati,notify,notifyAll区别:
1.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。
不同的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。
2.Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。
如果采用Object类中的wait(),notify(),notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马