线程池

image-20220305111316950
  1. Executor:执行者,用来执行一个Runnable

    image-20220304203507319
  2. ExecutorService:拓展了Executor,完善了整个任务执行器的所有声明周期方法,这里遵循了接口隔离原则。

    image-20220304203539111
  3. AbstractExecutorService主要实现了submit、invokeAny、invokeAll方法,这里使用了模板设计模式,在这些方法中实现了模板代码逻辑,并暴露出一些抽象方法供子类实现

  4. ThreadPoolExecutor 则是具体的线程池的实现

  5. FoorkJoinPool,不同于ThreadPoolExecutor,它具有分解汇总任务,使用很少的线程可以执行很多的任务,非常适合CPU密集型的场景

ThreadPoolExecutor

image-20220305112306756

使用ThreadPoolExecutor

// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
// 提交任务
executor.execute(() -> System.out.println(1));
executor.execute(() -> System.out.println(2));
// 关闭线程池
executor.shutdown();

构造器

创建一个ThreadPool供需要提供如下几个参数:

image-20220305112525488
  1. corePoolSize 核心线程数,线程池默认的核心线程数,几十超过keepAlive也不会释放线程,他将会一直占用

  2. maximumPoolSize 最大线程数,最多可以创建的线程数

  3. keepAliveTime 如果一个线程持续这个时间,就将这个线程归还给操作系统,因为操作系统的线程数量有限,注意归还的是非核心线程

  4. unitkeepAliveTime的时间单位

  5. workQueue 指定任务队列,需要时一个阻塞队列,当来了一个任务时,没有空闲的线程可以处理,就将其放入到任务队列中

  6. threadFactory 用于产生线程的线程工厂,可以为线程指定名称,一些额外的日志操作

  7. handler 拒绝策略,如果任务队列也满了,就会进行拒绝策略(比如对未能处理的进行持久化)

拒绝策略

拒绝策略采用策略模式,其策略接口为RejectedExecutionHandler,共有以下几个实现类:

  1. AbortPolicy 直接抛出异常拒绝

  2. CallerRunsPolicy 调用run方法直接在主线程执行,这种方式也称为背压

  3. DiscardPolicy 内部什么都不做,新的线程将会被忽略 (不可使用)

  4. DiscardOldestPolicy 会poll掉workQueue的一个最老的任务,然后将当前的任务放入

线程池的状态

线程池的状态由ctl这个数字二进制的前三位存储(后面29位信息用来记录工作线程的数量),他的状态共分为五种,如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 接收新任务和工作队列任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 不接收新任务,但是接收工作队列任务
private static final int STOP       =  1 << COUNT_BITS; // 不接受新任务以及工作队列任务,且打断工作中的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 终止所有任务,待处理任务数量为0,线程转换TIDYING,将会执行terminal钩子函数
private static final int TERMINATED =  3 << COUNT_BITS; // terminal函数执行完毕将会自动变为这个状态

状态机的转换:

面试重点

  1. ctl 是什么

  2. 状态转换的过程

  3. 构造函数

  4. 拒绝策略

  5. runWorker方法的三个步骤

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 相对于 ThreadPoolExecutor 扩展了 ScheduledExecutorService 接口:

package java.util.concurrent;

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);


    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

ForkJoinPool

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

/**
 * T12_ForkJoinPool  分而治之
 * Fork: 分叉
 * Join: 合并
 * 
 * 将一个任务拆分多个任务执行(可以无限切分),然后将结果合并
 * 
 * 比如大量的并行计算, 如下: 求100_0000个数字之和, 使用多线程
 */
public class T12_ForkJoinPool {

    static int[] nums = new int[100_0000];
    static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加
    static Random random = new Random();

    // 初始化这100_000个数字, 每个数字范围在100之内
    static {

        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(100);
        }
        // 所有数字和, 事先计算:
        //System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和
    }

    /**
     * RecursiveAction: 递归操作 没有返回值
     * RecursiveTask: 递归操作,有返回值
     */
    static class AddTask extends RecursiveAction {

        int start, end;

        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {

            // 进行计算
            // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork 
            if (end - start <= MAX_NUM) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("sum = " + sum);
            } else {
                int middle = (end - start) / 2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }

    static class AddTask2 extends RecursiveTask<Long> {

        int start, end;

        AddTask2(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 进行计算
            // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork 
            if (end - start <= MAX_NUM) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            } else {
                int middle = start + (end - start) / 2; // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
                AddTask2 subTask1 = new AddTask2(start, middle);
                AddTask2 subTask2 = new AddTask2(middle, end);
                subTask1.fork();
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }

    // 运行
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask2 task = new AddTask2(0, nums.length);
        fjp.execute(task);
        System.out.println(task.join());

        //System.in.read();
    }

}

Executors

线程池的工厂/工具类,他有以下几个常用的方法:

  1. SingleThreadExecutor

    // 为什么要有单线程的线程池?
    // 1. 线程池可以保证任务同时只运行一个
    // 2. 线程池内部有一个BlockQueue任务队列,使用的队列最大可以存储 Integer 最大值的任务
    public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
    }
  2. CachedThreadPool,一般不会使用

    // 每来一个线程就会创建一个线程
    // 如果存在空闲,则会使用空闲的线程
    public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  // 没有核心线程,但是可以同时运行很多的线程
                                    60L, TimeUnit.SECONDS, // 60 秒回收
                                    new SynchronousQueue<Runnable>());  // 队列使用同步阻塞队列
  3. FixedThreadPool,固定长度的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  4. ScheduledThreadPool,专为定时任务提供的线程池,使用的不多,应该使用定时任务框架(Quarts)

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue()); // 隔多久之后可以执行
    }

如何调整线程池的大小

  1. 如果线程数量设置的过多,那么他们会竞争稀缺的处理器和内存资源,造成大量的时间浪费在线程的切换上

  2. 如果线程数量设置的过少,那么处理器的一些核就无法被充分利用

  3. 线程池的短小可以使用下面的公式计算:

    image-20220305120916111
  4. 推荐使用压测

最后更新于