并发源码|其他并发容器
CopyOnWriteArrayList
写时复制的并发Arraylist容器,底层使用锁来实现
底层使用volatile标记array,保证并发修改后能立即可见。
另外使用ReentrantLock在进行写操作时加锁
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
所有的读操作(get,迭代器,contains)都是读的array快照,弱一致性模型
优点:线程安全,读写不互斥
缺点:若一致性,每次修改都要新建一个数组。空间浪费大,增加gc压力
ConcurrentLinkedQueue
无界队列,底层使用单向链表实现,无大小限制。所有的指针操作都是基于CAS,没成功则进入下次循环直到成功。
不停的往此队列插入数据,可能会导致内存溢出。
此容器使用CAS保证并发安全,但size是弱一致性的
LinkedBlockingQueue
有界队列,底层使用单向链表实现,有大小限制,超过了限制,往队列里插入数据就会阻塞住。可以限制内存队列的大小,避免内存无限制的增长,最后撑爆内存
底层使用2个锁提高并发能力:take锁与put锁,以及各自的Condition。如下图所示
take的时候,若为空则阻塞等待,容器非空时再take数据
put的时候,若满则阻塞等待,容器不满再put数据
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
ArrayBlockingQueue
基于环形数组实现的有界队列
底层使用一个锁,2个condition。其余与LinkedBlockingQueue的原理大致一致
- 原文作者:
- 原文链接:https://leyou240.github.io/post/juc08_%E5%85%B6%E4%BB%96%E5%B9%B6%E5%8F%91%E5%AE%B9%E5%99%A8/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。