线程池

Executor:执行者,用来执行一个Runnable
image-20220304203507319 ExecutorService:拓展了Executor,完善了整个任务执行器的所有声明周期方法,这里遵循了接口隔离原则。
image-20220304203539111 AbstractExecutorService主要实现了submit、invokeAny、invokeAll方法,这里使用了模板设计模式,在这些方法中实现了模板代码逻辑,并暴露出一些抽象方法供子类实现
ThreadPoolExecutor 则是具体的线程池的实现
FoorkJoinPool,不同于ThreadPoolExecutor,它具有分解汇总任务,使用很少的线程可以执行很多的任务,非常适合CPU密集型的场景
ThreadPoolExecutor

使用ThreadPoolExecutor
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
// 提交任务
executor.execute(() -> System.out.println(1));
executor.execute(() -> System.out.println(2));
// 关闭线程池
executor.shutdown();
构造器
创建一个ThreadPool供需要提供如下几个参数:

corePoolSize
核心线程数,线程池默认的核心线程数,几十超过keepAlive
也不会释放线程,他将会一直占用maximumPoolSize
最大线程数,最多可以创建的线程数keepAliveTime
如果一个线程持续这个时间,就将这个线程归还给操作系统,因为操作系统的线程数量有限,注意归还的是非核心线程unit
是keepAliveTime
的时间单位workQueue
指定任务队列,需要时一个阻塞队列,当来了一个任务时,没有空闲的线程可以处理,就将其放入到任务队列中threadFactory
用于产生线程的线程工厂,可以为线程指定名称,一些额外的日志操作handler
拒绝策略,如果任务队列也满了,就会进行拒绝策略(比如对未能处理的进行持久化)
拒绝策略
拒绝策略采用策略模式,其策略接口为RejectedExecutionHandler
,共有以下几个实现类:
AbortPolicy
直接抛出异常拒绝CallerRunsPolicy
调用run方法直接在主线程执行,这种方式也称为背压DiscardPolicy
内部什么都不做,新的线程将会被忽略 (不可使用)DiscardOldestPolicy
会poll掉workQueue的一个最老的任务,然后将当前的任务放入
线程池的状态
线程池的状态由ctl这个数字二进制的前三位存储(后面29位信息用来记录工作线程的数量),他的状态共分为五种,如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务和工作队列任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,但是接收工作队列任务
private static final int STOP = 1 << COUNT_BITS; // 不接受新任务以及工作队列任务,且打断工作中的任务
private static final int TIDYING = 2 << COUNT_BITS; // 终止所有任务,待处理任务数量为0,线程转换TIDYING,将会执行terminal钩子函数
private static final int TERMINATED = 3 << COUNT_BITS; // terminal函数执行完毕将会自动变为这个状态
状态机的转换:

面试重点
ctl 是什么
状态转换的过程
构造函数
拒绝策略
runWorker方法的三个步骤
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 相对于 ThreadPoolExecutor 扩展了 ScheduledExecutorService 接口:
package java.util.concurrent;
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ForkJoinPool
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
/**
* T12_ForkJoinPool 分而治之
* Fork: 分叉
* Join: 合并
*
* 将一个任务拆分多个任务执行(可以无限切分),然后将结果合并
*
* 比如大量的并行计算, 如下: 求100_0000个数字之和, 使用多线程
*/
public class T12_ForkJoinPool {
static int[] nums = new int[100_0000];
static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加
static Random random = new Random();
// 初始化这100_000个数字, 每个数字范围在100之内
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = random.nextInt(100);
}
// 所有数字和, 事先计算:
//System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和
}
/**
* RecursiveAction: 递归操作 没有返回值
* RecursiveTask: 递归操作,有返回值
*/
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 进行计算
// 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork
if (end - start <= MAX_NUM) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("sum = " + sum);
} else {
int middle = (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
static class AddTask2 extends RecursiveTask<Long> {
int start, end;
AddTask2(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 进行计算
// 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork
if (end - start <= MAX_NUM) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
} else {
int middle = start + (end - start) / 2; // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
AddTask2 subTask1 = new AddTask2(start, middle);
AddTask2 subTask2 = new AddTask2(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
// 运行
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask2 task = new AddTask2(0, nums.length);
fjp.execute(task);
System.out.println(task.join());
//System.in.read();
}
}
Executors
线程池的工厂/工具类,他有以下几个常用的方法:
SingleThreadExecutor
// 为什么要有单线程的线程池? // 1. 线程池可以保证任务同时只运行一个 // 2. 线程池内部有一个BlockQueue任务队列,使用的队列最大可以存储 Integer 最大值的任务 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
CachedThreadPool,一般不会使用
// 每来一个线程就会创建一个线程 // 如果存在空闲,则会使用空闲的线程 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 没有核心线程,但是可以同时运行很多的线程 60L, TimeUnit.SECONDS, // 60 秒回收 new SynchronousQueue<Runnable>()); // 队列使用同步阻塞队列
FixedThreadPool,固定长度的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ScheduledThreadPool,专为定时任务提供的线程池,使用的不多,应该使用定时任务框架(Quarts)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); // 隔多久之后可以执行 }
如何调整线程池的大小
如果线程数量设置的过多,那么他们会竞争稀缺的处理器和内存资源,造成大量的时间浪费在线程的切换上
如果线程数量设置的过少,那么处理器的一些核就无法被充分利用
线程池的短小可以使用下面的公式计算:
image-20220305120916111 推荐使用压测
最后更新于
这有帮助吗?