并发源码|其他并发容器

CopyOnWriteArrayList

写时复制的并发Arraylist容器,底层使用锁来实现

底层使用volatile标记array,保证并发修改后能立即可见。

另外使用ReentrantLock在进行写操作时加锁

1
2
3
4
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

所有的读操作(get,迭代器,contains)都是读的array快照,弱一致性模型

优点:线程安全,读写不互斥

缺点:若一致性,每次修改都要新建一个数组。空间浪费大,增加gc压力

ConcurrentLinkedQueue

无界队列,底层使用单向链表实现,无大小限制。所有的指针操作都是基于CAS,没成功则进入下次循环直到成功。

不停的往此队列插入数据,可能会导致内存溢出。

此容器使用CAS保证并发安全,但size是弱一致性的

LinkedBlockingQueue

有界队列,底层使用单向链表实现,有大小限制,超过了限制,往队列里插入数据就会阻塞住。可以限制内存队列的大小,避免内存无限制的增长,最后撑爆内存

底层使用2个锁提高并发能力:take锁与put锁,以及各自的Condition。如下图所示

take的时候,若为空则阻塞等待,容器非空时再take数据

put的时候,若满则阻塞等待,容器不满再put数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
	/** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

ArrayBlockingQueue

基于环形数组实现的有界队列

底层使用一个锁,2个condition。其余与LinkedBlockingQueue的原理大致一致

Talk is cheap, show me the bug/code.
使用 Hugo 构建
主题 StackJimmy 设计