锁机制

锁的基本原理

指令由cpu执行,任何语言程序的锁都是由指令cmpxchg,全称 compare and exchange

cmpxchg dest,src
  1. cpu 要将count数值更改,他会先从内存中加载此值

  2. cpu执行cmpxchg,将会:

    1. 比较 当前持有的值与内存中的值是否相同

    2. 如果相同代表数值没有被修改,将会替换值,并返回成功

    3. 如果不相同则代表数值有可能被修改,并返回失败

此外,CPU为了满足cmpxchg指令的原子性,为其增加了lock:

lock cmpxchg

所有的锁都基于cmpxchg指令(x86),在Java中,有一个native方法映射了该指令:

// sun.misc.Unsafe
/**
 * 如果达到期望值,就会使用替换值替换
 *
 * @param o 和 offset 用于表示目标变量的内存地址
 * @param expected 期望值
 * @param x 替换值
 */
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

如何实现一个锁

  1. 如何表示锁的状态

    1. boolean state,只能表示两种状态

    2. int state 不仅可以表示状态,还可以记录锁被持有的数量,这样可以提供对重入锁的支持

  2. 保证多线程抢锁线程安全

    1. 基于 lock cmpxchg,对应java Unsafe.compareAndSwapInt 方法,也就是CAS

  3. 处理没有获取到锁的线程

    1. 自旋等待:

      1. 缺点,后台线程自旋等待,会占用CPU,导致性能障碍

      2. 优点,适用于并发低,任务耗时短的操作,因为处理快,每个线程自旋一会儿就会获取到锁

      3. 注意,当并发数逐渐增大或者任务耗时较长时,也就是没有获取锁的等待线程数量增大,自旋的线程数量以及时间会增长,会导致cpu自旋的压力增大

    2. 阻塞等待

      1. 交给操作系统将其阻塞

      2. 当自旋锁消耗的时间大于阻塞上下文切换的时间时,该使用阻塞

    3. 自旋 + 阻塞

  4. 释放锁后如何将锁重新分配

    1. 自旋锁:每个线程在不停的循环,他们会自己抢锁

    2. 阻塞锁:唤醒线程

AQS

全称AbstractQueueSynchronizer

  1. Abstract表示该类并不知道该如何上锁,使用了模板设计模式,暴露钩子函数给子类

  2. Queue表示线程阻塞队列,抢不到锁的线程都会处于这个队列中排队

  3. Synchronizer翻译为同步器,因为他可以保证线程安全

  4. AQS底层实际上是使用 CAS + state完成多线程抢锁的逻辑

总结:子类只需要实现上锁、释放锁的逻辑,至于排队阻塞等待、唤醒机制均由AQS实现

核心代码:

  • 上锁:

    @ReservedStackAccess
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&  // 子类获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果锁获取失败,就将其添加到阻塞队列中
            selfInterrupt();
    }
    
    // 子类实现获取锁的逻辑,AQS并不关心
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
  • 释放锁:

    @ReservedStackAccess
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 释放锁成功
            // 检查阻塞队列唤醒线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 子类实现释放锁的逻辑,AQS并不关心
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

ReentrantLock

ReentrantLock 是基于AQS实现的可重入锁的实现类,他有公平锁、非公平锁两种实现:

public class ReentrantLock implements Lock, java.io.Serializable {
     // 同步器
     private final Sync sync;

     abstract static class Sync extends AbstractQueuedSynchronizer {...}

     // 非公平锁
     static final class NonfairSync extends Sync{...}

     // 公平锁
     static final class FairSync extends Sync{...}

     public ReentrantLock() { // 默认是非公平锁
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
}

公平锁

线程D进入后,检查阻塞队列,如果阻塞队列有阻塞线程,线程D就在后面排队

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    @ReservedStackAccess
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&  // 子类获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果锁获取失败,就将其添加到阻塞队列中
            selfInterrupt();
        }
    }
}

// Sync 暴露一个线程加锁时,抢锁的逻辑,以此区分公平锁和非公平锁
abstract static class Sync extends AbstractQueuedSynchronizer {

    abstract boolean initialTryLock();

    // 上锁
    @ReservedStackAccess
    final void lock() {
        if (!initialTryLock()) // 如果获取锁没有获取到
            acquire(1);  // 抢锁失败后,调用AQS标准的获取锁的流程, 会调用子类 tryAcqure()方法
    }
}

static final class FairSync extends Sync {

    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        // AQS 同步器状态
        int c = getState();
        // 如果 == 0 代表锁处于释放状态
        if (c == 0) {
            // 如果前面没有等待的线程,并且去抢锁抢到了
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                // 设置当前线程持有锁
                setExclusiveOwnerThread(current);
                return true;
            }
        } 
        // 如果当前线程已经持有锁(可重入特性)
        else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0  // 锁状态为0代表锁未被占有
                 && !hasQueuedPredecessors()  // 阻塞队列中没有线程
                 && compareAndSetState(0, acquires)) { // 获取锁成功
            setExclusiveOwnerThread(Thread.currentThread()); // 设置当前线程占有锁
            return true;
        }
        return false;
    }
}

非公平锁

线程D进入后,直接抢锁

abstract static class Sync extends AbstractQueuedSynchronizer {

    abstract boolean initialTryLock();

    @ReservedStackAccess
    final void lock() {
        if (!initialTryLock()) // 如果获取锁没有获取到
            acquire(1);  // 抢锁失败后,调用AQS标准的获取锁的流程
    }
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        // 非公平锁下,无论阻塞队列中是否有其他线程排队,当前线程直接抢锁
        // 没有线程切换,以及线程状态判断
        if (compareAndSetState(0, 1)) { // 0 -> 1  加锁
            // 如果上锁成功就标识当前线程为获取锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
        // 上锁失败,判断当前持有锁的线程是否时当前线程(可重入的特性) 
        else if (getExclusiveOwnerThread() == current) {
            int c = getState() + 1;
            if (c < 0) // overflow  可重入次数溢出int最大值,变为复数
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        // 没获取到锁 
        else
            return false;
    }

    /**
     * 与公平锁相比,无论队列中是否有等待线程,都尝试获取
     */
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 // 如果当前锁未被占有 
                 && compareAndSetState(0, acquires)) {  // 获取锁
            setExclusiveOwnerThread(Thread.currentThread());  // 设置当前
            return true;
        }
        return false;
    }
}

所以公平锁的效率是不如非公平锁的,这是因为公平锁需要线程上下文切换时间以及调度的延迟时间,而非公平锁在线程D进入时,先抢锁,不存在上述两个时间。

释放锁

// 释放锁的操作都是调用的AQS的release方法
// 此方法暴露了一个 tryRelease 钩子函数,由子类重写指定
public void unlock() {
    sync.release(1);
}

// AQS
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}

// Sync
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    // 减去release 但是并没有设置状态,所以其他线程无法进入
    int c = getState() - releases;
    // 锁定持有线程不是当前线程,或者锁压根没有上锁,抛出异常
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    // 因为是可重入,只有当c == 0 时才代表全部释放,再设置锁的站有现成为null
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
    // 更新state的状态
    setState(c);
    return free;
}

ReentrantReadWriteLock

基于ReentrantLock的读写锁,拆分为两把锁,读锁和写锁。适用于读多写少的场景。读读不互斥,读写互斥,写写互斥。

public class ThreadDemo {
    static volatile int a;

    public static void readA() {
        System.out.println(a);
    }

    public static void writeA() {
        a++;
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
        Thread readThread1 = new Thread(() -> {
            readLock.lock();
            try {
                readA();
            } finally {
                readLock.unlock();
            }

        });
        Thread readThread2 = new Thread(() -> {
            readLock.lock();
            try {
                readA();
            } finally {
                readLock.unlock();
            }
        });

        Thread writeThread = new Thread(() -> {
            writeLock.lock();
            try {
                writeA();
            } finally {
                writeLock.unlock();
            }
        });

        readThread1.start();
        readThread2.start();
        writeThread.start();
    }
}

核心变量与构造器

读写锁的抽象接口,用于返回一对读写锁:

public interface ReadWriteLock {

    Lock readLock();

    Lock writeLock();
}
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;
    
    public ReentrantReadWriteLock() {
        this(false);
    }
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        // 使用同步器初始化读写锁对象
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
}

最后更新于