分类 juc源码 中的文章

并发源码| JMM与volatile

主内存的数据会被加载到cpu本地缓存里去,cpu后面会读写自己的缓存 缓存模型下的并发问题 java内存模型 read(从主存读取),load(将主存读取到的值写入工作内存),use(从工作内存读取数据来计算),assign(将计算好的值重新赋值到工作内存中),store(将工作内存数据写入主存),write(将store过去的变量值赋值给主存中的变量) volatile保持内存可见的原理 就是说一定会强制保证说assign之后,就立马执行store + write,刷回到主内存里去。并且将其他工作内存中的状态改为过期 volatile无法保持原子性的原理 2个线程同时use后,线程1assign了以后,刷回了主内存。但线程2已经use了,不需要在从内存中加载。此时使用的旧的值。 底层使用cpu的MESI缓存一致性协议,强制刷主内存,过期其他线程中的工作内存 volatile内存屏障及happen before原则 在被volatile标记的字段读写前后都设置内存屏障,防止指令重排。 指令重排的happen before原则,是给开发JVM的人看的。 大概有几个原则 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作 lock规则:一个unLock操作先行发生于后面对同一个锁的lock操作 volatile变量规则:必须保证是先写,再读 volatile的实际用途和场景 double check的单例模式,使用volatile防止指令重排 instance=new Singleton() 分为3步 1、volatile保证并发可见性 2、synchronized保证有序性 3、double check保证对象被正常创建(类加载创建过程多线程问题) public class Singleton { private static volatile Singleton instance; private Singleton() { } public Singleton getInstance(){ if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance=new Singleton(); } } } return instance; } } ……

阅读全文

并发源码|AQS、ReentratLock原理

AQS原理 底层使用如下组件 private volatile int state; //标识同步状态,第一次加锁时设置为1,若重入加锁,则state递增。释放锁时state递减,若state=0时将owner设置为null,并且使用LockSupport.unpark(s.thread);方法唤醒等待队列中的第一个线程 //其他线程来获取锁时,若失败,则加入等待队列(双向链表),并且使用LockSupport.park(this);休眠此线程 private transient Thread exclusiveOwnerThread; //标识当前线程所有者 //一个双向链表作为一个队列来使用,存放等待队列(保存待加锁的线程)。 //非公平锁:新来一个线程直接判断能不能加锁,若能加锁则直接加锁,不能加锁才放入等待队列 //公平锁:等待队列中的线程依次加锁,体现的“公平”的特点 ReentratLock 多个线程争抢锁 尝试加锁,public boolean tryLock(long timeout, TimeUnit unit) 如果加锁不成功,则等待一段时间不成功则设置当前线程waitstate为SIGNAL状态,并持续重试,若最终超过了等待时间则从队列中移除 ReentrantReadWriteLock读写锁 使用state的高低16位(因为int是32位)来标识读锁(高16位)和写锁(低16位),非0表示已经加过锁了 读锁,可以同时被多个线程同时持有。读请求可以并发起来 写锁,则是互斥的 Condition 可重入锁可以创建Condition,主要有2个方法await与notify/notifyAll。 await等待唤醒,await与notify/notifyAll唤醒对应conditionawait的线程。 Condition.await()原理:将自己加入condition等待队列、释放锁、挂起自己 如果在加锁等待队列里有人阻塞,会有unpark的过程,唤醒加锁等待队列中的队头元素的那个过程 signal唤醒的过程,大概的意思,就是把condition等待队列中的元素,转化为一个加锁等待队列中的元素……

阅读全文

并发源码|Atomicxxx原理

AtomicInteger原子类底层的一些原理 AtomicLong、AtomicBoolean、AtomicReference、LongAdder等 都是无锁化或者叫乐观锁,判断此时此刻是否是某个值,如果是则修改,不是则重新查询一个最新的值,再次执行判断。这个操作叫CAS,compare and set(swap jdk8及以后) Atomic原子类底层核心的原理就是CAS,无锁化,乐观锁,每次尝试修改的时候,就对比一下,有没有人修改过这个值,没有人修改,自己就修改,如果有人修改过,就重新查出来最新的值,再次重复那个过程 源码: (1)变量的offset,volatile value 使其值对其他线程可见 private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; (2)Unsafe:核心类,负责执行CAS操作 public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; } (3)API接口:Atomic原子类的各种使用方式 最底层使用c代码,发送cpu指令,确保CAS操作绝对原子(通过指令来锁掉某一小块内存,并且保证只有一个线程在同一时间可以对此块内存数据做CAS操作)。……

阅读全文

并发源码|Hashmap死循环问题与ConcurrentHashmap

jdk7hashmap死循环问题 在多线程并发环境下。不应该用此容器,而应使用线程安全的容器如ConcurrentHashmap 其死循环主要是扩容的时候resize方法内调用的transfer方法导致。 e.next = newTable[i]; newTable[i] = e; 如果两个线程正在处理同一个节点e,那么第一个线程正常执行,但是第二个线程设置e.next=e,因为第一个线程已经将newTable[i]设置为e。节点e现在指向它自己,当调用get(Object)时,它将进入一个无限循环。无限循环则会导致cpu100%,另外因为节点形成了环形,后面的元素无法访问到导致数据丢失 而java8中,resize的时候保持了节点的顺序。但同样会有数据丢失的问题 void transfer(Entry[] newTable) { Entry[] src = table; int newCapacity = newTable.length; for (int j = 0; j < src.length; j++) { Entry<K,V> e = src[j]; if (e != null) { src[j] = null; do { Entry<K,V> next = e.next; int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } while (e != null); } } } jdk7与jdk8关于Hashmap的主要区别……

