月泉的博客

AQS原理剖析并实现一把自己的锁

月泉 并发编程JUC

AQS是什么

AQS是一个类的缩写它的全名为AbstractQueuedSynchronizer在Java中许多的锁和同步器就是用该类做的实现,其本质上就是一个双向链表实现的队列队列存储的是线程对象的一个封装,其提供了一个同步器的基本逻辑并且将如何获得锁、释放锁的逻辑等可变更逻辑交给了子类去实现使得这个同步器有很大的扩展空间。

AQS的设计思想

首先我们来看下其类结构

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    public class ConditionObject implements Condition, java.io.Serializable {
    	......
    }
    
    static final class Node {
    	......
    }
}

从大体的结构上来看该类继承了一个AbstractOwnableSynchronizer的类

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
	private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

其父类很简单,其中就一个实例变量exclusiveOwnerThread其主要是用来标识在独占模式下同步器的线程当前者,简明的说就是用来标识现在正在运行的线程的。

接着看一个私有的内部类Node

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
	private transient volatile Node head; 
    private transient volatile Node tail;
    
}

介绍下其中主要的变量

  • SHARED

其用来标识当前Node节点是获取共享资源被挂起加入到队列里面的

  • EXCLUSIVE

其用来表示当前节点是获取独占资源被挂起加入到队列里面的

  • CANCELLED

表示当前线程已经被取消

  • SIGNAL

表示当前线程需要被唤醒

  • CONDITION

在条件队列里面等待

  • PROPAGATE

释放共享资源时需要通知其他节点

  • waitStatus

用来标识当前线程的等待状态

  • prev

当前节点的前驱

  • next

当前节点的后继

  • thread

当前节点锁包装的线程对象

接着就可以介绍下主角AbstractQueuedSynchronizer的设计了,首先它有几个很重要的全局实例变量

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

headtail无须解释,一个双向队列总是有头有尾的,那么state表示什么呢?在不同的实现上这个变量有不同的意义,它交由其子类来决定它的意义,例如在ReentrantLockstate为0则表示该独占锁未被任何线程持有,为1时代表被持有,当持有线程重复获得锁时,该状态记录了线程的重入次数,在Semaphore的实现中这个变量为可使用的数量,在CountDownLatch的实现中该变量又用来表示剩余可用数量,在ReadWriteLock的实现中该变量高的16位标识读锁的获取数量,低的16位标识写锁的重入次数。

接着来看其主要的方法,首先作为一个队列肯定是有入队

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

从它的实现中很容易的就可以看出来它采用了链表的形式来实现该队列,首先判断tail尾部是否是空的,如果是空的则表示当前队列没有元素,利用CAS将t设为队列的头,然后第一次头尾都指向自己,如果尾部不为空,则把当前要加入的变量的前驱设为当前队列的尾部节点,然后在将当前要添加的节点利用CAS替换成队列的尾,成功后,在将旧的队列的后继指向要添加到队列中的这个新节点。

以下是该类最核心的几个方法了

  • aquire
  • aquireShared
  • aquireInterruptibly
  • aquireSharedInterruptibly
  • release
  • releaseShared

接下来本来想说独占模式或者说独占资源,但我觉得都很拗口,懂就行了,我统称为:独占锁、共享锁。

aquire

通过aquire方法来获取一把独占锁,来看下其实现

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire是交由子类去实现的逻辑(尝试获取锁),如果未获取到则会调用acquireQueued将当前线程对象添加到队列中去,可以在源码中看出acquire是添加的独占标识,具体实现也很简单简而言之就是将当前线程对象封装成Node节点并设置EXCLUSIVE来标识其是获取独占资源加入队列的。

aquireShared

通过aquire方法来获取一个共享锁,来看下其实现

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

其使用的tryAcquireShared也是交由其子类去实现的,大于0时则获取成功,小于0时调用doAcquireShared添加至队列并不断的尝试采用自旋的方式来获取锁

 private void doAcquireShared(int arg) {
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             final Node p = node.predecessor();
             if (p == head) {
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     if (interrupted)
                         selfInterrupt();
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }

aquireInterruptibly/aquireSharedInterruptibly

aquire在获取的时候是不会理会中断响应的,而这2个方法都在aquire的方法名上增加了Interruptibly来标识它会响应中断

public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

release

顾名思义:释放独占锁

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

其首先调用tryRelease来释放当前占用,其方法也是交由子类去实现的,一旦释放锁资源成功后,将会取到队列的头节点,首先检查头节点是否为空,然后判断其等待标识,如果不为空则标识当前队列不是一个空队列,等待标识不等于0标识它在等待调度,随后调用unparkSuccessor唤醒该头节点所包含的线程对象的线程。

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可以看见其是使用unpark来唤醒线程

releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

首先其也是调用了一个要由子类来实现的tryReleaseShared,成功后会调用doReleaseShared来做队列的唤醒工作

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

首先是一个自旋,然后拿到当前队列的头,然后判断是否是个空队列,拿到头的状态判断是否需要被唤醒,如果需要就更改其唤醒标识并unpark,否则更改尝试更改其标标识,更改失败后继续自旋。

接着摩拳擦掌一翻理解了它的思想,它就是一个FIFO的双向队列

ConditionObject

其还有一个内部类为ConditionObject,每创建一个ConditionObject的实例其本身就代表着一个队列,它也是一个阻塞队列,调用其实例方法await会阻塞线程(并添加到这个条件变量的队列中),调用其实例方法signal来唤醒一个线程添加到AQS的队列中去待唤醒,其signalAll唤醒该实例队列中所有的节点添加到AQS队列中去。

使用AQS实现一把自己的锁

实现一把不可重入的独占锁

public class NoReentrantLock implements Lock {
    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
        protected boolean tryAcquire(int arg) {
            assert arg == 1;
            if(compareAndSetState(0, 1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;
            if(getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        ConditionObject newCodition(){
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCodition();
    }
}

我这里将state作为一个当前锁是否被占用了的标识,然后来运用一下该锁的使用。

public class NotReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        NotReentrantLock lock = new NotReentrantLock();

        Thread threadA = new Thread(new Executer("threadA", lock), "threadA");
        Thread threadB = new Thread(new Executer("threadB", lock), "threadB");
        Thread threadC = new Thread(new Executer("threadC", lock), "threadC");
        Thread threadD = new Thread(new Executer("threadD", lock), "threadD");
        Thread threadE = new Thread(new Executer("threadE", lock), "threadE");
        Thread threadF = new Thread(new Executer("threadF", lock), "threadF");

        threadA.start();
        threadB.start();
        threadC.start();
        threadD.start();
        threadE.start();
        threadF.start();

    }
}


class Executer implements Runnable{
    private String name;
    private Lock lock;

    public Executer(String name, Lock lock) {
        this.name = name;
        this.lock = lock;
    }

    @Override
    public void run() {
        lock.lock();
        System.out.println(name + " start execute");
        System.out.println(name + " execute compeleted.");
        System.out.println(name + " end execute");
        lock.unlock();
    }
}
月泉
伪文艺中二青年,热爱技术,热爱生活。