• 周三. 6月 29th, 2022

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

JUC中锁与AQS相关知识总结

admin

11月 28, 2021

JUC中锁与AQS相关知识总结

1. Lock接口

               在Java5之前,要想使用锁来保护共享资源大多数情况是使用synchronized关键字。在Java5之后,并发包里增加了Lock接口及其实现类来提供锁的功能。

               Lock接口与synchronized有许多区别。synchronized修饰在方法体或者代码块上可以隐式地获取和释放锁,并且锁地获取和释放操作被固化。但是Lock接口可以显示地获取和释放锁,提高了锁的可操作性。并且Lock接口支持可中断锁以及超时锁等synchronized没有的特性。

               Lock接口的使用范式如下:

1 Lock lock = new ReentrantLock();
2 lock.lock();
3 try{
4 //访问临界区
5 }finally{
6 lock.unlock();
7 }

               Lock接口的实现类能实现锁的功能,靠的是聚合AQS的子类作为同步器,将提供给用户的锁的操作委托给这个同步器执行。

2. 队列同步器(AQS)

               AQS是并发包里同步组件的核心基础,可以用来构建锁和其他同步器件。

               AQS维护一个int型的同步状态属性,利用这个同步状态属性可以实现独占锁和共享锁,以及如信号量等同步组件。比如要实现独占锁,则同步状态往往初始化为1。另外AQS内部还定义了一个静态内部类Node。Node组成了AQS的同步队列和等待队列,Node里保存了线程信息,等待状态等。AQS维护同步队列的头结点head和尾结点tail,并负责同步队列的操作。

               AQS采用模板方法的设计模式,提供了独占和共享式获取释放同步状态的方法,以及可中断和超时获取同步状态的方法。这些模板方法调用了一些抽象方法。AQS里的抽象方法由继承它的子类根据需要实现。自定义同步组件往往会声明一个继承AQS的静态内部类,称为同步器。并将Lock接口提供给用户调用的方法委托给同步器处理。这样做的好处有两个方面,第一个是对同步组件的使用者隐藏了实现细节,用户只需要调用Lock接口提供的方法就可以获得锁的功能,而不知道具体的执行是由同步器来完成。第二个是向同步组件的开发者提供了便捷的开发接口,隐藏了底层操作系统的线程管理等细节。

2.1 AQS的同步状态操作

               利用AQS设计同步组件的关键在于同步状态。AQS提供了三个方法保证线程安全的修改同步状态

 1 private volatile int state;
 2 protected final void setState(int newState) {
 3         state = newState;
 4 }
 5 protected final int getState() {
 6         return state;
 7 }
 8 protected final boolean compareAndSetState(int expect, int update) {
 9         // See below for intrinsics setup to support this
10         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
11 }

               可以看到同步状态的线程安全是基于volatile的可见性和禁止重排序,以及利用CAS原子性替换。

2.2 AQS的实现分析

2.2.1 同步队列

               AQS里的同步队列(FIFO队列,双向链表)用来完成同步状态的管理。当线程获取同步状态失败时,其线程引用,等待状态,会被封装成Node,并被加入到同步队列,同时AQS会阻塞该线程,直到被同步队列首节点的线程唤醒,并尝试获取同步状态。

               AQS维护同步队列的头结点head和尾结点tail,当线程获取同步状态失败时会被加入到同步队列的尾部。

               当线程获取同步状态失败时会被构造成Node并插入同步队列尾部,由于同一时刻可能与多个线程要被同时插入到尾部,为了避免出现类似HashMap在多线程环境下链表拉链可能拉成环的情况,AQS使用CAS的方式确保插入尾结点线程安全。

               AQS通过addWaiter方法构造Node结点,并将其插入到同步队列尾部,代码如下。首先尝试进行一次快速的插入,利用CAS检查实际尾结点和线程认为的尾结点是否一致,相同则修改结点指针完成插入。如果快速插入失败,则进入enq方法自旋加CAS不断尝试插入的同步队列尾部直到成功。

 1 private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;
 5         if (pred != null) {
 6             node.prev = pred;
 7             if (compareAndSetTail(pred, node)) {
 8                 pred.next = node;
 9                 return node;
10             }
11         }
12         enq(node);
13         return node;
14 }

               AQS的头结点通过unpartSuccessor方法释放同步状态,并利用LockSupport的方法唤醒其后继结点,这一过程没有使用CAS。头结点将其引用修改尾其后继结点,并断开与后继结点的连接,完成释放。

