java高级特性-更新版本 1. java多线程增强1.1. java多线程基本知识1.1.1. 进程介绍 不管是我们开发的应用程序,还是我们运行的其他的应用程序,都需要先把程序安装在本地的硬盘上。然后找到这个程序的启动文件,启动程序的时候,其实是电脑把当前的这个程序加载到内存中,在内存中需要给当前的程序分配一段独立的运行空间。这片空间就专门负责当前这个程序的运行。 不同的应用程序运行的过程中都需要在内存中分配自己独立的运行空间,彼此之间不会相互的影响。我们把每个独立应用程序在内存的独立空间称为当前应用程序运行的一个进程。 进程:它是内存中的一段独立的空间,可以负责当前应用程序的运行。当前这个进程负责调度当前程序中的所有运行细节。 1.1.2. 线程介绍 启动的QQ聊天软件,需要和多个人进行聊天。这时多个人之间是不能相互影响,但是它们都位于当前QQ这个软件运行时所分配的内容的独立空间中。 在一个进程中,每个独立的功能都需要独立的去运行,这时又需要把当前这个进程划分成多个运行区域,每个独立的小区域(小单元)称为一个线程。 线程:它是位于进程中,负责当前进程中的某个具备独立运行资格的空间。 进程是负责整个程序的运行,而线程是程序中具体的某个独立功能的运行。一个进程中至少应该有一个线程。 1.1.3. 多线程介绍 现在的操作系统基本都是多用户,多任务的操作系统。每个任务就是一个进程。而在这个进程中就会有线程。 真正可以完成程序运行和功能的实现靠的是进程中的线程。 多线程:在一个进程中,我们同时开启多个线程,让多个线程同时去完成某些任务(功能)。 多线程的目的:提高程序的运行效率。 1.1.4. 多线程运行的原理 在电脑中负责程序运行的控制器CPU。 其实真正电脑中的程序的运行不是同时在运行的。CPU负责程序的运行,而CPU在运行程序的过程中某个时刻点上,它其实只能运行一个程序。而不是多个程序。而CPU它可以在多个程序之间进行高速的切换。而切换频率和速度太快,导致人的肉看看不到。 每个程序就是进程, 而每个进程中会有多个线程,而CPU是在这些线程之间进行切换。 了解了CPU对一个任务的执行过程,我们就必须知道,多线程可以提高程序的运行效率,但不能无限制的开线程。 1.1.5. Java关于线程的描述程序运行靠的线程,在每个程序中都会一个线程的存在,线程是程序运行过程中存在一类事物,Java就必须对这个事物有类的描述和封装。 在Java中使用Thread类描述线程这个事物。 1.1.6. 实现线程的两种方式1、继承Thread的原理 为什么要继承Thread类? 线程是程序运行过程中的最基本的单元。而Java对线程使用的Thread这个类进行描述。而我们现在希望通过自己的代码操作线程,自己的代码应该需要和Thread类之间产生关系。这里我们采用的继承的关系。 当我们继承了Thread类之后,我们自己的类也就变成了线程类。我们自己的类就继承到了Thread类中的所有功能,并且自己的类就可以对线程进行各种操作(开启线程,停止线程等)。 为什么要复写run方法 为什么要使用线程:因为我们希望程序中的某段代码可以同时运行,提高程序的运行效率。 我们定义的类继承了Thread的之后,其实在Thread类中有个run方法,它是开启线程之后,就会直接去运行的方法。而Java在设计线程类(Thread)的时候,就已经明确了线程应该执行的某段代码需要书写在run方法中,之后在run方法中的代码开启线程之后才能正常的运行。 我们使用线程的目的是让线程执行后来自己程序中的某些代码, 而Java中规定需要线程执行的代码必须写run方法中,Thread类中的run方法中并没有我们真正需要多线程运行的代码,而开启线程又要去运行run方法,这时我们只能沿用Thread类run方法的定义格式,然后复写run方法的方法体代码。 复写run方法的目的是明确线程要执行的代码,只有把代码写在run方法中,线程开启后才会去执行。 需要线程执行的代码:这段代码称为线程要执行的任务。线程要执行的任务,需要书写在run方法中。 为什么不直接调用run方法,而调用start方法 当书写了一个类继承了Thread类之后,这个子类也变成线程类。这时可以创建这个子类的对象,一旦创建Thread的子类对象,就相当于拥有了当前的线程对象。 创建Thread的子类对象,只是在内存中有了线程这个对象,但是线程还不能真正的去运行。 要让线程真正的在内存运行起来,必须调用start方法,这样才能够在内存开启一片新的内存空间,然后负责当前线程需要执行的任务。 我们直接通过线程对象去调用run方法,这时只是对象调用普通的方法,并没有在内存中开启新的内存空间运行任务代码。只有调用start方法才会开启新的空间。并在新的空间中自动去运行run方法。 2、开启线程的第二种方式 创建线程的另一种方法是声明实现 Runnable 接口的类。该类然后实现 run 方法。然后可以分配该类的实例,在创建 Thread 时作为一个参数来传递并启动。 实现Runnable接口的原理 1、java单继承的原因: 在Java中一个类只能有一个直接父类,如果一个类已经继承其他的父类,那么当前这个类中假如有需要多线程操作的代码,这时这类是无法再继承Thread类的。这样就会导致当前这个类中的某些需要多线程执行的任务代码就无法被线程去执行。 2、Java设计方面的原因: Thread类是专门负责描述线程本身的。Thread类可以对线程进行各种各样的操作。Java在设计的时候把线程要执行的任务交给了Thread。这样导致操作线程本身的功能和线程要执行的任务功能严重的耦合在一起。 线程的任务是需要后来的程序制定和分配的,而线程的操作是需要提前设计好的。Java就把线程的任务从Thread类中抽取出来,保存在Runnable接口中。 把任务抽取到Runnable接口中之后,在这个接口中定义线程需要执行的任务的规则,当需要明确线程的任务时,我们就让这个类实现Runnable接口,只要实现Runnable接口的类,就相当于明确了线程需要执行的任务。 当一个类实现Runnable接口,就相当于有了线程的任务,可以是还没有线程本身这个对象。这是我们就可以直接使用Thread这个类创建出线程,然后把任务交给线程。这样就达到任务和线程的分离以及结合。 软件设计的时候遵守原则:低耦合、高内聚。事物和事物之间的依赖程度称为它们的耦合度。 1.2. java同步关键词解释1.2.1. synchronized 在多个线程同时操作一个成员变量的时候,会出现多线程的安全问题,也就是某几个线程得到的变量在同一时间是相同的,导致计算的不准确。 针对多线程安全问题的原因,我们需要给出相应的处理方案,针对CPU的切换,由操作系统去控制,而我们认为是无法干预。因此这个问题解决不了。所以要解决安全问题,可以认为的控制CPU在执行某个线程操作共享数据的时候,不让其他线程进入到操作共享数据的代码中去,这样就可以保证安全。 上述的这个解决方案:称为线程的同步。 要想保证线程的安全:需要在操作共享数据的地方,加上线程的同步。 加同步格式: synchronized( 需要一个任意的对象(锁) ){ 代码块中放操作共享数据的代码。 } 上述的格式称为多线程中的同步代码块。 同步代码块上的锁,可以是随便任意的一个对象。 1.2.2. lock一.synchronized的缺陷 synchronized是java中的一个关键字,也就是说是Java语言内置的特性。那么为什么会出现Lock呢? 如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况: 1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有; 2)线程执行发生异常,此时JVM会让线程自动释放锁。 那么如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。 因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过Lock就可以办到。 再举个例子:当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。 但是采用synchronized关键字来实现同步的话,就会导致一个问题: 如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。 因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。 另外,通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的。 总的来说,也就是说Lock提供了比synchronized更多的功能。但是要注意以下几点: 1)Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问; 2)Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。 二.java.util.concurrent.locks包下常用的类 下面我们就来探讨一下java.util.concurrent.locks包中常用的类和接口。 1.Lock 首先要说明的就是Lock,通过查看Lock的源码可知,Lock是一个接口: public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); } |
下面来逐个讲述Lock接口中每个方法的使用,lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用来获取锁的。unLock()方法是用来释放锁的。newCondition()这个方法暂且不在此讲述,会在后面的线程协作一文中讲述。 在Lock中声明了四个方法来获取锁,那么这四个方法有何区别呢? 首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。 由于在前面讲到如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的: Lock lock = ...; lock.lock(); try{ //处理任务 }catch(Exception ex){ }finally{ lock.unlock(); //释放锁 } |
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。 tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。 所以,一般情况下通过tryLock来获取锁时是这样使用的: Lock lock = ...; if(lock.tryLock()) { try{ //处理任务 }catch(Exception ex){ }finally{ lock.unlock(); //释放锁 } }else { //如果不能获取锁,则直接做其他事情 } |
lockInterruptibly()方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。 由于lockInterruptibly()的声明中抛出了异常,所以lock.lockInterruptibly()必须放在try块中或者在调用lockInterruptibly()的方法外声明抛出InterruptedException。 因此lockInterruptibly()一般的使用形式如下: public void method() throws InterruptedException { lock.lockInterruptibly(); try { //..... } finally { lock.unlock(); } } |
注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。因为本身在前面的文章中讲过单独调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。 因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。 而用synchronized修饰的话,当一个线程处于等待某个锁的状态,是无法被中断的,只有一直等待下去。 2.ReentrantLock ReentrantLock,意思是“可重入锁”,关于可重入锁的概念在下一节讲述。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。下面通过一些实例看具体看一下如何使用ReentrantLock。 例子1,lock()的正确使用方法 public class Test { private ArrayList<Integer> arrayList = new ArrayList<Integer>(); public static void main(String[] args) { final Test test = new Test(); new Thread(){ public void run() { test.insert(Thread.currentThread()); }; }.start(); new Thread(){ public void run() { test.insert(Thread.currentThread()); }; }.start(); } public void insert(Thread thread) { Lock lock = new ReentrantLock(); //注意这个地方 lock.lock(); try { System.out.println(thread.getName()+"得到了锁"); for(int i=0;i<5;i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception }finally { System.out.println(thread.getName()+"释放了锁"); lock.unlock(); } } } |
输出结果是: Thread-0得到了锁 Thread-1得到了锁 Thread-0释放了锁 Thread-1释放了锁 也许有同学会问,怎么会输出这个结果?第二个线程怎么会在第一个线程释放锁之前得到了锁?原因在于,在insert方法中的lock变量是局部变量,每个线程执行该方法时都会保存一个副本,那么理所当然每个线程执行到lock.lock()处获取的是不同的锁,所以就不会发生冲突。 知道了原因改起来就比较容易了,只需要将lock声明为类的属性即可。 public class Test { private ArrayList<Integer> arrayList = new ArrayList<Integer>(); private Lock lock = new ReentrantLock(); //注意这个地方 public static void main(String[] args) { final Test test = new Test(); new Thread(){ public void run() { test.insert(Thread.currentThread()); }; }.start(); new Thread(){ public void run() { test.insert(Thread.currentThread()); }; }.start(); } public void insert(Thread thread) { lock.lock(); try { System.out.println(thread.getName()+"得到了锁"); for(int i=0;i<5;i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception }finally { System.out.println(thread.getName()+"释放了锁"); lock.unlock(); } } } |
这样就是正确地使用Lock的方法了。 例子2,tryLock()的使用方法 public class Test { private ArrayList<Integer> arrayList = new ArrayList<Integer>(); private Lock lock = new ReentrantLock(); //注意这个地方 public static void main(String[] args) { final Test test = new Test(); new Thread(){ public void run() { test.insert(Thread.currentThread()); }; }.start(); new Thread(){ public void run() { test.insert(Thread.currentThread()); }; }.start(); } public void insert(Thread thread) { if(lock.tryLock()) { try { System.out.println(thread.getName()+"得到了锁"); for(int i=0;i<5;i++) { arrayList.add(i); } } catch (Exception e) { // TODO: handle exception }finally { System.out.println(thread.getName()+"释放了锁"); lock.unlock(); } } else { System.out.println(thread.getName()+"获取锁失败"); } } } |
输出结果: Thread-0得到了锁 Thread-1获取锁失败 Thread-0释放了锁 例子3,lockInterruptibly()响应中断的使用方法: public class Test { private Lock lock = new ReentrantLock(); public static void main(String[] args) { Test test = new Test(); MyThread thread1 = new MyThread(test); MyThread thread2 = new MyThread(test); thread1.start(); thread2.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } thread2.interrupt(); } public void insert(Thread thread) throws InterruptedException{ lock.lockInterruptibly(); //注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出 try { System.out.println(thread.getName()+"得到了锁"); long startTime = System.currentTimeMillis(); for( ; ;) { if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE) break; //插入数据 } } finally { System.out.println(Thread.currentThread().getName()+"执行finally"); lock.unlock(); System.out.println(thread.getName()+"释放了锁"); } } } class MyThread extends Thread { private Test test = null; public MyThread(Test test) { this.test = test; } @Override public void run() { try { test.insert(Thread.currentThread()); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName()+"被中断"); } } } |
运行之后,发现thread2能够被正确中断。 3.ReadWriteLock ReadWriteLock也是一个接口,在它里面只定义了两个方法: public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading. */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing. */ Lock writeLock(); } |
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口。 4.ReentrantReadWriteLock ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。 下面通过几个例子来看一下ReentrantReadWriteLock具体用法。 假如有多个线程要同时进行读操作的话,先看一下synchronized达到的效果: public class Test { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final Test test = new Test(); new Thread(){ public void run() { test.get(Thread.currentThread()); }; }.start(); new Thread(){ public void run() { test.get(Thread.currentThread()); }; }.start(); } public synchronized void get(Thread thread) { long start = System.currentTimeMillis(); while(System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName()+"正在进行读操作"); } System.out.println(thread.getName()+"读操作完毕"); } } |
这段程序的输出结果会是,直到thread1执行完读操作之后,才会打印thread2执行读操作的信息。 Thread-0正在进行读操作 Thread-0正在进行读操作 …………………… Thread-0读操作完毕 Thread-1正在进行读操作 Thread-1正在进行读操作 …………………… Thread-1读操作完毕 而改成用读写锁的话: public class Test { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final Test test = new Test(); new Thread(){ public void run() { test.get(Thread.currentThread()); }; }.start(); new Thread(){ public void run() { test.get(Thread.currentThread()); }; }.start(); } public void get(Thread thread) { rwl.readLock().lock(); try { long start = System.currentTimeMillis(); while(System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName()+"正在进行读操作"); } System.out.println(thread.getName()+"读操作完毕"); } finally { rwl.readLock().unlock(); } } } |
此时打印的结果为: Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 ………………………… Thread-0读操作完毕 Thread-1读操作完毕 说明thread1和thread2在同时进行读操作。 这样就大大提升了读操作的效率。 不过要注意的是,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。 5.Lock和synchronized的选择 1)Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现; 2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁; 3)Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断; 4)通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。 5)Lock可以提高多个线程进行读操作的效率。 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。 三.锁的相关概念介绍 在前面介绍了Lock的基本使用,这一节来介绍一下与锁相关的几个概念。 1.可重入锁 如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。 看下面这段代码就明白了: class MyClass { public synchronized void method1() { method2(); } public synchronized void method2() { } } |
上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。 而由于synchronized和Lock都具备可重入性,所以不会发生上述现象。 2.可中断锁 可中断锁:顾名思义,就是可以相应中断的锁。 在Java中,synchronized就不是可中断锁,而Lock是可中断锁。 如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。 在前面演示lockInterruptibly()的用法时已经体现了Lock的可中断性。 3.公平锁 公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。 非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。 在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。 而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。 我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性: ReentrantLock lock = new ReentrantLock(true); 如果参数为true表示为公平锁,为fasle为非公平锁。默认情况下,如果使用无参构造器,则是非公平锁。 另外在ReentrantLock类中定义了很多方法,比如: isFair() //判断锁是否是公平锁 isLocked() //判断锁是否被任何线程获取了 isHeldByCurrentThread() //判断锁是否被当前线程获取了 hasQueuedThreads() //判断是否有线程在等待该锁 在ReentrantReadWriteLock中也有类似的方法,同样也可以设置为公平锁和非公平锁。不过要记住,ReentrantReadWriteLock并未实现Lock接口,它实现的是ReadWriteLock接口。 4.读写锁 读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。 正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。 ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。 可以通过readLock()获取读锁,通过writeLock()获取写锁。 2. java并发包2.1. java并发包介绍 JDK5.0 以后的版本都引入了高级并发特性,大多数的特性在java.util.concurrent 包中,是专门用于多线并发编程的,充分利用了现代多处理器和多核心系统的功能以编写大规模并发应用程序。主要包含原子量、并发集合、同步器、可重入锁,并对线程池的构造提供 了强力的支持。 原子量:是定义了支持对单一变量执行原子操作的类。所有类都有get 和set 方法,工作方法和对volatile 变量的读取和写入一样。 并发集合:是原有集合框架的补充,为多线程并发程序提供了支持。主要有:BlockingQueue,ConcurrentMap,ConcurrentNavigableMap。 同步器:提供了一些帮助在线程间协调的类,包括semaphores,barriers,latches,exchangers 等。 可重入锁:一般同步代码依靠内部锁(隐式锁),这种锁易于使用,但是有很多局限性。新的Lock对象支持更加复杂的锁定语法。和隐式锁(利用关键字synchronized加锁)类似,每一时刻只有一个线程能够拥有Lock 对象,通过与其相关联的Condition 对象,Lock 对象也支持wait 和notify 机制。 线程池:线程完成的任务(Runnable 对象)和线程对象(Thread)之间紧密相连。适用于小型程序,在大型应用程序中,把线程管理和创建工作与应用程序的其余部分分离开更有意义。线程池封装线程管理和创建线程对象。 2.2. java并发包线程池及开源软件中的应用 一个线程池使用类 ExecutorService 的实例来表示,通过 ExecutorService 你可以提交任务,并进行调度执行。下面列举一些你可以通过 Executors 类来创建的线程池的类型: 1、 Single Thread Executor : 只有一个线程的线程池,因此所有提交的任务是顺序执行, 代码: Executors.newSingleThreadExecutor() 2、 Cached Thread Pool : 线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行,如果线程超过60秒内没执行,那么将被终止并从池中删除, 代码:Executors.newCachedThreadPool() 3、 Fixed Thread Pool : 拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待, 代码: Executors.newFixedThreadPool(4) 在构造函数中的参数4是线程池的大小,你可以随意设置,也可以和cpu的数量保持一致,获取cpu的数量int cpuNums = Runtime.getRuntime().availableProcessors(); 4、 Scheduled Thread Pool : 用来调度即将执行的任务的线程池, 代码:Executors.newScheduledThreadPool() 5、 Single Thread Scheduled Pool : 只有一个线程,用来调度执行将来的任务,代码:Executors.newSingleThreadScheduledExecutor() 一旦你创建了一个线程池,你就可以往池中通过不同的方法提交执行任务,可提交 Runnable 或者 Callable 到线程池中,该方法返回一个 Future 实例表示任务的状态,如果你提交一个 Runnable ,那么如果任务完成后 Future 对象返回 null。 例如,我们编写下面的 Callable: private final class StringTask extends Callable<String>{ public String call(){ //Long operations return "Run"; } } |
如果你想使用4个线程来执行这个任务10次,那么代码如下: ExecutorService pool = Executors.newFixedThreadPool(4); for(int i = 0; i < 10; i++){ pool.submit(new StringTask()); } |
但你必须手工的关闭线程池来结束所有池中的线程: pool.shutdown(); 如果你不这么做,JVM 并不会去关闭这些线程;另外你可以使用 shutdownNow() 的方法来强制关闭线程池,那么执行中的线程也会被中断,所有尚未被执行的任务也将不会再执行。 但这个例子中,你无法获取任务的执行状态,因此我们需要借助 Future 对象: ExecutorService pool = Executors.newFixedThreadPool(4); List<Future<String>> futures = new ArrayList<Future<String>>(10); for(int i = 0; i < 10; i++){ futures.add(pool.submit(new Callable<String>(){ @Override public String call() throws Exception { System.out.println("a"); return "b"; } })); } for(Future<String> future : futures){ String result = future.get(); //Compute the result } pool.shutdown(); |
不过这段代码稍微有点复杂,而且有不足的地方。 future中若第一个是耗时任务,那么此处的get方法将是不能立即返回,即使其他线程已经执行完毕相应的任务但也是没办法得到执行结果的,除非所有线程依次执行完毕。 但是别着急,Java 为你提供了解决方案——CompletionService。 一个 CompletionService 就是一个服务,用以简化等待任务的执行结果,实现的类是 ExecutorCompletionService,该类基于 ExecutorService,因此我们可试试下面的代码: ExecutorService threadPool = Executors.newFixedThreadPool(4); CompletionService<String> pool = new ExecutorCompletionService<String>(threadPool); for(int i = 0; i < 10; i++){ pool.submit(new Callable<String>(){ @Override public String call() throws Exception { System.out.println("a"); return "b"; } }); } for(int i = 0; i < 10; i++){ String result = pool.take().get(); //Compute the result } threadPool.shutdown(); |
通过这段代码,我们可以根据执行结束的顺序获取对应的结果,而无需维护一个 Future 对象的集合。 通过 Java 为我们提供的各种工具,可以方便的进行多任务的编程,通过使用 Executors、ExecutorService 以及 CompletionService 等工具类,我们可以创建复杂的并行任务执行算法,而且可以轻松改变线程数。 2.3. java并发包消息队列及在开源软件中的应用BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。 主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。 插入: 1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常 2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false. 3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续. 读取: 4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null 5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止 其他 int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准 boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改 变了返回true public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true int drainTo(Collection<? super E> c); 传入的集合中的元素,如果在队列中存在,那么将 队列中的元素移动到集合中 int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,制定了移 动的数量 BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类 1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。 2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。 3、PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。 4、SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。 LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。 LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue. 生产者消费者的示例代码: 生产者: public class Producer implements Runnable { BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { String temp = "A Product, 生产线程:" + Thread.currentThread().getName(); System.out.println("I have made a product:" + Thread.currentThread().getName()); queue.put(temp);//如果队列是满的话,会阻塞当前线程 } catch (InterruptedException e) { e.printStackTrace(); } } } |
消费者: public class Consumer implements Runnable{ BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { try { String temp = queue.take();//如果队列为空,会阻塞当前线程 System.out.println(temp); } catch (InterruptedException e) { e.printStackTrace(); } } } |
测试类: import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Test3 { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2); // BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); //不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); for (int i = 0; i < 5; i++) { new Thread(producer, "Producer" + (i + 1)).start(); new Thread(consumer, "Consumer" + (i + 1)).start(); } } } |
打印结果: Text代码 收藏代码 I have made a product:Producer1 I have made a product:Producer2 A Product, 生产线程:Producer1 A Product, 生产线程:Producer2 I have made a product:Producer3 A Product, 生产线程:Producer3 I have made a product:Producer5 I have made a product:Producer4 A Product, 生产线程:Producer5 A Product, 生产线程:Producer4 由于队列的大小限定成了2,所以最多只有两个产品被加入到队列当中,而且消费者取到产品的顺序也是按照生产的先后顺序,原因就是LinkedBlockingQueue和ArrayBlockingQueue都是按照FIFO的顺序存取元素的。 3. java JMS技术3.1. 什么是JMS JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。 JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。 3.2. JMS规范3.2.1. 专业技术规范JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。 3.2.2. 体系架构JMS由以下元素组成。 JMS提供者:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。 JMS客户:生产或消费基于消息的Java的应用程序或对象。 JMS生产者:创建并发送消息的JMS客户。 JMS消费者:接收消息的JMS客户。 JMS消息:包括可以在JMS客户之间传递的数据的对象 JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。 JMS主题:一种支持发送消息给多个订阅者的机制。 3.2.3. JMS对象模型包含如下几个要素1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。 2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。 3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。 4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。 5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。 6)JMS消息通常有两种类型: ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。 ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。 file:///C:\Users\LuALu\AppData\Local\Temp\ksohtml\wpsD0DB.tmp.jpg 3.2.4. Java消息服务应用程序结构支持两种模型1、 点对点或队列模型 在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。 file:///C:\Users\LuALu\AppData\Local\Temp\ksohtml\wpsD0DC.tmp.jpg 这种模式被概括为: 只有一个消费者将获得消息 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。 每一个成功处理的消息都由接收者签收 2、发布者/订阅者模型 发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。 file:///C:\Users\LuALu\AppData\Local\Temp\ksohtml\wpsD0DD.tmp.jpg这种模式被概括为: 多个消费者可以获得消息 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。 使用Java语言,JMS提供了将应用与提供数据的传输层相分离的方式。同一组Java类可以通过JNDI中关于提供者的信息,连接不同的JMS提供者。这一组类首先使用一个连接工厂以连接到队列或主题,然后发送或发布消息。在接收端,客户接收或订阅这些消息。 3.2.5. 传递方式 JMS有两种传递消息的方式。 标记为NON_PERSISTENT(非持久)的消息最多投递一次,而标记为PERSISTENT(持久)的消息将使用暂存后再转送的机理投递。 如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。所以默认的消息传递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递方式只有当你不需要接收所有的消息时才使用。 JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。 · StreamMessage -- Java原始值的数据流 · MapMessage--一套名称-值对 · TextMessage--一个字符串对象 · ObjectMessage--一个序列化的 Java对象 · BytesMessage--一个未解释字节的数据流 3.2.6. 应用程序ConnectionFactory 接口(连接工厂) 用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。 Connection 接口(连接) 连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。 Destination 接口(目标) 目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。 Session 接口(会话) 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。 MessageConsumer 接口(消息消费者) 由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。 MessageProducer 接口(消息生产者) 由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。 Message 接口(消息) 是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分: 消息头(必须):包含用于识别和为消息寻找路由的操作设置。 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 消息接口非常灵活,并提供了许多方式来定制消息的内容。 3.2.7. 代码演示1.下载ActiveMQ 去官方网站下载:http://activemq.apache.org/ 2.运行ActiveMQ 解压缩apache-activemq-5.5.1-bin.zip, 修改配置文件activeMQ.xml,将0.0.0.0修改为localhost <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> <transportConnector name="ssl" uri="ssl://localhost:61617"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> <transportConnector uri="http://localhost:8081"/> <transportConnector uri="udp://localhost:61618"/> |
然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。 启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。 3.运行代码 3.2.8. 常用的JMS实现要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。既有开源的提供者也有专有的提供者。 开源的提供者包括: Apache ActiveMQ JBoss 社区所研发的 HornetQ Joram Coridan的MantaRay The OpenJMS Group的OpenJMS 专有的提供者包括: BEA的BEA WebLogic Server JMS TIBCO Software的EMS GigaSpaces Technologies的GigaSpaces Softwired 2006的iBus IONA Technologies的IONA JMS SeeBeyond的IQManager(2005年8月被Sun Microsystems并购) webMethods的JMS+ - my-channels的Nirvana Sonic Software的SonicMQ SwiftMQ的SwiftMQ IBM的WebSphere MQ
|