在并发编程中,AQS(AbstractQueuedSynchronizer)是一个非常重要的类,JUC中很多工具都是基于AQS进行开发。其设计和编码思路都极其精妙。本文的目的是先对AQS进行一个大致的介绍,具体的实现细节将在后续的各种同步组件的实现中进行详细解释以体现出其设计思想的巧妙之处。

1. 动手实现一个锁

为了认识AQS的功能,我们先自己动手实现一个非常简易的独占不可重入锁。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class MyLock1 extends AbstractMyLock{  
// 用于保存锁状态, 1表示有线程池有所,2表示锁空闲
private volatile int status;

// 表示当前持有锁的线程
private volatile static Thread t = null;

// Unsafe类的对象
private static final Unsafe U;
private static final long statusOffset;

static {
// 使用反射获取Unsafe类的字段
Field theUnsafeField = null;
try {
theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe");
// 设置字段可访问(因为字段是私有的)
theUnsafeField.setAccessible(true);
U = (Unsafe) theUnsafeField.get(null);

statusOffset = U.objectFieldOffset(MyLock1.class.getDeclaredField("status"));
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}

}

public MyLock1() {
this.status = 0;
}

// 加锁方法
public void lock() {
// 采用乐观锁,对状态进行CAS操作
while (!U.compareAndSwapInt(this, statusOffset, 0, 1)) {

}

// status置为加锁状态后,将t置为当前线程对象
t = Thread.currentThread();
}

public void unlock() {
// 确保只能是自己解锁,不能被别的线程解锁
if (t == Thread.currentThread()){
// 由于此时只有持有锁的线程才能走到这里,因此不需要使用CAS。直接赋值即可
this.status = 0;
this.t = null;
}
}
}

以上代码实现了一个极其简易的独占不可重入锁。接下来对其进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j  
public class Demo27 {
private static MyLock1 lock = new MyLock1();
private static int cnt = 0;

public void start() throws InterruptedException {
Thread t1 = new Thread(new MyRunnable());
Thread t2 = new Thread(new MyRunnable());
Thread t3 = new Thread(new MyRunnable());

t1.start();
t2.start();
t3.start();

t1.join();
t2.join();
t3.join();

log.info("The final result is: {}", cnt);
}


private class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
lock.lock();
log.debug("{} lock()", Thread.currentThread().getName());
try{
cnt ++;
log.info("cnt --> {}", cnt);
}finally {
log.debug("{} unlock()", Thread.currentThread().getName());
lock.unlock();
}
}
}
}

public static void main(String[] args) throws InterruptedException {
new Demo27().start();
}
}

部分果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock()
11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29996
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock()
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock()
11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29997
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock()
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock()
11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29998
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock()
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock()
11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29999
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock()
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock()
11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 30000
11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock()
11:06:27.706 [main] INFO com.yang.juc.Demo27 - The final result is: 30000

根据以上测试可见,锁的功能基本已经实现。但是很容易发现有很多缺陷,比如不可重入,不可中断,高并发环境下性能低等。在本文中重点针对高并发环境下性能低的缺点展开讨论。

2. 分析问题

MyLock1的实现中,可见为了保证加锁解锁操作的原子性,使用了Unsafe类中的CAS操作实现了乐观锁。

2.1 乐观锁和悲观锁

所谓乐观锁,根据名字可以猜出其设计思想,是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改了。
而悲观锁,总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。

对于独占锁加锁问题,可以简单地类比为上厕所的场景。一个厕所只能有一个人使用,此时如果有甲、乙、丙三人想争抢厕所,甲抢到了,剩下两人就无法进入,使用这个共享资源。接下来他俩有两种选择:1、在厕所外不停转圈圈,检查厕所是否被释放(乐观锁,自旋);2、去一个区域排好队,等待甲用完厕所,叫队列中的第一个人去上(悲观锁,进入阻塞队列)。

可见两个选择都可以实现。但是,此时设想一下,如果有一万个人在外面等待呢?不忍直视。

通过此例子来类比,容易理解,乐观锁适合线程争抢不严重的情况,可以减少线程频繁的上下文切换;而悲观锁适合线程争抢激烈的情况,可以避免大量的无意义的自旋操作占用CPU。

2.2 思考解决方案

根据以上的分析,可知乐观锁和悲观锁在不同情况下各有长处,使用哪个需要根据场景而定。此时如果我们要在之前的思路上,解决掉高并发环境下造成的CPU压力过大导致的性能变差的问题,那么就需要对之前的乐观锁进行改进。

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。

3. 引入AQS

3.1 基于AQS实现lock