2.2.2 独占式同步状态获取与释放

               调用AQS的acquire(int)方法可以独占式的获取同步状态,该方法的逻辑简单来说是这样的:首先调用tryAcquire(int)方法,线程安全地获取同步状态,如果获取同步状态失败,则构造Node结点,并将其加入等待队列中。在等待队列里的线程不断地自旋,检查其前驱是否是首结点并且能否获取同步状态,如果前驱不是首结点或者获取同步状态失败,则修改结点的等待状态,并且阻塞结点保存的线程,直到首结点释放同步状态,并唤醒后继结点,让等待的线程成功获取到同步状态,这个自旋地过程是不响应中断的。

 1 public final void acquire(int arg) {
 2         if (!tryAcquire(arg) &&
 3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 4             selfInterrupt();
 5 }
 6 final boolean acquireQueued(final Node node, int arg) {
 7         boolean failed = true;
 8         try {
 9             boolean interrupted = false;
10             for (;;) {
11                 final Node p = node.predecessor();
12                 //如果前驱不是首结点或者前驱是首结点但是获取同步状态失败
13                 //则被阻塞
14                 if (p == head && tryAcquire(arg)) {
15                     setHead(node);
16                     p.next = null; // help GC
17                     failed = false;
18                     return interrupted;
19                 }
20                 if (shouldParkAfterFailedAcquire(p, node) &&
21                     parkAndCheckInterrupt())
22                     interrupted = true;
23             }
24         } finally {
25             if (failed)
26                 cancelAcquire(node);
27         }
28 }

               调用AQS的release(int)方法可以释放独占式同步状态。该方法的逻辑简单如下:首先调用treRelease释放同步状态,如果释放成功,则唤醒其后继结点,并且将头结点引用修改成其后继。由于获取到同步状态只能由一个线程,故这一操作不需要CAS来保证线程安全。

1 public final boolean release(int arg) {
2         if (tryRelease(arg)) {
3             Node h = head;
4             if (h != null && h.waitStatus != 0)
5                 unparkSuccessor(h);
6             return true;
7         }
8         return false;
9 }

2.2.3 共享式同步状态获取与释放

               共享式访问同步状态与独占式访问最大的区别在于,共享式访问不会阻塞其他共享式访问同步状态,但是独占式只能有一个线程进入访问,其他线程无论是共享还是独占都会被阻塞。

               调用AQS的acquireShared(int)方法来共享式获取同步状态,首先调用tryAcquireShared(int)方法尝试获取共享同步状态,当该方法返回值大于等于0表示获取成功,小于0获取失败,则调用doAcquireShared(int)方法。doAcquireShared方法简单来说,就是构造Node结点,加入到同步队列尾部,然后进入自旋过程。在自选过程里,当前驱是首结点时,再次尝试获取同步状态,获取成功则退出自旋,否则被阻塞。

 1 public final void acquireShared(int arg) {
 2         if (tryAcquireShared(arg) < 0)
 3             doAcquireShared(arg);
 4 }
 5 private void doAcquireShared(int arg) {
 6         //构造Node结点,构造状态为共享
 7         final Node node = addWaiter(Node.SHARED);
 8         boolean failed = true;
 9         try {
10             boolean interrupted = false;
11             //自旋尝试获取共享式同步状态
12             for (;;) {
13                 final Node p = node.predecessor();
14                 //只有当前驱是首结点时才会尝试获取同步状态
15                 if (p == head) {
16                     int r = tryAcquireShared(arg);
17                     if (r >= 0) {
18                         setHeadAndPropagate(node, r);
19                         p.next = null; // help GC
20                         if (interrupted)
21                             selfInterrupt();
22                         failed = false;
23                         return;
24                     }
25                 }
26                 //前驱不是首结点或者获取同步状态失败,被阻塞,直到首结点释放同步状态将其唤醒
27                 if (shouldParkAfterFailedAcquire(p, node) &&
28                     parkAndCheckInterrupt())
29                     interrupted = true;
30             }
31         } finally {
32             if (failed)
33                 cancelAcquire(node);
34         }
35 }

               调用AQS的releaseShared(int)方法释放共享式同步状态,该方法首先调用tryReleaseShared(int),尝试释放同步状态,如果释放成功,则调用doReleaseShared()方法,唤醒后续处于等待的结点,这过程是采用CAS来保证线程安全,因为共享式同步状态往往有多个线程同时持有同步状态。

2.2.4 等待队列

               在总结AQS等待队列实现之前,首先要总结一下Condition接口相关的知识。

               任意一个Java对象都有wait,notify,notifyAll方法,利用这些方法可以在线程不满足某些执行条件时进入等待状态,直到其他线程将其唤醒。Java对象自带的这些监视器方法配合synchronized锁可以实现等待通知范式,但是相比之下,利用Condition接口提供的方法也能做到一样的功能,并且更加灵活。且Condition接口与对象监视器方法的不同点有:Condition支持中断屏蔽等待,特定时间等待,以及最重要的,对象监视器只有一个等待队列,但是利用Condition可以支持多个条件,多个等待队列。

               Condition的使用也非常简单,下面是一个简单的示例:

 1 Lock lock = new ReentrantLock();
 2 Condition c1 = lock.newCondition();
 3  4 public void sample() throw InterruptedException{
 5     //获取锁
 6     lock.lock();
 7     try{
 8         //只有获取锁成功才能调用条件对象的相关方法,调用之后,线程释放锁,并被构造成Node结点
 9         //进入Condition的等待队列
10         condition.await();
11     }finally{
12         lock.unlock();
13     }
14 }

               获取到锁的线程调用await方法会释放锁,然后进入等待队列,直到其他线程调用signal方法将其唤醒。Condition对象的创建依赖于Lock对象。

               ConditionObject是AQS的内部类,实现了Condition接口。ConditionObject对象维护等待队列的头结点和尾结点。等待队列的结点类型与同步队列结点一样,都是Node。AQS里可以被多个ConditionObject依赖,故相比于synchronized的对象监视器只能有一个等待队列,基于AQS的同步组件可以有多个等待队列。

               当获取到锁的线程调用await方法时将会释放锁并进入等待状态,具体过程简单来说如下:首先检查线程中断状态,如果被中断抛出中断异常。之后调用addConditionWaiter()方法将线程引用以及等待状态构造成Node结点,并释放锁,这部分过程没有CAS来确保线程安全,因为这时候线程还持有锁。最后进入自旋过程,不断检查引用了自己的结点是否被加入到同步队列中参与同步状态的获取,如果没有被加入同步队列,则被阻塞。最后当发现自己被加入到同步队列时,退出自旋过程,调用aquiredQueued方法自旋获取同步状态。

 1 public final void await() throws InterruptedException {
 2             if (Thread.interrupted())
 3                 throw new InterruptedException();
 4             //将线程引用以及等待状态构造成新的Node结点,添加到等待队列尾部,这个过程由锁确保线程安全
 5             Node node = addConditionWaiter();
 6             //释放锁,并唤醒同步队列的后继结点
 7             int savedState = fullyRelease(node);
 8             int interruptMode = 0;
 9             //自旋检查自己是否被加入到同步队列,没有则被阻塞
10             while (!isOnSyncQueue(node)) {
11                 LockSupport.park(this);
12                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
13                     break;
14             }
15             //当发现自己被加入到同步队列,退出自旋,并参与到锁的获取
16             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
17                 interruptMode = REINTERRUPT;
18             if (node.nextWaiter != null) // clean up if cancelled
19                 unlinkCancelledWaiters();
20             if (interruptMode != 0)
21                 reportInterruptAfterWait(interruptMode);
22 }

               当调用ConditionObject对象的signal方法时,过程简单描述如下:首先检查并确保调用siganl方法的线程是获取锁的线程。之后调用doSignal方法,循环尝试修改等待队列头结点指向其后继,并断开原本首结点与其后继的指针,再进入自旋加CAS过程,尝试将其从等待队列转移到同步队列尾部,这个过程没有构造新的Node。当被加入到同步队列成功时,当前线程唤醒刚被插入到同步队列尾部的结点的线程,被唤醒的线程退出wait方法里的检查是否在同步队列循环,参与到自旋获取锁的过程。

 1 public final void signal() {
 2     if (!isHeldExclusively())
 3        throw new IllegalMonitorStateException();
 4     Node first = firstWaiter;
 5     if (first != null)
 6        doSignal(first);
 7 }
 8 private void doSignal(Node first) {
 9     do {
10        if ( (firstWaiter = first.nextWaiter) == null)
11             lastWaiter = null;
12        first.nextWaiter = null;
13    } while (!transferForSignal(first) &&
14        (first = firstWaiter) != null);
15 }

 

发表评论

您的电子邮箱地址不会被公开。