Synchronized锁实现原理

Synchronized锁实现原理

马草原 1,162 2022-10-15

Synchronized锁实现原理

前置条件下载hotspot源码(本文基于hotspot虚拟机)

下载方式:https://www.mcaoyuan.com/archives/download-jdksource


Synchronized锁介绍

Java中的synchronized 是一种关键字,用于实现线程同步。它可以用于方法或代码块,确保在同一时间只有一个线程可以访问被synchronized 修饰的代码。这样可以避免多个线程同时访问共享资源而导致的数据不一致或冲突。
JDK1.5之前synchronized 是一个重量级锁,JDK1.6对synchronized` 进行的各种优化后性能有很大的提高,包括一些列的锁优化和锁升级策略。

Synchronized的作用主要有三个:

  • 原子性:确保线程互斥的访问同步代码;
  • 可见性:保证共享变量的修改能够及时可见,其实是通过Java内存模型中的 对一个变量unlock 操作之前,必须要同步到主内存中;如果对一个变量进行lock 操作,则将会清空工作内存中此变量的值,在执行引擎使用此变量前,需要重新从主内存中load 操作或assign 操作初始化变量值来保证的;
  • 有序性:有效解决重排序问题,即 一个unlock操作先行发生(happen-before) 于后面对同一个锁的lock 操作

Synchronized锁的使用

  1. 锁对象
public class SynchronizedThisDemo {
    public void method() throws InterruptedException {
        synchronized (this) {
            System.out.println("同步方法执行...");
        }
    }
}
  1. 锁方法
public class SynchronizedMethodDemo {
    public synchronized void method() {
        System.out.println("锁方法..");
    }
}
  1. 锁静态方法
public class SynchronizedStaticMethodDemo {
    public static synchronized void method() {
        System.out.println("锁静态方法..");
    }
}


Synchronized实现同步的原理

通过查看字节码(针对锁对象的方式):

IDEA插件库搜索jclasslib即可快速查看

SynchronizedThisDemo

 0 aload_0
 1 dup
 2 astore_1
 3 monitorenter
 4 getstatic #2 <java/lang/System.out : Ljava/io/PrintStream;>
 7 ldc #3 <同步方法执行...>
 9 invokevirtual #4 <java/io/PrintStream.println : (Ljava/lang/String;)V>
12 aload_1
13 monitorexit
14 goto 22 (+8)
17 astore_2
18 aload_1
19 monitorexit
20 aload_2
21 athrow
22 return

反编译结果:

monitorenter指令:每个对象都是一个监视器锁(Monitor)。当Monitor被占用时就会处于锁定状态,线程执行monitorenter 指令时尝试获取Monitor的所有权,过程如下:

  1. 如果Monitor的进入数为0,则该线程进入Monitor,然后将进入数设置为1,该线程即为Monitor的所有者
  2. 如果线程已经占有该Monitor,只是重新进入,则进入Monitor的进入数加1;
  3. 如果其他线程已经占用了Monitor,则该线程进入阻塞状态,直到Monitor的进入数为0,再重新尝试获取Monitor的所有权;

monitorexit指令:执行monitorexit的线程必须是对应Monitor的所有者。指令执行时,Monitor的进入数减1,如果减1后进入数为0,那线程退Monitor,不再是这个Monitor的所有者。其他被这个Monitor阻塞的线程可以尝试去获取这个 Monitor 的所有权。

monitorexit指令出现了两次,第1次为同步正常退出释放锁;第2次为发生异步退出释放锁(为了避免死锁问题)

Synchronized的底层是通过一个Monitor的对象来完成,其实wait/notify 等方法也依赖于Monitor对象(都是基于Monitor对象完成的)。
因此只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException

锁方法是没有显式的monitor指令的

  1. 锁方法:
    SynchronizedMethod
  2. 锁同步方法:
    SynchronizedStaticMethod

Synchronized作用于方法时是通过在访问标志(access_flags)中添加SYNCHRONIZED 标识符完成锁操作。但是两种同步方式本质上没有区别,方法的同步是一种隐式的方式来实现,无需通过字节码来完成。

  1. 锁对象的access_flags
    access_flags_this
  2. 锁方法的access_flags
    access_flags
  3. 锁静态方法的access_flags
    access_flags_static


什么是监视器(Monitor)?

那什么是Monitor?其实就是一个对象,可以把它理解为一个同步工具对象。
任何对象都内置了一个独一无二的Monitor对象,因此可以把任意一个不为NULL的对象作为锁。
Monitor对象存在于每个Java对象的对象头Mark Word中(存储的指针的指向)

任何一个对象都有一个Monitor与之关联,当且一个Monitor被持有后,它将处于锁定状态。Synchronized在JVM里的实现都是基于进入和退出Monitor对象来实现方法同步和代码块同步,虽然具体实现细节不一样,但是都可以通过成对的MonitorEnter和MonitorExit指令来实现。

  • MonitorEnter指令:插入在同步代码块的开始位置,当代码执行到该指令时,将会尝试获取该对象Monitor的所有权,即尝试获得该对象的锁;
  • MonitorExit指令:插入在方法结束处和异常处,JVM保证每个MonitorEnter必须有对应的MonitorExit

在Java虚拟机(HotSpot)中,Monitor是由ObjectMonitor实现的,其主要数据结构如下:
文件位置:hotspot/src/share/vm/runtime/objectMonitor.hpp

ObjectMonitor() {
    _header       = NULL;
    _count        = 0; // 记录当前线程对锁的递归获取次数。每次线程成功获取锁时,_count会加1;每次释放锁时,_count会减1。当_count为0时,表示该线程已经完全释放了锁。
    _waiters      = 0, // 记录当前线程对该锁的递归获取次数。
    _recursions   = 0; // 锁重入的次数
    _object       = NULL; // 这个ObjectMonitor关联的对象
    _owner        = NULL; // 指向当前拥有该锁的线程。
    _WaitSet      = NULL; // 处于wait状态的线程(调用wait方法),会被加入到_WaitSet
    _WaitSetLock  = 0 ; // 是_WaitSet的锁 用来确保多线程访问_WaitSet是安全的
    _Responsible  = NULL ; // 用于记录等待唤醒责任方,当一个线程需要等待某些条件时,它会将自己标记为等待唤醒的责任方,并将_Responsible设置为自己。这样,在其他线程满足条件并唤醒等待线程时,可以通过_Responsible字段找到等待唤醒的责任方。
    _succ         = NULL ; // 继任者 也就是当前持有锁的线程释放锁或者wait的时候会唤醒继任者
    _cxq          = NULL ; // 存放在竞争monitor时失败的线程的队列(Contended eXit Queue)
    FreeNext      = NULL ;
    _EntryList    = NULL ; // 处于等待锁block状态的线程,会被加入到该列表
    _SpinFreq     = 0 ; // 自旋锁的自旋次数阈值。
    _SpinClock    = 0 ; // 自旋锁的计数器。
    OwnerIsThread = 0 ; // 表示_owner字段是否指向线程对象。
  }

监视器Monitor有两种同步方式:

  1. 互斥:多线程环境下线程之间如果需要共享数据,需要解决互斥访问数据的问题,监视器可以确保监视器上的数据在同一时刻只会有一个线程在访问
  2. 协作:一个线程向缓冲区写数据,另一个线程从缓冲区读数据,如果读线程发现缓冲区为空就会等待,当写线程向缓冲区写入数据,就会唤醒读线程,这里读线程和写线程就是一个合作关系。

JVM通过Object类的wait()方法来使自己等待,在调用wait()方法后,该线程会释放它持有的监视器,直到其他线程通知它才有执行的机会。一个线程调用notify()方法通知在等待的线程,这个等待的线程并不会马上执行,而是要通知线程释放监视器后,它重新获取监视器才有执行的机会。如果刚好唤醒的这个线程需要的监视器被其他线程抢占,那么这个线程会继续等待。Object类中的notifyAll()方法可以解决这个问题,它可以唤醒所有等待的线程,总有一个线程执行。

_EntryList和_WaitSet等待队列的关系以及Monitor的协作关系:
_WaitSet

如上图所示,一个线程通过1号门进入Entry Set(入口区),如果在入口区没有线程等待,那么这个线程就会获取监视器成为监视器的Owner,然后执行监视区域的代码。如果在入口区中有其它线程在等待,那么新来的线程也会和这些线程一起等待。线程在持有监视器的过程中,有两个选择,一个是正常执行监视器区域的代码,释放监视器,通过5号门退出监视器;还有可能等待某个条件的出现,于是它会通过3号门到Wait Set(等待区)休息,直到相应的条件满足后再通过4号门进入重新获取监视器再执行。

当一个线程释放监视器时,在入口区和等待区的等待线程都会去竞争监视器,如果入口区的线程赢了,会从2号门进入;如果等待区的线程赢了会从4号门进入。只有通过3号门才能进入等待区,在等待区中的线程只有通过4号门才能退出等待区,也就是说一个线程只有在持有监视器时才能执行wait操作,处于等待的线程只有再次获得监视器才能退出等待状态。



Java对象头

对象头重的MarkWord对于锁的实现至关重要,因此需要先搞懂对象头
ObjectHeader

  • 实例数据:存放类的属性数据信息,包括父类的属性信息;
  • 对齐填充:由于虚拟机要求 对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了对齐
  • 对象头:Java对象头一般占有2个机器码(在32位虚拟机中,1个机器码等于4字节,也就是32bit,在64位虚拟机中,1个机器码是8个字节,也就是64bit),但是如果对象是数组类型,则需要3个机器码,因为JVM虚拟机可以通过Java对象的元数据信息确定Java对象的大小,但是无法从数组的元数据来确认数组的大小,所以用一块来记录数组长度

对象头的Mark Word:

MarkWord

Hotspot虚拟机的对象头主要包括两部分数据:

  • Mark Word(标记字段):存储对象自身的运行时数据
  • Class Pointer(类型指针):是对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例

Synchronized用的锁就是存在Java对象头里的。

Mark Word存储结构:

对象头信息是与对象自身定义的数据无关的额外存储成本,但是考虑到虚拟机的空间效率,Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据,它会根据对象的状态复用自己的存储空间,也就是说,Mark Word会随着程序的运行发生变化,可能变化为存储以下4种数据:

image-1699004597495

对象头中Mark Word与线程中Lock Record

在线程进入同步代码块的时候,如果此同步对象没有被锁定,即它的锁标志位是01,则虚拟机首先在当前线程的栈中创建我们称之为锁记录Lock Record的空间,用于存储锁对象的Mark Word的拷贝,官方把这个拷贝称为Displaced Mark Word

Lock Record是线程私有的数据结构,每一个线程都有一个可用Lock Record列表,同时还有一个全局的可用列表。每一个被锁住的对象Mark Word都会和一个Lock Record关联(对象头的MarkWord中的Lock Word指向Lock Record的起始地址),同时Lock Record中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

Lock Record的内部结构:

Lock Record 描述
Owner 初始时为NULL表示当前没有任何线程拥有该monitor record,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为NULL;
EntryQ 关联一个系统互斥锁(semaphore),阻塞所有试图锁住monitor record失败的线程;
RcThis 表示blocked或waiting在该monitor record上的所有线程的个数;
Nest 用来实现 重入锁的计数;
HashCode 保存从对象头拷贝过来的HashCode值(可能还包含GC age)。
Candidate 用来避免不必要的阻塞或等待线程唤醒,因为每一次只有一个线程能够成功拥有锁,如果每次前一个释放锁的线程唤醒所有正在阻塞或等待的线程,会引起不必要的上下文切换(从阻塞到就绪然后因为竞争锁失败又被阻塞)从而导致性能严重下降。Candidate只有两种可能的值0表示没有需要唤醒的线程1表示要唤醒一个继任线程来竞争锁。

在 JDK1.6 中JVM团队对锁进行了重要改进,优化了其性能引入了偏向锁轻量级锁适应性自旋锁消除锁粗化等实现,大幅提高了Synchronized的性能。

总体上来说锁状态升级流程如下:
lock



偏向锁

获取流程

当线程访问同步块并获取锁时处理流程如下:

  1. 检查mark word中的线程 id 。
  2. 如果mark word为空则使用CAS安全设置当前线程id,如果设置成功则获取锁成功,否则就撤销偏向锁。
  3. 如果不为空则检查线程id为是否为本线程。如果是则获取锁成功,如果失败则撤销偏向锁。

持有偏向锁的线程以后每次进入这个锁相关的同步块时,只需比对一下mark word的线程 id 是否为本线程,如果是则直接走锁重如逻辑,不需要再走获取锁逻辑性能很高

偏向锁的撤销流程

  1. 偏向锁的撤销动作必须等待全局安全点Global Safepoint
  2. 暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态
  3. 撤销偏向锁恢复到无锁标志位为 01或轻量级锁标志位为 00的状态

全局安全点(Global Safepoint)是指在程序执行过程中的一个特定时间点,所有线程都达到了一个安全点,并且暂停执行。在全局安全点上,线程的执行状态被冻结,不会进行任何可能会影响垃圾回收或其他重要操作的操作。
撤销偏向锁需要在全局安全点进行是为了确保没有其他线程正在访问被偏向锁占用的对象,避免在撤销过程中有线程仍然持有偏向锁,导致撤销失败或产生不一致的状态。

优点

  • 减少同步操作的开销:偏向锁的目标是在没有竞争的情况下提供高效的同步。它通过将对象的Mark Word设置为偏向锁状态,使得同一线程多次进入同步代码块时无需再进行加锁操作,从而减少了同步操作的开销。
  • 提高单线程性能:对于只有一个线程访问同步代码块的情况,偏向锁可以极大地提高性能。线程无需竞争锁,直接进入同步代码块,避免了多线程竞争带来的开销。

缺点:

  • 竞争情况下性能下降:当多个线程竞争同一个偏向锁时,偏向锁会被撤销,导致性能下降。此时,需要进行额外的同步操作,包括撤销偏向锁和竞争锁的过程,可能会带来一定的开销。
  • 需要撤销偏向锁:当发生竞争时,偏向锁需要被撤销,以保证公平性。偏向锁撤销的过程涉及到全局安全点的等待和暂停线程执行,可能会对系统的响应性产生一定影响。


轻量级锁

加锁过程

多个线程竞争偏向锁导致偏向锁升级为轻量级锁

  1. JVM 在当前线程的栈帧中创建LockReocrd,并将对象头中的Mark Word复制到LockReocrd中。
  2. 线程尝试使用CAS将对象头中的Mark Word替换为指向Lock Reocrd的指针。如果成功则获得锁,如果失败则先检查对象的Mark Word是否指向当前线程的栈帧如果是则说明已经获取锁,否则说明其它线程竞争锁则膨胀为重量级锁。

解锁过程

  1. 使用CAS操作将Mark Word还原(拷贝回去)
  2. 如果第 1 步执行成功则释放完成
  3. 如果第 1 步执行失败(多线程竞争)则膨胀为重量级锁

优点

对于绝大部分的锁在整个生命周期内都是不会存在竞争或者竞争不激烈(抢锁的线程很少 那就可以通过自旋优化避免频繁转换线程状态带来的性能开销)。

缺点

长时间持有锁的任务不适用,竞争激烈的场景会带来额外的性能开销(轻量级锁升级为重量级锁的额外开销)。



自旋机制

当轻量级锁被占用时有其他线程来获取锁并不会立即挂起等待,而是进入一个自旋等待,当自旋次数超过了设定的阈值就会膨胀为重量级锁。
自旋其实就是循环等待,俗称占着茅坑不拉屎

默认情况下自旋次数是 10 次用户可以使用参数 -XX : PreBlockSpin 来更改。
线程频繁挂起唤醒(状态变化)带来的性能开销:

  • 切换内核态和用户态
  • 刷新CPU缓存
  • 保存和恢复线程上下文

优点:

当线程持有锁时间很短,竞争不激烈的情况下性能很好,避免了线程状态变换带来的沉重开销。

缺点 :

当线程持有锁时间长,竞争激烈的场景下自旋是白白浪费性能。自旋后能获取到锁的场景才能发挥自旋的性能优势。自旋次数很难评估(针对不同核数的CPU以及不同架构的CPU以及不同的业务逻辑自旋成功且不太影响性能的自旋次数很难评估,JVM团队在1.6后加入了适应性自旋来解决问题)



适应性自旋机制

适应性自旋和自旋本质实现是一样的,不过适应性自旋的自旋次数不再是固定值,而是JVM由前一次在同一个锁上的自旋时间及锁的拥有者的状态来分析决定。如果对于某个锁很少自旋成功那么以后有可能省略掉自旋过程以避免资源浪费。有了自适应自旋随着程序运行和性能监控信息的不断完善,虚拟机对程序锁的状况预测就会越来越准确,虛拟机就会变得越来越聪明了。

优缺点同上自旋机制



重量级锁

当多个线程竞争同一个锁时,如果锁的竞争比较激烈,就会导致锁的膨胀。在Synchronized锁膨胀为重量级锁时,JVM会将锁的实现从对象监视器ObjectMonitor切换为操作系统级别的互斥量Mutex
重量级锁的实现主要是基于操作系统提供的互斥量机制,它需要进行用户态内核态之间的切换,性能开销比较大。
当一个线程获取了重量级锁后,其他线程就必须进入阻塞状态等待锁的释放。

Mutex的实现通常是基于操作系统提供的原子操作和内核态/用户态之间的切换。当一个线程尝试获取一个Mutex时,如果Mutex已经被其他线程获取了,那么该线程就会被阻塞,直到Mutex被释放为止。当Mutex被释放后,等待的线程就会被唤醒,并且有机会获取Mutex并继续执行。
Mutex的实现可以是硬件级别的,也可以是软件级别的。在硬件级别上,Mutex通常是通过CPU提供的原子指令来实现的,具有非常高的性能和可靠性。在软件级别上,Mutex通常是通过操作系统提供的互斥量机制来实现的,具有更好的灵活性和可移植性。

在重量级锁中没有竞争到锁的对象会park被挂起,退出同步块时unpark唤醒后续线程。唤醒操作涉及到操作系统调度会有额外的开销。

ObjectMonitor中包含一个同步队列(由_cxq_EntryList组成)一个等待队列(_WaitSet)。

  • notifynotifyAll唤醒时根据policy策略选择加入的队列(policy 默认为 0)
  • 退出同步块时根据QMode策略来唤醒下一个线程(QMode 默认为 0)

Policy:

  • Policy为0时:放入到EntryList队列的排头位置
  • Policy为1时:放入到EntryList队列的末尾位置
  • Policy为2时:如果EntryList是否为空就放入EntryList中,否则放入cxq队列头部(默认策略)
  • Policy为3时 :如果cxq是否为空,直接放入头部,否则放入cxq队列末尾位置

QMode:

  • QMode为0时,先判断EntryList是否为空,如果不为空,则取出第一个唤醒,如果为空再从cxq里面获取第一个唤醒 (默认的)
  • QMode为2时,取cxq头部节点直接唤醒
  • QMode为3时,如果cxq非空,把cxq队列放置到EntryList的尾部(顺序跟cxq一致)
  • QMode为4时,如果cxq非空,把cxq队列放置到EntryList的头部(顺序跟cxq相反)

以上策略都是提高线程调度的效率和公平性,都是JVM内部的实现细节无需手动设置。



锁消除

JVM通过逃逸分析算法来确定是否启用锁消除优化。

加锁是为了保证共享变量在多线程下的安全问题,逃逸分析可以分析出一个对象是否发生逃逸(能否被其他线程访问),比如下面的代码,lockEliminationTest()中的vector变量是方法的内部的,分配在栈空间,无法对其他线程共享,那就不存在多线程安全问题,JVM可以大胆的将Vector#add方法的锁消除。

public class SynchronizedTest {

    public Vector<String> vector = new Vector<String>();

    /**
     * 锁消除测试
     */
    public void lockEliminationTest() {
        Vector<String> vector = new Vector<String>();

        for (int i = 0; i < 10; i++) {
            vector.add(i + "次");
        }

        System.out.println(vector);
    }

    /**
     * 锁粗化测试
     */
    public void lockCoarseningTest() {
        for (int i = 0; i < 10; i++) {
            vector.add(i + "次");
        }

        System.out.println(vector);
    }

}

Vector是一个线程安全的容器,他的写操作是加synchronized锁的

public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}


锁粗化

同上代码所示:lockCoarseningTest()

vector每次add的时候都需要加锁操作,JVM检测到对同一个对象连续加锁、解锁操作,会合并一个更大范围的加锁、解锁操作,即加锁解锁操作会移到for循环之外

需要注意的是锁粗化并不适用于所有场景,仅适用于对同一个对象连续加锁、解锁操作的情况

在日常写代码的时候绝大多数的场景是需要更细化锁的粒度的,过粗的锁会导致锁的持有时间过长、甚至锁超时。导致我们的并发性能降低,还可能会导致synchronized锁优化失败(锁持有时间太长又竞争激烈的情况),造成锁膨胀变成性能低的重量级锁。



源码分析(只贴了精简后的代码)


ObjectMonitor实现

文件位置:hotspot/src/share/vm/runtime/objectMonitor.hpp

ObjectMonitor() {
    _header       = NULL;
    _count        = 0; // 记录当前线程对锁的递归获取次数。每次线程成功获取锁时,_count会加1;每次释放锁时,_count会减1。当_count为0时,表示该线程已经完全释放了锁。
    _waiters      = 0, // 记录当前线程对该锁的递归获取次数。
    _recursions   = 0; // 锁重如次数
    _object       = NULL;
    _owner        = NULL; // 指向当前拥有该锁的线程。owner 是一个临界资源 JVM 是通过 CAS 操作来保证其线程安全的。
    _WaitSet      = NULL; // 处于wait状态的线程,会被加入到_WaitSet
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ; // 用于记录当前正在执行的线程。
    _succ         = NULL ;
    _cxq          = NULL ; // _cxq用于记录等待该对象锁的线程列表。当一个线程调用对象的wait()方法后,它会被放入等待队列中,等待其他线程释放锁并唤醒它。等待队列中的线程按照先进先出的顺序等待。
    FreeNext      = NULL ;
    _EntryList    = NULL ; // 处于等待锁block状态的线程,会被加入到该列表
    _SpinFreq     = 0 ; // 自旋锁的自旋次数阈值。
    _SpinClock    = 0 ; // 自旋锁的计数器。
    OwnerIsThread = 0 ; // 表示_owner字段是否指向线程对象。
  }

monitorenter(获取锁)

文件位置:hotspot/src/share/vm/interpreter/interpreterRuntime.cpp:561

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
  // 判断当前线程是不是已经持有锁了
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif

  // PrintBiasedLockingStatistics是一个全局变量,用于控制是否打印偏向锁的统计信息。
  if (PrintBiasedLockingStatistics) {
    // 如果启用了统计信息 会记录偏向锁的统计信息(就是一些计数器)
    Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
  }

  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),"must be NULL or an object");

  // 是否使用偏向锁 JVM 启动时设置的偏向锁 -XX:-UseBiasedLocking=false/true
  if (UseBiasedLocking) {
    // 快速获取对象锁
    // JVM会先尝试获取偏向锁(如果启用了偏向锁),如果获取失败则尝试获取轻量级锁。
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    // 慢速获取对象锁 在禁用偏向锁的情况下会退化为原始的互斥方式获取锁
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }

  assert(Universe::heap()->is_in_reserved_or_null(elem->obj()), "must be NULL or an object");

#ifdef ASSERT
  // 判断当前线程是不是已经持有锁了
  thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif

IRT_END

文件位置:hotspot/src/share/vm/runtime/objectMonitor.cpp:316

void ATTR ObjectMonitor::enter(TRAPS) {
  Thread * const Self = THREAD ;
  void * cur ;
  // 省略部分代码
  
  // 通过 CAS 操作尝试把 monitor 的_owner 字段设置为当前线程
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL) {
     assert (_recursions == 0   , "invariant") ;
     assert (_owner      == Self, "invariant") ;
     return ;
  }

  // 线程重入(锁重入 增加重入次数后直接返回),recursions++
  if (cur == Self) {
     _recursions ++ ;
     return ;
  }

  // 如果当前线程是第一次进入该 monitor, 设置_recursions 为 1,_owner 为当前线程
  if (Self->is_lock_owned ((address)cur)) {
    assert (_recursions == 0, "internal state error");
    _recursions = 1 ;
    _owner = Self ;
    OwnerIsThread = 1 ;
    return ;
  }

    for (;;) {
      jt->set_suspend_equivalent();
      // 如果获取锁失败,则开始等待锁的释放;
      EnterI (THREAD) ;

      if (!ExitSuspendEquivalent(jt)) break ;
          _recursions = 0 ;
      _succ = NULL ;
      exit (false, Self) ;

      jt->java_suspend_self();
    }
    Self->set_current_pending_monitor(NULL);
  }
}

monitor (等待锁)

  1. 插入_cxq队列的头部
  2. 作为_Responsible或者普通线程在队列中等待
  3. 获取锁后从_cxq或者EntryList中移除

文件位置:hotspot/src/share/vm/runtime/objectMonitor.cpp:482

void ObjectMonitor::EnterI(TRAPS) {
  Thread * const Self = THREAD;
  assert(Self->is_Java_thread(), "invariant");
  assert(((JavaThread *) Self)->thread_state() == _thread_blocked, "invariant");

  // 尝试直接获取锁 获取到直接返回
  if (TryLock (Self) > 0) {
    assert(_succ != Self, "invariant");
    assert(_owner == Self, "invariant");
    assert(_Responsible != Self, "invariant");
    return;
  }

  // 延迟初始化对象监视器(Object Monitor)
  DeferredInitialize();

  // 尝试自旋获取锁
  if (TrySpin (Self) > 0) {
    assert(_owner == Self, "invariant");
    assert(_succ != Self, "invariant");
    assert(_Responsible != Self, "invariant");
    // 获取成功直接退出
    return;
  }

  // 初始化了一个等待对象 node,存储的是自身
  ObjectWaiter node(Self);
  Self->_ParkEvent->reset();
  node._prev   = (ObjectWaiter *) 0xBAD;
  node.TState  = ObjectWaiter::TS_CXQ;

  ObjectWaiter * nxt;
  for (;;) { 
    node._next = nxt = _cxq;
    // 通过CAS把初始化的等待对象插入cxq队列的首位
    if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;

    // 插入_cxq失败了说明有线程修改了cxq队列
    // 再次尝试直接获取锁 获取到直接返回 获取不到进入下一次循环
	// 除非成功插入了cxq等待队列或者获取到锁才会跳出这个循环
    if (TryLock (Self) > 0) {
      assert(_succ != Self, "invariant");
      assert(_owner == Self, "invariant");
      assert(_Responsible != Self, "invariant");
      return;
    }
  }
    
  // 当EntryList为空时,说明没有正在等待的线程,直接将_Responsible设置为自己
  // _Responsible是有限时间的park不会出现线程搁浅问题,因为ObjectMonitor会定期检查_owner字段
  if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
    // Try to assume the role of responsible thread for the monitor.
    // CONSIDER:  ST vs CAS vs { if (Responsible==null) Responsible=Self }
    Atomic::replace_if_null(Self, &_Responsible);
  }

  int nWakeups = 0;
  int recheckInterval = 1;

  for (;;) {
    // 再次尝试直接获取锁 获取成功跳出循环
    if (TryLock(Self) > 0) break; 
    // 检查并确保当前线程没有拿到锁
    assert(_owner != Self, "invariant");

    if ((SyncFlags & 2) && _Responsible == NULL) {
      Atomic::replace_if_null(Self, &_Responsible);
    }

    // 获取锁失败,park自己,分为是不是_Responsible,如果是那么是timedpark,
    // 间隔一定时间后尝试获取_owner
    if (_Responsible == Self || (SyncFlags & 1)) {
      // 如果_Responsible是自己 那就走timedpark(有限时间的等待)
      TEVENT(Inflated enter - park TIMED);
      Self->_ParkEvent->park((jlong) recheckInterval);
      // Increase the recheckInterval, but clamp the value.
      recheckInterval *= 8;
      if (recheckInterval > MAX_RECHECK_INTERVAL) {
        recheckInterval = MAX_RECHECK_INTERVAL;
      }
    } else {
      // park自己,线程挂起等待
      TEVENT(Inflated enter - park UNTIMED);
      Self->_ParkEvent->park();
    }

    // 再次尝试直接获取锁
    if (TryLock(Self) > 0) break;

    ...
	// 线程被唤醒的次数
    ++nWakeups;

    // 自旋条件成立自旋等待 自旋成功则跳出
    if ((Knob_SpinAfterFutile & 1) && TrySpin(Self) > 0) break;

    ...
  }


    
  
  assert(_owner == Self, "invariant");
  assert(object() != NULL, "invariant");
    
  // 走到这一步已经拿到锁了
  // 从cxq或者entrylist上移除自己
  UnlinkAfterAcquire(Self, &node);
  if (_succ == Self) _succ = NULL;

  return;
}

park()、timedpark()unpark()是Java中用于线程阻塞和唤醒的底层方法。

  • park()方法用于使当前线程进入阻塞状态,暂停执行。当线程调用park()方法时,它会被挂起,直到满足某个特定条件。park()方法可以带有一个参数,用于指定线程被挂起的条件。线程被挂起后,它不会消耗CPU资源,直到被其他线程唤醒。
  • timedpark()进入带有超时时间的阻塞状态。
  • unpark()方法用于唤醒一个被挂起的线程。当某个条件满足时,可以调用unpark()方法来唤醒一个特定的线程。被唤醒的线程会从阻塞状态恢复到可运行状态,并可以继续执行。

SyncFlags是一种用于线程同步的标志位,JVM内部用于在多线程环境中进行线程间的协调优化和通信。


monitor(释放锁)

文件位置:hotspot/src/share/vm/runtime/objectMonitor.cpp:937

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   Thread * Self = THREAD ;

   // 存在递归获取锁的情况 递归次数自减后退出
   if (_recursions != 0) {
     _recursions--;        
     TEVENT (Inflated exit - recursive) ;
     return ;
   }

    // 通过把锁_owner字段设置为null来释放锁(省略了部分逻辑)
	OrderAccess::release_store_ptr (&_owner字段设置为null来释放锁, NULL) ;
       
    ObjectWaiter * w = NULL ;
    int QMode = Knob_QMode ;
    
    // 释放锁后根据一些策略来唤醒等待中的线程
    // QMode=2的话 _cxq 优先于 _EntryList。
    // 尝试直接从 _cxq 唤醒线程的等待者。如果成功,继任者将需要取消与 cxq 的链接。
    if (QMode == 2 && _cxq != NULL) {
      w = _cxq ;
      assert (w != NULL, "invariant") ;
      assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
      ExitEpilog (Self, w) ;
      return ;
    }
       
    // cxq 队列插入 EntryList 尾部
    if (QMode == 3 && _cxq != NULL) {
      w = _cxq ;
      for (;;) {
         assert (w != NULL, "Invariant") ;
         ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
         if (u == w) break ;
         w = u ;
      }
      ObjectWaiter * q = NULL ;
      ObjectWaiter * p ;
      for (p = w ; p != NULL ; p = p->_next) {
          guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          p->TState = ObjectWaiter::TS_ENTER ;
          p->_prev = q ;
          q = p ;
      }
    
      ObjectWaiter * Tail ;
      for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
      if (Tail == NULL) {
          _EntryList = w ;
      } else {
          Tail->_next = w ;
          w->_prev = Tail ;
      }
    }

    // cxq 队列插入到_EntryList 头部
    if (QMode == 4 && _cxq != NULL) {
      // 把 cxq 队列放入 EntryList
      // 此策略确保最近运行的线程位于 EntryList 的头部
      w = _cxq ;
      for (;;) {
         assert (w != NULL, "Invariant") ;
         ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
         if (u == w) break ;
         w = u ;
      }
      assert (w != NULL              , "invariant") ;
    
      ObjectWaiter * q = NULL ;
      ObjectWaiter * p ;
      for (p = w ; p != NULL ; p = p->_next) {
          guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          p->TState = ObjectWaiter::TS_ENTER ;
          p->_prev = q ;
          q = p ;
      }
    
      if (_EntryList != NULL) {
          q->_next = _EntryList ;
          _EntryList->_prev = q ;
      }
      _EntryList = w ;
    }

      w = _EntryList  ;
      if (w != NULL) {
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }

      if (QMode == 1) {
         // QMode == 1 : 把 cxq 倾倒入 EntryList 逆序
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;
         assert (s != NULL, "invariant") ;
      } else {
         // QMode == 0 or QMode == 2
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
          // 将单向链表构造成双向环形链表;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

wait 等待

文件位置:hotspot/src/share/vm/runtime/objectMonitor.cpp:1444

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;
   // 必须是Java线程才可以wait
   assert(Self->is_Java_thread(), "Must be Java thread!");
   JavaThread *jt = (JavaThread *)THREAD;

   DeferredInitialize () ;

   // 校验是否持有锁(_OWNER是不是当前线程)没有持有锁抛出异常结束
   CHECK_OWNER();

   EventJavaMonitorWait event;

   // 创建一个等待节点
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          

   // 添加到Wait Set队列
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;

   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }
   intptr_t save = _recursions; 
   _waiters++;                 
   _recursions = 0;   

   // 退出了监视器 (也就是执行了上面所说的 Monitor 释放操作)
   exit (true, Self) ;                    
   guarantee (_owner != Self, "invariant") ;

   // 此时已经释放了锁 把自己放进了等待队列
   // 当一个线程正在处理对象监视器的退出操作时,另一个线程可能在这个过程中进入并离开对象监视器,
   // 并调用了notify()方法。这种情况下,如果另一个线程的退出操作选择了当前线程作为继任者,
   // 同时在当前线程执行退出时,另一个线程也会调用unpark()方法(唤醒线程)
   // 为了解决这个问题,重新调用unpark()方法。
   if (node._notified != 0 && _succ == Self) {
      node._event->unpark();
   }
   ...
}

notify 唤醒

文件位置:hotspot/src/share/vm/runtime/objectMonitor.cpp:1667

随机获取一个等待该锁的线程,并根据策略将等待者添加到EntryListcxq队列中然后释放锁。

void ObjectMonitor::notify(TRAPS) {
    CHECK_OWNER();

    // waitSet为空直接返回不做处理(调用wait方法的才会进入WaitSet)
    if (_WaitSet == NULL) {
        TEVENT (Empty - Notify);
        return;
    }
    DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

    int Policy = Knob_MoveNotifyee;

    Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
    // 尝试从等待队列中取出一个对象等待者(ObjectWaiter)
    ObjectWaiter *iterator = DequeueWaiter();
    
    if (iterator != NULL) {
         // 头插 EntryList
        if (Policy == 0) {
            if (List == NULL) {
                iterator->_next = iterator->_prev = NULL;
                _EntryList = iterator;
            } else {
                List->_prev = iterator;
                iterator->_next = List;
                iterator->_prev = NULL;
                _EntryList = iterator;
            }
        } else if (Policy == 1) {
            // 尾插 EntryList
            if (List == NULL) {
                iterator->_next = iterator->_prev = NULL;
                _EntryList = iterator;
            } else {
                ObjectWaiter *Tail;
                for (Tail = List; Tail->_next != NULL; Tail = Tail->_next);
                assert (Tail != NULL && Tail->_next == NULL, "invariant");
                Tail->_next = iterator;
                iterator->_prev = Tail;
                iterator->_next = NULL;
            }
        } else if (Policy == 2) { 
            // 头插 cxq
            if (List == NULL) {
                iterator->_next = iterator->_prev = NULL;
                _EntryList = iterator;
            } else {
                iterator->TState = ObjectWaiter::TS_CXQ;
                for (;;) {
                    ObjectWaiter *Front = _cxq;
                    iterator->_next = Front;
                    if (Atomic::cmpxchg_ptr(iterator, &_cxq, Front) == Front) {
                        break;
                    }
                }
            }
        } else if (Policy == 3) {
            // 尾插 cxq
            iterator->TState = ObjectWaiter::TS_CXQ;
            for (;;) {
                ObjectWaiter *Tail;
                Tail = _cxq;
                if (Tail == NULL) {
                    iterator->_next = NULL;
                    if (Atomic::cmpxchg_ptr(iterator, &_cxq, NULL) == NULL) {
                        break;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next;
                    Tail->_next = iterator;
                    iterator->_prev = Tail;
                    iterator->_next = NULL;
                    break;
                }
            }
        } else {
            ParkEvent *ev = iterator->_event;
            iterator->TState = ObjectWaiter::TS_RUN;
            OrderAccess::fence();
            ev->unpark();
        }

        if (Policy < 4) {
            iterator->wait_reenter_begin(this);
        }
    }
    
    // 自旋释放
    Thread::SpinRelease(&_WaitSetLock);
    
}

notifyAll 唤醒

获取等待该锁的线程所有线程,并循环然后根据策略将等待者添加到EntryListcxq队列中然后释放锁。

文件位置:hotspot/src/share/vm/runtime/objectMonitor.cpp:1788

void ObjectMonitor::notifyAll(TRAPS) {
  CHECK_OWNER();
  ObjectWaiter* iterator;
  // 没有需要唤醒的线程 直接返回
  if (_WaitSet == NULL) {
      TEVENT (Empty-NotifyAll) ;
      return ;
  }
  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;
  int Tally = 0 ;
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;

  // 与notify的不同在于循环所有的等待线程 然后根据策略加入不同的队列中
  for (;;) {
     iterator = DequeueWaiter () ;
     if (iterator == NULL) break ;
     TEVENT (NotifyAll - Transfer1) ;
     ++Tally ;

     guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
     guarantee (iterator->_notified == 0, "invariant") ;
     iterator->_notified = 1 ;
     Thread * Self = THREAD;
     iterator->_notifier_tid = Self->osthread()->thread_id();
     if (Policy != 4) {
        iterator->TState = ObjectWaiter::TS_ENTER ;
     }

     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }

     if (Policy == 0) {       // prepend to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else
     if (Policy == 1) {      // append to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            // CONSIDER:  finding the tail currently requires a linear-time walk of
            // the EntryList.  We can make tail access constant-time by converting to
            // a CDLL instead of using our current DLL.
            ObjectWaiter * Tail ;
            for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
            assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
            Tail->_next = iterator ;
            iterator->_prev = Tail ;
            iterator->_next = NULL ;
        }
     } else
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         iterator->TState = ObjectWaiter::TS_CXQ ;
         for (;;) {
             ObjectWaiter * Front = _cxq ;
             iterator->_next = Front ;
             if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                 break ;
             }
         }
     } else
     if (Policy == 3) {      // append to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
                iterator->_next = NULL ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                   break ;
                }
            } else {
                while (Tail->_next != NULL) Tail = Tail->_next ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
                break ;
            }
        }
     } else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;
        ev->unpark() ;
     }

     if (Policy < 4) {
       iterator->wait_reenter_begin(this);
     }
  }

  // 自旋释放锁
  Thread::SpinRelease (&_WaitSetLock) ;

}