AQS就实现了以上提到的一套线程阻塞等待以及被唤醒时锁分配的机制。我们可以先体验一下AQS,对MyLock1进行改造,通过AQS来实现。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j  
public class MyLock2 extends AbstractMyLock{
private Sync sync;

public MyLock2() {
sync = new Sync();
}

public void lock(){
sync.acquire(1);
}

public void unLock(){
sync.release(1);
}

private class Sync extends AbstractQueuedSynchronizer {

@Override
protected boolean tryRelease(int arg) {
return compareAndSetState(1, 0);
}

@Override
protected boolean tryAcquire(int arg) {
for (int i = 0; i < 3; i++) {
if (compareAndSetState(0, 1)) return true;
}
return false;
}

protected Sync() {
super();
}
}
}

现在对MyLock1和MyLock2进行测试对比其性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Slf4j  
public class Demo29 {

private static int cnt = 0;
private AbstractMyLock lock;

private static int THREAD_COUNT = 50;

public Demo29(int threadCount, AbstractMyLock lock) {
this.THREAD_COUNT = threadCount;
this.lock = lock;
}

private void start() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);

long start = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
executorService.execute(new MyRunnable());
}

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
log.info("lock: {}, cost time : {}", lock.getClass().getName() , System.currentTimeMillis() - start);
}

public static void main(String[] args) throws InterruptedException {
// new Demo29(100, new MyLock1()).start();
new Demo29(100, new MyLock2()).start();
}

private class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
lock.lock();
log.debug("{} lock()", Thread.currentThread().getName());
try{
cnt ++;
log.debug("cnt --> {}", cnt);
}finally {
log.debug("{} unlock()", Thread.currentThread().getName());
lock.unlock();
}
}
}
}
}

测试结果分别如下:

1
2
18:11:25.586 [main] INFO  c.y.j.Demo29 - lock: MyLock1, cost time : 15629
18:12:25.376 [main] INFO c.y.j.Demo29 - lock: MyLock2, cost time : 691

不难看出,基于AQS的锁在高并发的情况下,性能相较于于之前的单纯的CAS实现的乐观锁非常可观,然而其实现方法又非常简洁。主要归功于Doug Lea对与AQS的设计,接下来讲对AQS进行一个整体的介绍。

3.2 AQS和Lock的关系

通过以上的铺垫,这里可以得出以下结论。

  • AQS面向锁的开发者,它关注的两个主要问题是同步状态管理以及维护线程的同步等待队列;
  • Lock面向锁的使用者,它聚焦的问题是使用者如何更好地使用锁处理并发问题,而使用者不需要知道锁的实现细节就可以实现互斥同步。

4. 介绍AQS

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

4.1 核心思想

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

其中,AQS维护了一个state成员变量用于表示同步状态。具体的含义根据不同的实现可以自行赋予其不同的含义。

1
2
3
4
5
/**  
* The synchronization state.
* volatile 用于保证其可见性
*/
private volatile int state;

其状态信息通过以下几个方法进行操作。

1
2
3
4
5
6
7
8
9
10
11
12
protected final int getState() {  
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

4.2 AQS中的模版方法设计模式

AQS是基于[[设计模式——模版方法|模版方法设计模式]]设计的(本文中不展开讨论)。Doug Lea对与同步器的使用过程进行抽象和建模,将大部分代码已经实现,留下了一部分方法给开发者用于满足自己不同场景下的需求。开发者可以通过继承指定的方法,在同步组件的实现过程中调用模版方法,模版方法则会调用用户重写的方法。一般用于重写的方法有如下几个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int arg);

// 独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg);

// 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
protected int tryAcquireShared(int arg);

// 共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int arg);

// 该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively();

当开发者自定义的同步组件未重写指定方法,而又调用此方法时,默认会抛出UnsupportedOperationException异常。

这里没有像常见的模版方法中掩饰的一样,使用接口或者抽象类定义需要自定义的方法,而是使用抛出异常的方式。是因为有些方法是自定义同步组件的时候无需实现的,比如现在要定义一个独占模式的同步组件,那么就没有必要重写共享模式的方法。如果将其定义为了抽象方法的话,那么子类就必须实现其所有抽象方法。

4.3 同步队列和等待队列

继续观察AQS的成员变量, 维护了head和tail。

1
2
3
4
5
6

// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

再观察内部类Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;

// ======== 下面的几个int常量是给waitStatus用的 ===========
/** waitStatus value to indicate thread has cancelled */
// 代码此线程取消了争抢这个锁
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 本文不分析condition,所以略过吧,下一篇文章会介绍这个
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 同样的不分析,略过吧
static final int PROPAGATE = -3;
// =====================================================


// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;
}

不难看出,AQS中维护了一个双向链表用于实现同步队列(也叫阻塞队列)。此外Node中通过nextWaiter还维护了一个单向队列用于实现等待队列(等待队列只有当使用Condition时,才会存在此单向链表,并可能有多个等待队列)。如下图:

Pasted image 20240521202122

本文对AQS的介绍就到这里,虽然看起来没什么东西。但是对于后续深入学习AQS以及JUC组件非常重要。在本文中,主要目的就是理解AQS是什么?在同步组件开发中是什么角色?它实现同步的核心思想是什么?
至于AQS的具体细节,将在后续的源码中进行学习。