private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务和工作队列任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,但是接收工作队列任务
private static final int STOP = 1 << COUNT_BITS; // 不接受新任务以及工作队列任务,且打断工作中的任务
private static final int TIDYING = 2 << COUNT_BITS; // 终止所有任务,待处理任务数量为0,线程转换TIDYING,将会执行terminal钩子函数
private static final int TERMINATED = 3 << COUNT_BITS; // terminal函数执行完毕将会自动变为这个状态
package java.util.concurrent;
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ForkJoinPool
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
/**
* T12_ForkJoinPool 分而治之
* Fork: 分叉
* Join: 合并
*
* 将一个任务拆分多个任务执行(可以无限切分),然后将结果合并
*
* 比如大量的并行计算, 如下: 求100_0000个数字之和, 使用多线程
*/
public class T12_ForkJoinPool {
static int[] nums = new int[100_0000];
static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加
static Random random = new Random();
// 初始化这100_000个数字, 每个数字范围在100之内
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = random.nextInt(100);
}
// 所有数字和, 事先计算:
//System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和
}
/**
* RecursiveAction: 递归操作 没有返回值
* RecursiveTask: 递归操作,有返回值
*/
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 进行计算
// 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork
if (end - start <= MAX_NUM) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("sum = " + sum);
} else {
int middle = (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
static class AddTask2 extends RecursiveTask<Long> {
int start, end;
AddTask2(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 进行计算
// 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork
if (end - start <= MAX_NUM) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
} else {
int middle = start + (end - start) / 2; // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
AddTask2 subTask1 = new AddTask2(start, middle);
AddTask2 subTask2 = new AddTask2(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
// 运行
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask2 task = new AddTask2(0, nums.length);
fjp.execute(task);
System.out.println(task.join());
//System.in.read();
}
}
Executors
线程池的工厂/工具类,他有以下几个常用的方法:
SingleThreadExecutor
// 为什么要有单线程的线程池?
// 1. 线程池可以保证任务同时只运行一个
// 2. 线程池内部有一个BlockQueue任务队列,使用的队列最大可以存储 Integer 最大值的任务
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
CachedThreadPool,一般不会使用
// 每来一个线程就会创建一个线程
// 如果存在空闲,则会使用空闲的线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 没有核心线程,但是可以同时运行很多的线程
60L, TimeUnit.SECONDS, // 60 秒回收
new SynchronousQueue<Runnable>()); // 队列使用同步阻塞队列
FixedThreadPool,固定长度的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}