2.2 变量定义
final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array;
final Object[] getArray() {
return array;
}
final void setArray(Object[] a) {
array = a;
}
定义了一个重入锁,这个锁是用在写操作上的,保证同步,同一时刻只有一个线程在写,而读是没有加锁的,读的时候读的还是旧的那个数组,而写的时候会拷贝一个数组副本,写完之后重新赋值给旧的数组。所以CopyOnWriteArrayList是写写互斥,读写不互斥,读读不互斥。
2.4 读方法
public int size() {
return getArray().length;
}
public boolean isEmpty() {
return size() == 0;
}
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
if (o == null) {
for (int i = index; i < fence; i++)
if (elements == null)
return i;
} else {
for (int i = index; i < fence; i++)
if (o.equals(elements))
return i;
}
return -1;
}
private static int lastIndexOf(Object o, Object[] elements, int index) {
if (o == null) {
for (int i = index; i >= 0; i--)
if (elements == null)
return i;
} else {
for (int i = index; i >= 0; i--)
if (o.equals(elements))
return i;
}
return -1;
}
public boolean contains(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length) >= 0;
}
public int indexOf(Object o) {
Object[] elements = getArray();
return indexOf(o, elements, 0, elements.length);
}
public int indexOf(E e, int index) {
Object[] elements = getArray();
return indexOf(e, elements, index, elements.length);
}
public int lastIndexOf(Object o) {
Object[] elements = getArray();
return lastIndexOf(o, elements, elements.length - 1);
}
public int lastIndexOf(E e, int index) {
Object[] elements = getArray();
return lastIndexOf(e, elements, index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
public E get(int index) {
return get(getArray(), index);
}
可以看到,所有的读方法是没有加锁的,而且底层采用数组实现,所以读的效率是很高的。
2.5 写方法
/**
* 更新指定位置的元素
*/
public E set(int index, E element) {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 先获取旧数组
Object[] elements = getArray();
// 获取旧的元素
E oldValue = get(elements, index);
// 如果旧的元素和新设置的元素不相等
if (oldValue != element) {
int len = elements.length;
// 先拷贝一个和旧数组同样大小的数组
Object[] newElements = Arrays.copyOf(elements, len);
// 设置新值
newElements[index] = element;
// 更新数组引用
setArray(newElements);
} else {
setArray(elements);
}
// 返回旧值
return oldValue;
} finally {
lock.unlock();
}
}
/**
* 添加一个元素
*/
public boolean add(E e) {
// 先加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 新数组的大小为旧数组大小 + 1
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 更新数组引用
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
/**
* 添加一个元素到指定位置
*/
public void add(int index, E element) {
// 先加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 如果找不到这个索引位置则抛出异常
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;
int numMoved = len - index;
// 如果不需要移动数组,则将该元素插入到数组尾部,数组得先扩容1
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
// 否则新创建一个数组,并复制旧数据
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
newElements[index] = element;
// 更新数组引用
setArray(newElements);
} finally {
lock.unlock();
}
}
/**
* 移除指定位置的元素
*/
public E remove(int index) {
// 先加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
// 如果不需要移动数组,则直接截掉最后一个元素生成一个新的数组
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
// 否则创建新数组,拷贝旧元素
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
// 更新数组引用
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
可以看到,它所有的写操作都是生成新的数组,新增元素扩容是旧数组长度+1,在写的时候不会去影响旧的数组。而且所有的写操作都加锁了,保证同一个时刻只有一个线程修改元素。如果写的时候,有线程来读,读的是写之前的旧数组,所CopyOnWrite只能保证数据最终的一致性,不能保证数据的实时一致性。
2.6 排序
public void sort(Comparator<? super E> c) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
Object[] newElements = Arrays.copyOf(elements, elements.length);
@SuppressWarnings("unchecked") E[] es = (E[])newElements;
Arrays.sort(es, c);
setArray(newElements);
} finally {
lock.unlock();
}
}
CopyOnWriteArrayList支持Comparator排序的,先加锁,然后拷贝一个新的数组,再调用Arrays.sort方法排序,最后更新数组的引用。
2.7 迭代遍历
public void forEach(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
Object[] elements = getArray();
int len = elements.length;
for (int i = 0; i < len; ++i) {
@SuppressWarnings("unchecked") E e = (E) elements;
action.accept(e);
}
}
可以看到,如果是使用增强for循环遍历CopyOnWriteArrayList元素的话,它是直接拿到当前数组去操作遍历的,而这期间的写操作又不会影响这个数组。所以在迭代器使用的整个生命周期中,其内部数据不会被改变;并且集合在遍历过程中进行修改,也不会抛出ConcurrentModificationException。
再看看它的迭代器:
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
public ListIterator<E> listIterator() {
return new COWIterator<E>(getArray(), 0);
}
static final class COWIterator<E> implements ListIterator<E> {
private final Object[] snapshot;
private int cursor;
public boolean hasNext() {
return cursor < snapshot.length;
}
public boolean hasPrevious() {
return cursor > 0;
}
@SuppressWarnings("unchecked")
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
@SuppressWarnings("unchecked")
public E previous() {
if (! hasPrevious())
throw new NoSuchElementException();
return (E) snapshot[--cursor];
}
public int nextIndex() {
return cursor;
}
public int previousIndex() {
return cursor-1;
}
public void remove() {
throw new UnsupportedOperationException();
}
public void set(E e) {
throw new UnsupportedOperationException();
}
public void add(E e) {
throw new UnsupportedOperationException();
}
@Override
public void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
Object[] elements = snapshot;
final int size = elements.length;
for (int i = cursor; i < size; i++) {
@SuppressWarnings("unchecked") E e = (E) elements;
action.accept(e);
}
cursor = size;
}
}
很明显,迭代器使用的是当前数组的快照,就是它本身。内部的add,set,remove操作直接抛异常,就是为了让调用者不能在迭代期间修改元素。
public static void main(String[] args) {
final List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Thread t = new Thread(new Runnable() {
int count = -1;
@Override
public void run() {
while (true) {
list.add(String.valueOf(count++));
}
}
});
t.setDaemon(true);
t.start();
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (String s : list) {
System.out.println(s);
}
}
执行直接报错:
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at io.kzw.advance.csdn_blog.ConcurrentTest.main(ConcurrentTest.java:105)
我们把ArrayList换成CopyOnWriteArrayList:
public static void main(String[] args) {
final List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Thread t = new Thread(new Runnable() {
int count = -1;
@Override
public void run() {
while (true) {
list.add(String.valueOf(count++));
}
}
});
t.setDaemon(true);
t.start();
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (String s : list) {
System.out.println(s);
}
}
执行OK,正常打印。
再来试试迭代器:
public static void main(String[] args) {
final List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Thread t = new Thread(new Runnable() {
int count = -1;
@Override
public void run() {
while (true) {
list.add(String.valueOf(count++));
}
}
});
t.setDaemon(true);
t.start();
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
String next = iterator.next();
System.out.println(next);
}
}
可以正常打印输出。
但是如果对迭代器操作add,set或者remove呢?
public static void main(String[] args) {
final List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.add("b");
list.add("c");
Thread t = new Thread(new Runnable() {
int count = -1;
@Override
public void run() {
while (true) {
list.add(String.valueOf(count++));
}
}
});
t.setDaemon(true);
t.start();
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
String next = iterator.next();
System.out.println(next);
if (next.equals("100")) {
iterator.remove();
}
}
}
可以看到,中间报错了:
Exception in thread "main" java.lang.UnsupportedOperationException