# 线程池

![image-20220305111316950](/files/b0xFBvu3zB33fXyRT6Op)

1. Executor：执行者，用来执行一个Runnable

   ![image-20220304203507319](/files/LJLrZeZLvA6QlrLHQThy)
2. ExecutorService：拓展了Executor，完善了整个任务执行器的所有声明周期方法，这里遵循了接口隔离原则。

   ![image-20220304203539111](/files/sOspxKbcf9GR3pXwlGhn)
3. AbstractExecutorService主要实现了submit、invokeAny、invokeAll方法，这里使用了模板设计模式，在这些方法中实现了模板代码逻辑，并暴露出一些抽象方法供子类实现
4. ThreadPoolExecutor 则是具体的线程池的实现
5. FoorkJoinPool，不同于ThreadPoolExecutor，它具有**分解汇总任务**，使用很少的线程可以执行很多的任务，非常适合CPU密集型的场景

## ThreadPoolExecutor

![image-20220305112306756](/files/JOeOAcZOhYfe7nYVypvI)

### 使用ThreadPoolExecutor

```java
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
// 提交任务
executor.execute(() -> System.out.println(1));
executor.execute(() -> System.out.println(2));
// 关闭线程池
executor.shutdown();
```

### 构造器

创建一个ThreadPool供需要提供如下几个参数：

![image-20220305112525488](/files/lGwTzzuU3Mcd0jY3840P)

1. `corePoolSize` 核心线程数，线程池默认的核心线程数，几十超过`keepAlive`也不会释放线程，他将会一直占用
2. `maximumPoolSize` 最大线程数，最多可以创建的线程数
3. `keepAliveTime` 如果一个线程持续这个时间，就将这个线程归还给操作系统，因为操作系统的线程数量有限，注意归还的是非核心线程
4. `unit` 是 `keepAliveTime`的时间单位
5. `workQueue` 指定任务队列，需要时一个阻塞队列，当来了一个任务时，没有空闲的线程可以处理，就将其放入到任务队列中
6. `threadFactory` 用于产生线程的线程工厂，可以为线程指定名称，一些额外的日志操作
7. `handler` 拒绝策略，如果任务队列也满了，就会进行拒绝策略（比如对未能处理的进行持久化）

### 拒绝策略

拒绝策略采用策略模式，其策略接口为`RejectedExecutionHandler`，共有以下几个实现类：

1. `AbortPolicy` 直接抛出异常拒绝
2. `CallerRunsPolicy` 调用run方法直接在主线程执行，这种方式也称为**背压**
3. `DiscardPolicy` 内部什么都不做，新的线程将会被忽略 (不可使用)
4. `DiscardOldestPolicy` 会poll掉workQueue的一个最老的任务，然后将当前的任务放入

### 线程池的状态

线程池的状态由ctl这个数字二进制的前三位存储（后面29位信息用来记录工作线程的数量），他的状态共分为五种，如下：

```java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 接收新任务和工作队列任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 不接收新任务，但是接收工作队列任务
private static final int STOP       =  1 << COUNT_BITS; // 不接受新任务以及工作队列任务，且打断工作中的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 终止所有任务，待处理任务数量为0，线程转换TIDYING，将会执行terminal钩子函数
private static final int TERMINATED =  3 << COUNT_BITS; // terminal函数执行完毕将会自动变为这个状态
```

状态机的转换：

![](/files/eoIk2gR9HhBJ8ghbZ7Gs)

### 面试重点

1. ctl 是什么
2. 状态转换的过程
3. 构造函数
4. 拒绝策略
5. runWorker方法的三个步骤

### ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 相对于 ThreadPoolExecutor 扩展了 ScheduledExecutorService 接口：

```java
package java.util.concurrent;

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);


    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

```

## ForkJoinPool

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

/**
 * T12_ForkJoinPool  分而治之
 * Fork: 分叉
 * Join: 合并
 * 
 * 将一个任务拆分多个任务执行(可以无限切分),然后将结果合并
 * 
 * 比如大量的并行计算, 如下: 求100_0000个数字之和, 使用多线程
 */
public class T12_ForkJoinPool {

    static int[] nums = new int[100_0000];
    static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加
    static Random random = new Random();

    // 初始化这100_000个数字, 每个数字范围在100之内
    static {

        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(100);
        }
        // 所有数字和, 事先计算:
        //System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和
    }

    /**
     * RecursiveAction: 递归操作 没有返回值
     * RecursiveTask: 递归操作,有返回值
     */
    static class AddTask extends RecursiveAction {

        int start, end;

        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {

            // 进行计算
            // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork 
            if (end - start <= MAX_NUM) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("sum = " + sum);
            } else {
                int middle = (end - start) / 2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }

    static class AddTask2 extends RecursiveTask<Long> {

        int start, end;

        AddTask2(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 进行计算
            // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork 
            if (end - start <= MAX_NUM) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            } else {
                int middle = start + (end - start) / 2; // 注意这里，如果有问题，会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
                AddTask2 subTask1 = new AddTask2(start, middle);
                AddTask2 subTask2 = new AddTask2(middle, end);
                subTask1.fork();
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }

    // 运行
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask2 task = new AddTask2(0, nums.length);
        fjp.execute(task);
        System.out.println(task.join());

        //System.in.read();
    }

}
```

## Executors

线程池的工厂/工具类，他有以下几个常用的方法：

1. SingleThreadExecutor

   ```java
   // 为什么要有单线程的线程池?
   // 1. 线程池可以保证任务同时只运行一个
   // 2. 线程池内部有一个BlockQueue任务队列，使用的队列最大可以存储 Integer 最大值的任务
   public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
       (new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
   }
   ```
2. CachedThreadPool，一般不会使用

   ```java
   // 每来一个线程就会创建一个线程
   // 如果存在空闲，则会使用空闲的线程
   public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  // 没有核心线程，但是可以同时运行很多的线程
                                   60L, TimeUnit.SECONDS, // 60 秒回收
                                   new SynchronousQueue<Runnable>());  // 队列使用同步阻塞队列
   ```
3. FixedThreadPool，固定长度的线程池

   ```java
   public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }
   ```
4. ScheduledThreadPool，专为定时任务提供的线程池，使用的不多，应该使用定时任务框架（Quarts）

   ```java
   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
       return new ScheduledThreadPoolExecutor(corePoolSize);
   }

   public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
           new DelayedWorkQueue()); // 隔多久之后可以执行
   }
   ```

## 如何调整线程池的大小

1. 如果线程数量设置的过多，那么他们会竞争稀缺的处理器和内存资源，造成大量的时间浪费在线程的切换上
2. 如果线程数量设置的过少，那么处理器的一些核就无法被充分利用
3. 线程池的短小可以使用下面的公式计算：

   ![image-20220305120916111](/files/W7Of5yp9zRAp7qeubNQl)
4. 推荐使用压测


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://yangsx95.gitbook.io/notes/programming-language/java/bing-fa-bian-cheng/xian-cheng-chi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
