并发源码|线程池原理

线程池

线程池的意义:频繁的创建和销毁线程,性能开销比较大。线程池创建一些线程,执行完任务后不立即销毁,可以等待去执行下一个任务

线程池相关参数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               )

线程池支持5种,Executors静态方法创建:

  1. FixedThreadPool:固定数量的线程,其他线程放入无界等待队列
  2. CachedThreadPool:线程数量不固定,无论多少任务都会不停的创建线程。线程空闲一定时间,释放线程
  3. SingleThread:线程池里只有一个线程,其他线程放入无界等待队列
  4. ScheduledThread:提交的线程,会在等待的时间过后才会去执行
  5. WorkStealingPool:底层使用forkjoin来执行
  6. newVirtualThreadPerTaskExecutor:jdk21新增,底层使用虚拟线程。不推荐使用,因为虚拟线程比较轻量不推荐池化。仅适用于项目虚拟线程改造,将原来线程池做简单替换。代码改动较少
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
   // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

工作队列

存放任务的工作队列有6种主要的实现,分别是 ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque、PriorityBlockingQueue、DelayQueue、SynchronousQueue。它们的区别如下:

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
  2. LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为 Integer.MAX_VALUE
  3. LinkedBlockingDeque:使用双向队列实现的双端阻塞队列,双端意味着可以像普通队列一样 FIFO(先进先出),可以以像栈一样 FILO(先进后出)
  4. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务。
  5. DelayQueue:同 PriorityBlockingQueue,也是二叉堆实现的优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
  6. SynchronousQueue:一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

拒绝策略

内置的有4种拒绝策略

  1. AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy:由调用线程处理该任务。(例如io操作,线程消费速度没有NIO快,可能导致阻塞队列一直增加,此时可以使用这个模式)。
  3. DiscardPolicy:丢弃任务,但是不抛出异常。(可以配合这种模式进行自定义的处理方式)。
  4. DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。

当然也可以根据需求自定义拒绝策略,实现RejectedExecutionHandler接口即可

如何关闭线程池

shutdown:阻止新的任务提交,将线程池的状态改为shutdown,当再提交任务时,如果状态不为running,则执行拒绝策略。对于已提交的任务不会产生任何影响,如果还有任务未执行,线程将继续把任务执行完

shutdownNow:会关闭正在执行任务的线程,任务可能并没有执行完毕,关闭线程不需要等待

线程池核心线程数经验配置

CPU密集型任务:尽量压榨CPU,参考值设置为CPU的个数+1。

IO密集型任务:参考值可以设置为CPU的个数 ✖️ 2。

虚拟线程(jdk21新增):对于IO密集型任务,也可以改为使用虚拟线程。但需要代码中不使用synchronized关键字

线程池的好处

  1. 线程重用:线程的创建和销毁开销是巨大的,而通过线程池的重用大大减少了这些不必要的开销,当然既然少了这么多开销,其线程执行速度也是突飞猛进的提升。
  2. 控制线程池的并发数:线程不是并发的越多,性能越高,反而在线程并发太多时,线程的切换会消耗系统大量的资源,可以通过设置线程池最大并发线程数目,维持系统高性能。
  3. 线程池可以对线程进行管理:虽然线程提供了线程组操控线程,但是线程池拥有更多管理线程的API。
  4. 可以储存需要执行的任务:当任务提交过多时,可以将任务储存起来,等待线程处理。
Talk is cheap, show me the bug/code.
使用 Hugo 构建
主题 StackJimmy 设计