今天更新的主题是依然是java中的锁机制:第二章(大纲5-8)。
大纲:
1. 并发的特性
2. 锁的分类
3. synchronized
4. volatile
5. Lock
6. ThreadLocal
7. Atmoic
8. Semaphore
9. 阻塞队列
10. 死锁
11. CountdownLatch
12.CyclicBarrier
5. Lock
5.1 既然有synchronized,为什么JDK还会提供Lock呢?
上一章我们了解到:如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况。
(1) 获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
(2) 线程执行发生异常,此时JVM会让线程自动释放锁。
那么如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。
因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过Lock就可以办到。
还有一种情况:
当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。但是采用synchronized关键字来实现同步的话,就会导致一个问题:如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。
因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到(共享锁)。
通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的。
最后Lock的实现类基本都支持非公平锁(默认)和公平锁,synchronized只支持非公平锁,当然,在大部分情况下,非公平锁是高效的选择。
// -------------------------- ReentrantLock ---------------------------
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// -------------------------- ReentrantReadWriteLock ---------------------------
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
5.2 Lock锁方法
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
lock():
lock方法是我们平时用的比较多的,它的作用就是用来获取锁。如果锁已被其他线程获取,则进行等待。由于在前面讲到如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。
Lock lock = ...;
lock.lock();
try {
// 处理任务
} catch(Exception ex) {
} finally {
// 释放锁
lock.unlock();
}
void lockInterruptibly() throws InterruptedException:
lockInterruptibly()方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。举个例子:当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。如果B已经拿到锁了,调用interrupt()是不能中断的,只能中断正在等待锁的线程,让其放弃等待。
public void method() throws InterruptedException {
lock.lockInterruptibly();
try {
// 处理任务
}
finally {
lock.unlock();
}
}
boolean tryLock():
tryLock方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回,在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit):
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。如果在等待锁的期间调用了interrupt()方法,则会抛出InterruptedException。
Lock lock = ...;
if (lock.tryLock()) {
try {
// 处理任务
} catch(Exception ex) {
} finally {
// 释放锁
lock.unlock();
}
} else {
//如果不能获取锁,则直接做其他事情
}
void unlock():
unlock()这个方法就是手动释放锁,Lock需要编码时手动释放锁,否则会造成死锁。
Condition newCondition():
newCondition()返回Condition对象,这个类的作用和Object的wait(), notify(), notifyAll()相似,而且还提供了超时,deadline,不可中断等函数,后面的例子会介绍这些方法的使用。
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
5.3 ReentrantLock
ReentrantLock,意思是"可重入锁",这个概念在上一章有讲过。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。
来看一个例子:
private static final class TestReentrantLock {
private int count;
private Lock lock = new ReentrantLock();
public void increase() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get lock");
count++;
System.out.println("count = " + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " release lock");
lock.unlock();
}
}
}
public static void main(String[] args) {
TestReentrantLock testLock = new TestReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
testLock.increase();
}
}, "thread - " + i).start();
}
}
执行输出:
thread - 0 get lock
count = 1
thread - 0 release lock
thread - 1 get lock
count = 2
thread - 2 get lock
thread - 1 release lock
count = 3
thread - 2 release lock
试试tryLock():
private static final class TestReentrantLock {
private int count;
private Lock lock = new ReentrantLock();
public void increase() {
if (lock.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + " get lock");
count++;
System.out.println("count = " + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " release lock");
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " get lock fail");
}
}
}
public static void main(String[] args) {
TestReentrantLock testLock = new TestReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
testLock.increase();
}
}, "thread - " + i).start();
}
}
执行输出:
thread - 0 get lock
thread - 2 get lock fail
thread - 1 get lock fail
count = 1
thread - 0 release lock
因为thread - 0拿到了锁,所以这时其他线程去试图tryLock()拿锁,会立即返回失败。
试试tryLock(long time, TimeUnit unit):
private static final class TestReentrantLock {
private int count;
private Lock lock = new ReentrantLock();
public void increase() {
try {
if (lock.tryLock(3, TimeUnit.MILLISECONDS)) {
try {
System.out.println(Thread.currentThread().getName() + " get lock");
count++;
System.out.println("count = " + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " release lock");
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " get lock fail");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " get lock interrupted");
}
}
}
public static void main(String[] args) {
TestReentrantLock testLock = new TestReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
testLock.increase();
}
}, "thread - " + i).start();
}
}
执行输出:
thread - 2 get lock
count = 1
thread - 2 release lock
thread - 1 get lock
count = 2
thread - 1 release lock
thread - 0 get lock
count = 3
thread - 0 release lock
可以看出,线程在等待拿锁的时候,有3s的时间,如果超时则会获取锁失败。如果该线程在等待锁的期间调用了interrupt()方法,则会抛出InterruptedException.
private static final class TestReentrantLock {
private int count;
private Lock lock = new ReentrantLock();
public void increase() {
try {
if (lock.tryLock(3, TimeUnit.MILLISECONDS)) {
try {
System.out.println(Thread.currentThread().getName() + " get lock");
count++;
System.out.println("count = " + count);
// 让线程1的持锁时间长一点
if (Thread.currentThread().getName().equals("thread - 1")) {
// sleep不会释放锁
Thread.sleep(3000L);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " release lock");
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " get lock fail");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " get lock interrupted");
}
}
}
public static void main(String[] args) {
TestReentrantLock testLock = new TestReentrantLock();
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
testLock.increase();
}
}, "thread - " + i);
thread.start();
if (i == 2) {
thread.interrupt();
}
}
}
执行输出:
thread - 2 get lock interrupted
thread - 1 get lock
count = 1
thread - 0 get lock fail
thread - 1 release lock
试试void lockInterruptibly() throws InterruptedException:
private static final class TestReentrantLock {
private int count;
private Lock lock = new ReentrantLock();
public void increase() {
try {
lock.lockInterruptibly();
try {
System.out.println(Thread.currentThread().getName() + " get lock");
count++;
System.out.println("count = " + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " release lock");
lock.unlock();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " get lock interrupted");
}
}
}
public static void main(String[] args) {
TestReentrantLock testLock = new TestReentrantLock();
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
testLock.increase();
}
}, "thread - " + i);
thread.start();
if (i == 2) {
thread.interrupt();
}
}
}
执行输出:
thread - 0 get lock
thread - 2 get lock interrupted
count = 1
thread - 0 release lock
thread - 1 get lock
count = 2
thread - 1 release lock
可以看到线程2在等待锁的过程中被中断了。
最后再来看一下Condition的例子,实现一个阻塞队列:
private static class MyBlocingQueue<E> {
private final List<E> list;
// 有大小限制的
private final int limit;
MyBlocingQueue(int limit) {
list = new LinkedList<>();
this.limit = limit;
}
// 用wait,notify写的, 在list空或者满的时候效率会高一点,因为wait释放锁,然后等待唤醒
private synchronized void put(E e) {
// wait必须在同步方法或者块里,然后使用while,里面wait(),在下面调用notifyAll
while (list.size() == limit) {
try {
wait();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
System.out.println("list : " + list.toString());
System.out.println("put : " + e);
list.add(e);
notifyAll();
}
private synchronized E take() {
// wait必须在同步方法或者块里,然后使用while,里面wait(),在下面调用notifyAll
while (list.size() == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("list : " + list.toString());
E remove = list.remove(0);
System.out.println("take : " + remove);
notifyAll();
return remove;
}
// Lock实现
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private void putNew(E e) {
lock.lock();
try {
while (list.size() == limit) {
try {
notFull.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
list.add(e);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
private E takeNew() {
lock.lock();
try {
while (list.size() == 0) {
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
E remove = list.remove(0);
notFull.signalAll();
return remove;
} finally {
lock.unlock();
}
}
static void test() {
final MyBlocingQueue<Integer> myBlocingQueue = new MyBlocingQueue(10);
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
exec.execute(() -> {
Random random = new Random();
int r = random.nextInt(100);
// 生成随机数,按照一定比率读取或者放入!
if (r < 30) {
myBlocingQueue.put(r);
} else {
myBlocingQueue.take();
}
});
}
exec.shutdown();
}
}
5.4 ReadWriteLock
读写锁,上面一章我们也提过,读写锁中的读锁是共享锁,写锁是独享锁,在读多写少的场景下,读写锁很适用,比如做缓存系统。
ReadWriteLock是一个接口,在它里面只定义了两个方法:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}
readLock()和writeLock()用来获取读锁和写锁,读锁是共享的(可以让多个线程同时持有,提高读的效率),而写锁是独享的。
基本规则:读读不互斥 读写互斥 写写互斥。
读写锁的实现原理(自己实现读写锁,方便理解):
public class ReadWriteLock {
/**
* 读锁持有个数
*/
private int readCount = 0;
/**
* 写锁持有个数
*/
private int writeCount = 0;
/**
* 获取读锁,读锁在写锁不存在的时候才能获取
*/
public synchronized void lockRead() throws InterruptedException {
// 写锁存在,需要wait
while (writeCount > 0) {
wait();
}
readCount++;
}
/**
* 释放读锁
*/
public synchronized void unlockRead() {
readCount--;
notifyAll();
}
/**
* 获取写锁,当读锁存在时需要wait.
*/
public synchronized void lockWrite() throws InterruptedException {
// 先判断是否有写请求
while (writeCount > 0) {
wait();
}
// 此时已经不存在获取写锁的线程了,因此占坑,防止写锁饥饿
writeCount++;
// 读锁为0时获取写锁
while (readCount > 0) {
wait();
}
}
/**
* 释放读锁
*/
public synchronized void unlockWrite() {
writeCount--;
notifyAll();
}
}
看上面的代码就能理解:
写锁独享,读锁共享。当去拿写锁时,先判断有没有别的线程在写(写写互斥),有的话就等待,直到没有别的线程在写,再判断有没有线程在读(读写互斥),有的话再等待,最后才能拿到写锁。而拿读锁时,先判断有没有别的线程在写(读写互斥),有的话等待,直到没有别的线程在写,就可以拿到读锁,进行读取,不用判断有没有别的线程在读(读读不互斥)。
看看读写锁的使用例子:
private static class ReaderAndWriter<K, V> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private final Map<K, V> map;
ReaderAndWriter() {
map = new HashMap<>();
}
// --------------------- 这是用lock()方法写的 ---------------------
V put(K key, V value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
V get(K key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
// --------------------- 这是用tryLock()方法写的 ---------------------
V putTrylock(K key, V value) {
// tryLock得放到while循环里,不断地尝试
while (true) {
writeLock.tryLock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
}
V getTrylock(K key) {
// tryLock得放到while循环里,不断地尝试
while (true) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
}
static void test() {
final ReaderAndWriter<String, Integer> cache = new ReaderAndWriter<>();
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
exec.execute(() -> {
Random random = new Random();
int r = random.nextInt(100);
// 生成随机数,小于30的写入缓存,大于等于30则读取数字
if (r < 30) {
cache.put("x", r);
} else {
cache.get("x");
}
});
}
exec.shutdown();
}
}
拓展下,如果写多读少应该选择什么锁呢?
一般情况下独占锁的效率低来源于高并发下对临界区的激烈竞争导致线程上下文切换,因此当并发不是很高的情况下,读写锁由于需要额外维护读锁的状态,可能还不如独占锁的效率高,因此需要根据实际情况选择使用。
而JDK6之前,在多线程环境并存在大量竞争的情况下,synchronized的的性能比Lock差,而JDK1.6之后,synchronized也改用CAS来实现了,所以两者性能差不多。
单从性能上来考虑,写多读少:在高并发下,使用就行ReentantLock,低并发下使用synchronized。
Tips:
Lock和synchronized的选择
总结来说,Lock和synchronized有以下几点不同:
(1) Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
(2) synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
(3) Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
(4) 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
(5) Lock可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
6. ThreadLocal
6.1 ThreadLocal简介
ThreadLocal用于保存某个线程共享变量:对于同一个static/not static ThreadLocal,不同线程只能从中get,set,remove自己的变量,而不会影响其他线程的变量。
ThreadLocal.get: 获取ThreadLocal中当前线程共享变量的值。
ThreadLocal.set: 设置ThreadLocal中当前线程共享变量的值。
ThreadLocal.remove: 移除ThreadLocal中当前线程共享变量的值。
ThreadLocal.initialValue: ThreadLocal没有被当前线程赋值时或当前线程刚调用remove方法后调用get方法,返回此方法值。
看个例子:
private static final class TestThreadLocal {
private static ThreadLocal<Object> seqNum = new ThreadLocal<Object>() {
public Integer initialValue() {
return 0;
}
};
public int getNextNum() {
seqNum.set((Integer) seqNum.get() + 1);
return (Integer) seqNum.get();
}
}
public static void main(String[] args) {
TestThreadLocal testThreadLocal = new TestThreadLocal();
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 3; j++) {
System.out.println(Thread.currentThread().getName() + " getNextNum = " + testThreadLocal.getNextNum());
}
}
}, "thread - " + i).start();
}
}
执行输出:
thread - 0 getNextNum = 1
thread - 0 getNextNum = 2
thread - 0 getNextNum = 3
thread - 1 getNextNum = 1
thread - 1 getNextNum = 2
thread - 1 getNextNum = 3
thread - 2 getNextNum = 1
thread - 2 getNextNum = 2
thread - 2 getNextNum = 3
thread - 3 getNextNum = 1
thread - 4 getNextNum = 1
thread - 3 getNextNum = 2
thread - 4 getNextNum = 2
thread - 3 getNextNum = 3
thread - 4 getNextNum = 3
可以看到,虽然几个线程共享一个ThreadLocal对象,但是存储的东西却互不干扰。
6.2 ThreadLocal使用场景
线程不安全的类在多线程中使用,如 SimpleDateFormat,在一个线程中修改不影响其他线程的使用;
可以在同一个线程之中传递数据,比如需要写处理事务的sql语句,你可以选择把一个Connection放在ThreadLocal里面,在service层取出来使用。
6.3 ThreadLocal原理
看一下ThreadLocal的源码:
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private static final int INITIAL_CAPACITY = 16;
private Entry[] table;
private int size = 0;
private int threshold; // Default to 0
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
...
}
再看看Thread中的threadLocals:
ThreadLocal.ThreadLocalMap threadLocals = null;
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* Aggressively null out all reference fields: see bug 4006245 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
在线程退出的时候会销毁threadLocals。
可以看出每个thread实例都有一个ThreadLocalMap。在上图中的一个Thread的这个ThreadLocalMap中分别存放了3个Entry,默认一个ThreadLocalMap初始化了16个Entry,每一个Entry对象存放的是一个ThreadLocal变量对象。
再简单一点的说就是:一个Thread中只有一个ThreadLocalMap,一个ThreadLocalMap中可以有多个ThreadLocal对象,其中一个ThreadLocal对象对应一个ThreadLocalMap中的一个Entry(也就是说:一个Thread可以依附有多个ThreadLocal对象)。
这种存储结构的好处:
(1) 线程死去的时候,线程共享变量ThreadLocalMap则销毁。
(2) ThreadLocalMap<ThreadLocal, Object>键值对数量为ThreadLocal的数量,一般来说ThreadLocal数量很少,相比在ThreadLocal中用Map<Thread, Object>键值对存储线程共享变量(Thread数量一般来说比ThreadLocal数量多),性能提高很多。
6.4 ThreadLocalMap<ThreadLocal, Object>弱引用问题
当线程没有结束,但是ThreadLocal已经被回收,则可能导致线程中存在ThreadLocalMap<null, Object>的键值对,造成内存泄露。(ThreadLocal被回收,ThreadLocal关联的线程共享变量还存在)。
虽然ThreadLocal的get,set方法可以清除ThreadLocalMap中key为null的value,但是get,set方法在内存泄露后并不会必然调用,所以为了防止此类情况的出现,我们有两种手段。
(1) 使用完线程共享变量后,显示调用ThreadLocalMap.remove方法清除线程共享变量;
(2) JDK建议ThreadLocal定义为private static,这样ThreadLocal的弱引用问题则不存在了。
7. Atmoic
无锁即无障碍的运行,所有线程都可以到达临界区,接近于无等待。无锁采用CAS(compare and swap)算法来处理线程冲突, 其原理如下(摘自维基百科):
CAS包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即时没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
CPU指令
另外,虽然上述步骤繁多,实际上CAS整一个操作过程是一个原子操作,它是由一条CPU指令完成的,从指令层保证操作可靠,不会被多线程干扰。
无锁与volatile
无锁可以通过CAS来保证原子性与线程安全,它与volatile什么区别呢?
当给变量加了volatile关键字,表示该变量对所有线程可见,但不保证原子性。
以volatile i, i++为例,分为以下四步:
加载i
对i进行+1
回写i的值
用内存屏障通知其他线程i的值
其中前三步是线程不安全的, 可能其他线程会对i进行读写。
因此任何依赖于之前值的操作,如i++,i = i *10使用volatile都不安全。
而诸如get/set, boolean这类可以使用volatile。
这边我们以AtomicInteger为例切入。
7.1 AtomicInteger
主要接口:
// 取得当前值
public final int get()
// 设置当前值
public final void set(int newValue)
// 设置新值,并返回旧值
public final int getAndSet(int newValue)
// 如果当前值为expect,则设置为u
public final boolean compareAndSet(int expect, int u)
// 当前值加1,返回旧值
public final int getAndIncrement()
// 当前值减1,返回旧值
public final int getAndDecrement()
// 当前值增加delta,返回旧值
public final int getAndAdd(int delta)
// 当前值加1,返回新值
public final int incrementAndGet()
// 当前值减1,返回新值
public final int decrementAndGet()
// 当前值增加delta,返回新值
public final int addAndGet(int delta)
源码实现:
// 封装了一个int对其加减
private volatile int value;
.......
public final boolean compareAndSet(int expect, int update) {
// 通过unsafe 基于CPU的CAS指令来实现, 可以认为无阻塞.
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
.......
public final int getAndIncrement() {
for (;;) {
// 当前值
int current = get();
// 预期值
int next = current + 1;
if (compareAndSet(current, next)) {
// 如果加成功了, 则返回当前值
return current;
}
// 如果加失败了, 说明其他线程已经修改了数据, 与期望不相符,
// 则继续无限循环, 直到成功. 这种乐观锁, 理论上只要等两三个时钟周期就可以设值成功
// 相比于直接通过synchronized独占锁的方式操作int, 要大大节约等待时间.
}
}
还是看下上章那个i++的例子,我们可以用synchronized和Lock来处理,也可以用无锁(乐观锁)的方式:
private static final class TestAtomic {
private AtomicInteger i = new AtomicInteger(0);
public void increase() {
i.incrementAndGet();
}
}
public static void main(String[] args) {
final TestAtomic testAtomic = new TestAtomic();
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
testAtomic.increase();
System.out.println(testAtomic.i.get());
}
}, "thread - " + i).start();
}
}
无论执行多少次,结果都是1-5(无重复)。
7.2 Unsafe
Unsafe类是在sun.misc包下,可以用于一些非安全的操作,比如:
根据偏移量设置值,线程park(),底层的CAS操作等等。
主要方法:
// 获得给定对象偏移量上的int值
public native int getInt(Object o, long offset);
// 设置给定对象偏移量上的int值
public native void putInt(Object o, long offset, int x);
// 获得字段在对象中的偏移量
public native long objectFieldOffset(Field f);
// 设置给定对象的int值,使用volatile语义
public native void putIntVolatile(Object o, long offset, int x);
// 获得给定对象对象的int值,使用volatile语义
public native int getIntVolatile(Object o, long offset);
// 和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的
public native void putOrderedInt(Object o, long offset, int x);
7.3 AtomicIntegerFieldUpdater
这个类,我在看JDK源码中也经常看到,它的作用是让普通变量也享受原子操作。
先看个例子:
public static class TestAtomic {
// 如果直接把int改成AtomicInteger, 可能对代码破坏比较大
// 因此使用AtomicIntegerFieldUpdater对count进行封装
volatile int count;
}
// 通过反射实现
public final static AtomicIntegerFieldUpdater<TestAtomic> updater = AtomicIntegerFieldUpdater.newUpdater(TestAtomic.class, "count");
// 检查Updater是否工作正确, allCount的结果应该跟count一致
public static AtomicInteger allCount = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
final TestAtomic testAtomic = new TestAtomic();
Thread[] t = new Thread[1000];
for (int i = 0; i < 1000; i++) {
t = new Thread() {
public void run() {
if (Math.random() > 0.4) {
updater.incrementAndGet(testAtomic);
allCount.incrementAndGet();
}
}
};
t.start();
}
for (int i = 0; i < 1000; i++) {
t.join();
}
System.out.println("count=" + testAtomic.count);
System.out.println("allCount=" + allCount.get());
}
执行输出:count和allCount最终结果一致。
Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量,如果变量不可见,就会出错。比如count申明为private,就是不可行的。
为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单得申明一下就行,这不会引起什么问题。
由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。
Tips:
synchronized:
在资源竞争不是很激烈的情况下,偶尔会有同步的情形下,synchronized是很合适的。原因在于,编译程序通常会尽可能的进行优化synchronized,另外可读性非常好,不管用没用过5.0多线程包的程序员都能理解。
ReentrantLock:
ReentrantLock提供了多样化的同步,比如有时间限制的同步,可以被Interrupt的同步(synchronized的同步是不能Interrupt的)等。在资源竞争不激烈的情形下,性能稍微比synchronized差点点。但是当同步非常激烈的时候,synchronized的性能一下子能下降好几十倍,而ReentrantLock确还能维持常态。
Atomic:
和上面的类似,不激烈情况下,性能比synchronized略逊,而激烈的时候,也能维持常态。激烈的时候,Atomic的性能会优于ReentrantLock一倍左右。但是其有一个缺点,就是只能同步一个值,一段代码中只能出现一个Atomic的变量,多于一个同步无效,因为它不能在多个Atomic之间同步。
8. Semaphore
Semaphore类是一个计数信号量,必须由获取它的线程释放,通常用于限制可以访问某些资源线程数目。
一个信号量有且仅有3种操作,且它们全部是原子的:初始化、增加和减少。
看看它的方法:
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)
// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)
看一个例子:
public static void main(String[] args) {
final Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
final int count = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(1);
System.out.println("我在运行 count = " + count);
Thread.sleep(2000);
semaphore.release(2);
} catch (Exception e) {
System.out.println(e);
}
}
}).start();
}
}
执行结果:2个2个打印输出。
Tips:
在很多情况下,可能有多个线程需要访问数目很少的资源。假想在服务器上运行着若干个回答客户端请求的线程。这些线程需要连接到同一数据库,但任一时刻只能获得一定数目的数据库连接。你要怎样才能够有效地将这些固定数目的数据库连接分配给大量的线程?
1. 给方法加同步锁,保证同一时刻只能有一个人去调用此方法,其他所有线程排队等待,但是此种情况下即使你的数据库链接有10个,也始终只有一个处于使用状态。这样将会大大的浪费系统资源,而且系统的运行效率非常的低下。
2. 另外一种方法当然是使用信号量,通过信号量许可与数据库可用连接数相同的数目,将大大的提高效率和性能。
---------------------
作者:况众文
来源:CSDN
原文:https://blog.csdn.net/u014294681/article/details/85255416
版权声明:本文为博主原创文章,转载请附上博文链接!
|
|