阅读全文

并发源码|synchronized、wait与notify

synchronized的底层原理 跟jvm及monitor有关。 如果用到了synchronized关键字,在底层编译后的jvm指令中,会有monitorenter和monitorexit两个指令 每个对象都是有个关联的monitor对象,即对象的Mark Word。里面有个计数器i,一个线程获取到锁以后i++,并且将锁对象设置为当前线程。释放锁时候i–,若i为0 表示完全释放锁,将monitor的锁对象设为null 这个monitor的锁是支持重入加锁的,什么意思呢,好比下面的代码片段 synchronized(myObject) { // 一大堆的代码 synchronized(myObject) { // 一大堆的代码 } } 如果一个线程第一次synchronized那里,获取到了myObject对象的monitor的锁,计数器加1,然后第二次synchronized那里,会再次获取myObject对象的monitor的锁,这个就是重入加锁了,然后计数器会再次加1,变成2 这个时候,其他的线程在第一次synchronized那里,会发现说myObject对象的monitor锁的计数器是大于0的,意味着被别人加锁了,然后此时线程就会进入block阻塞状态,什么都干不了,就是等着获取锁 接着如果出了synchronized修饰的代码片段的范围,就会有一个monitorexit的指令,在底层。此时获取锁的线程就会对那个对象的monitor的计数器减1,如果有多次重入加锁就会对应多次减1,直到最后,计数器是0 然后后面block住阻塞的线程,会再次尝试获取锁,但是只有一个线程可以获取到锁 block以后,将block的线程加入wait set等待其他线程完成执行 wait与sleep的区别 前者释放锁,后者不释放锁 wait(),必须是有人notify唤醒他 wait(timeout),阻塞一段时间,然后自己唤醒,继续争抢锁 wait与notify,必须在synchronized代码块中使用,因为必须是拥有monitor lock的线程才可以执行wait与notify操作 因此wait与notify,必须与synchornized一起,对同一个对象进行使用,这样他们对应的monitor才是一样的 notify()与notifyall():前者就唤醒block状态的一个线程,后者唤醒block状态的所有线程……

阅读全文

并发源码|其他并发容器

CopyOnWriteArrayList 写时复制的并发Arraylist容器,底层使用锁来实现 底层使用volatile标记array,保证并发修改后能立即可见。 另外使用ReentrantLock在进行写操作时加锁 /** The array, accessed only via getArray/setArray. */ private transient volatile Object[] array; /** The lock protecting all mutators */ final transient ReentrantLock lock = new ReentrantLock(); 所有的读操作(get,迭代器,contains)都是读的array快照,弱一致性模型 优点:线程安全,读写不互斥 缺点:若一致性,每次修改都要新建一个数组。空间浪费大,增加gc压力 ConcurrentLinkedQueue 无界队列,底层使用单向链表实现,无大小限制。所有的指针操作都是基于CAS,没成功则进入下次循环直到成功。 不停的往此队列插入数据,可能会导致内存溢出。 此容器使用CAS保证并发安全,但size是弱一致性的 LinkedBlockingQueue 有界队列,底层使用单向链表实现,有大小限制,超过了限制,往队列里插入数据就会阻塞住。可以限制内存队列的大小,避免内存无限制的增长,最后撑爆内存 底层使用2个锁提高并发能力:take锁与put锁,以及各自的Condition。如下图所示 take的时候,若为空则阻塞等待,容器非空时再take数据 put的时候,若满则阻塞等待,容器不满再put数据 /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.……

阅读全文

并发源码|并发常用组件

CountDownLatch:同步等待多个线程完成任务的并发组件 CyclicBarrier:将工作任务给多线程分而治之的并发组件,此组件比CountDownLatch可重复使用 案例实战:api服务中对多个接口并发调用后统一合并返回数据,可使用CountDownLatch或者CyclicBarrier并行请求多个接口,全部完成后再统一返回给调用方。用此方法可极大降低请求时间。 Semaphore:等待指定数量的线程完成任务的并发组件。此组件设置一个指定的数量i,可以有>=i个线程来并发执行。一旦完成了i个任务后,就可以执行接下来的方法了。……

阅读全文

并发源码|线程池原理

线程池 线程池的意义:频繁的创建和销毁线程,性能开销比较大。线程池创建一些线程,执行完任务后不立即销毁,可以等待去执行下一个任务 线程池相关参数: /** * 用给定的初始参数创建一个新的ThreadPoolExecutor。 */ public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量 int maximumPoolSize,//线程池的最大线程数 long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间 TimeUnit unit,//时间单位 BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列 ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可 RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务 ) 线程池支持5种,Executors静态方法创建: FixedThreadPool:固定数量的线程,其他线程放入无界等待队列 CachedThreadPool:线程数量不固定,无论多少任务都会不停的创建线程。线程空闲一定时间,释放线程 SingleThread:线程池里只有一个线程,其他线程放入无界等待队列 ScheduledThread:提交的线程,会在等待的时间过后才会去执行 WorkStealingPool:底层使用forkjoin来执行 // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>(); private static int workerCountOf(int c) { return c & CAPACITY; } private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.……

阅读全文