锁机制
最后更新于
这有帮助吗?
最后更新于
这有帮助吗?
指令由cpu执行,任何语言程序的锁都是由指令cmpxchg
,全称 compare and exchange
:
cmpxchg dest,src
cpu 要将count数值更改,他会先从内存中加载此值
cpu执行cmpxchg,将会:
比较 当前持有的值与内存中的值是否相同
如果相同代表数值没有被修改,将会替换值,并返回成功
如果不相同则代表数值有可能被修改,并返回失败
此外,CPU为了满足cmpxchg指令的原子性,为其增加了lock:
lock cmpxchg
所有的锁都基于cmpxchg
指令(x86),在Java中,有一个native方法映射了该指令:
// sun.misc.Unsafe
/**
* 如果达到期望值,就会使用替换值替换
*
* @param o 和 offset 用于表示目标变量的内存地址
* @param expected 期望值
* @param x 替换值
*/
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
如何表示锁的状态
boolean state
,只能表示两种状态
int state
不仅可以表示状态,还可以记录锁被持有的数量,这样可以提供对重入锁的支持
保证多线程抢锁线程安全
基于 lock cmpxchg
,对应java Unsafe.compareAndSwapInt
方法,也就是CAS
处理没有获取到锁的线程
自旋等待:
缺点,后台线程自旋等待,会占用CPU,导致性能障碍
优点,适用于并发低,任务耗时短的操作,因为处理快,每个线程自旋一会儿就会获取到锁
注意,当并发数逐渐增大或者任务耗时较长时,也就是没有获取锁的等待线程数量增大,自旋的线程数量以及时间会增长,会导致cpu自旋的压力增大
阻塞等待
交给操作系统将其阻塞
当自旋锁消耗的时间大于阻塞上下文切换的时间时,该使用阻塞
自旋 + 阻塞
释放锁后如何将锁重新分配
自旋锁:每个线程在不停的循环,他们会自己抢锁
阻塞锁:唤醒线程
全称AbstractQueueSynchronizer
:
Abstract表示该类并不知道该如何上锁,使用了模板设计模式,暴露钩子函数给子类
Queue表示线程阻塞队列,抢不到锁的线程都会处于这个队列中排队
Synchronizer翻译为同步器,因为他可以保证线程安全
AQS底层实际上是使用 CAS + state完成多线程抢锁的逻辑
总结:子类只需要实现上锁、释放锁的逻辑,至于排队阻塞等待、唤醒机制均由AQS实现
核心代码:
上锁:
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 子类获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果锁获取失败,就将其添加到阻塞队列中
selfInterrupt();
}
// 子类实现获取锁的逻辑,AQS并不关心
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
释放锁:
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放锁成功
// 检查阻塞队列唤醒线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 子类实现释放锁的逻辑,AQS并不关心
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
ReentrantLock 是基于AQS实现的可重入锁的实现类,他有公平锁、非公平锁两种实现:
public class ReentrantLock implements Lock, java.io.Serializable {
// 同步器
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
// 非公平锁
static final class NonfairSync extends Sync{...}
// 公平锁
static final class FairSync extends Sync{...}
public ReentrantLock() { // 默认是非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
线程D进入后,检查阻塞队列,如果阻塞队列有阻塞线程,线程D就在后面排队
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 子类获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果锁获取失败,就将其添加到阻塞队列中
selfInterrupt();
}
}
}
// Sync 暴露一个线程加锁时,抢锁的逻辑,以此区分公平锁和非公平锁
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract boolean initialTryLock();
// 上锁
@ReservedStackAccess
final void lock() {
if (!initialTryLock()) // 如果获取锁没有获取到
acquire(1); // 抢锁失败后,调用AQS标准的获取锁的流程, 会调用子类 tryAcqure()方法
}
}
static final class FairSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// AQS 同步器状态
int c = getState();
// 如果 == 0 代表锁处于释放状态
if (c == 0) {
// 如果前面没有等待的线程,并且去抢锁抢到了
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 设置当前线程持有锁
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程已经持有锁(可重入特性)
else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 // 锁状态为0代表锁未被占有
&& !hasQueuedPredecessors() // 阻塞队列中没有线程
&& compareAndSetState(0, acquires)) { // 获取锁成功
setExclusiveOwnerThread(Thread.currentThread()); // 设置当前线程占有锁
return true;
}
return false;
}
}
线程D进入后,直接抢锁
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract boolean initialTryLock();
@ReservedStackAccess
final void lock() {
if (!initialTryLock()) // 如果获取锁没有获取到
acquire(1); // 抢锁失败后,调用AQS标准的获取锁的流程
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 非公平锁下,无论阻塞队列中是否有其他线程排队,当前线程直接抢锁
// 没有线程切换,以及线程状态判断
if (compareAndSetState(0, 1)) { // 0 -> 1 加锁
// 如果上锁成功就标识当前线程为获取锁的线程
setExclusiveOwnerThread(current);
return true;
}
// 上锁失败,判断当前持有锁的线程是否时当前线程(可重入的特性)
else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow 可重入次数溢出int最大值,变为复数
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
// 没获取到锁
else
return false;
}
/**
* 与公平锁相比,无论队列中是否有等待线程,都尝试获取
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 // 如果当前锁未被占有
&& compareAndSetState(0, acquires)) { // 获取锁
setExclusiveOwnerThread(Thread.currentThread()); // 设置当前
return true;
}
return false;
}
}
所以公平锁的效率是不如非公平锁的,这是因为公平锁需要线程上下文切换时间以及调度的延迟时间,而非公平锁在线程D进入时,先抢锁,不存在上述两个时间。
// 释放锁的操作都是调用的AQS的release方法
// 此方法暴露了一个 tryRelease 钩子函数,由子类重写指定
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
// Sync
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 减去release 但是并没有设置状态,所以其他线程无法进入
int c = getState() - releases;
// 锁定持有线程不是当前线程,或者锁压根没有上锁,抛出异常
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
// 因为是可重入,只有当c == 0 时才代表全部释放,再设置锁的站有现成为null
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
// 更新state的状态
setState(c);
return free;
}
基于ReentrantLock的读写锁,拆分为两把锁,读锁和写锁。适用于读多写少的场景。读读不互斥,读写互斥,写写互斥。
public class ThreadDemo {
static volatile int a;
public static void readA() {
System.out.println(a);
}
public static void writeA() {
a++;
}
public static void main(String[] args) {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
Thread readThread1 = new Thread(() -> {
readLock.lock();
try {
readA();
} finally {
readLock.unlock();
}
});
Thread readThread2 = new Thread(() -> {
readLock.lock();
try {
readA();
} finally {
readLock.unlock();
}
});
Thread writeThread = new Thread(() -> {
writeLock.lock();
try {
writeA();
} finally {
writeLock.unlock();
}
});
readThread1.start();
readThread2.start();
writeThread.start();
}
}
读写锁的抽象接口,用于返回一对读写锁:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
// 使用同步器初始化读写锁对象
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
}