AQS 原理拆解
Abstract:初步分析
AQS
与ReentrantLock
的构成与关联
AQS的核心内容分析 [1]
-
AQS
,全称AbstractQueueSynchronizer
,j.u.c
中许多类依赖于AQS
实现,是j.u.c
中的核心框架,主要作用是维护获取锁的队列 -
AQS
中的队列是FIFO
的双向队列 -
AQS
中的节点和锁状态均为volatile
修饰,以此保证锁状态和队列的可见性volatile int waitStatus; //表示线程的等待状态,包含5种枚举值,含义如下 /** * 0,初始化值 * 1,CANCELED,表示获取锁的请求取消 * -1,SIGNAL,表示线程就绪,等待资源释放 * -2,CONDITION,表示处于等待队列中,等待被唤醒 * -3,PROPAGATE,共享模式时使用 */ private transient volatile Node head; //头结点为虚节点,用于维护表头 private transient volatile Node tail; volatile Node prev; //前驱节点 volatile Node next; //后继节点 volatile Thread thread; //代表的线程 private volatile int state; //同步状态,即锁状态,大于1时表示同一线程递归上锁,等于0时锁被释放
实现自定义同步器需要实现的以下方法:
//判断线程是否取得锁 protected boolean isHeldExclusively() //独占模式 protected boolean tryAcquire(int arg) //用于AQS中的acquire方法,CAS获取锁 protected boolean tryRelease(int arg) //用于AQS中的release方法,释放锁 //共享模式 protected int tryAcquireShared(int arg) //锁可以被多个线程获取,但是有上限 protected boolean tryReleaseShared(int arg) //纯独占模式的实现:`ReentrantLock` //双模式实现:`ReentrantWriteReadLock` //其中acquire和release继承于AQS,不需要重写,用于管理位于双向队列中的各节点
加入等待队列的流程
//1.请求锁 public final void acquire(int arg) { if (!tryAcquire(arg) && //先尝试获取锁,如果失败,请求队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //独占模式才有此操作 selfInterrupt(); //一定情况下自行中断,下面会说 } //2.非极端情况下加入Waiter,即添加到队列队尾 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { //对队尾的设置也是CAS操作 pred.next = node; return node; } } enq(node); return node; } //2.1.如果入队时没有tail,那么初始化一个Node出来 private Node enq(final Node node) { for (;;) { //通过自旋来设置 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //由于不是原子操作,如果此时头部有了,那么还是addWaiter里面的老方法 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
队列处理流程
//1.上面acquire中出现的acqureQueued final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //通过自旋 final Node p = node.predecessor(); //取得前驱节点 if (p == head && tryAcquire(arg)) { //由于head为虚节点,如果在队首=>尝试取锁 setHead(node); //node变为虚节点,相当于删除 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //判断失败过后是否应该继续留队 parkAndCheckInterrupt()) //如果是,再确认是否中断 interrupted = true; } } finally { if (failed) cancelAcquire(node); //如果最终失败,节点设为CANCELED } } //2.留队判断:通过waitStatus判断后续处理,官方的注释写得已经比较清楚了 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //已就绪,继续留队 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //ws=1,表示取消,向前找到下一个未取消的节点,将中间这些取消的节点出队 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //此时不是0就是PROPAGATE,设为SIGNAL,等下一轮锁请求 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //3.如果留队,还要判断是否中断 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //通过UNSAFE相关操作,在后面UNSAFE中再详细解释 return Thread.interrupted(); } //4.最后,如果没能上机,则waitSatus设为CANCELED private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors : 跳过所有前面其他被取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. :如果是尾结点,直接移除就行 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
与ReentrantLock的关联
-
ReentrantLock
是Lock
的实现类而不是AQS
的实现类 -
其内部自定义同步器
Sync
是AQS
的实现类,内部实现为独占模式abstract static class Sync extends AbstractQueuedSynchronizer{...}
-
ReentrantLock
中存在公平锁和非公平锁,实现分别为NonfairSync
和FairSync
-
以非公平锁为例,
ReentrantLock
对AQS
方法的实现如下://tryAcquire,和原方法名不同是因为tryAcquire间接调用了此方法,就不贴上来了 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //获取当前锁状态 if (c == 0) { if (compareAndSetState(0, acquires)) { //如果锁空闲,CAS尝试获取锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //重入该锁,可重入性 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //tryRelease protected final boolean tryRelease(int releases) { int c = getState() - releases; //释放锁 if (Thread.currentThread() != getExclusiveOwnerThread()) //不是锁的持有者无法释放锁 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); //如果释放后state=0,将锁设为空闲,取消线程独占 } setState(c); return free; } //isHeldExclusively,确认此线程是否持有锁 protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); }
-
NonfairSync
和FairSync
的区别//公平锁 int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //此处多一个是否为sync队首的判断,该方法来自AQS compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }